comparison mercurial/sshpeer.py @ 36367:043e77f3be09

sshpeer: return framed file object when needed

Currently, wireproto.wirepeer has a default implementation of _submitbatch() and sshv1peer has a very similar implementation. The main difference is that sshv1peer is aware of the total amount of bytes it can read whereas the default implementation reads the stream until no more data is returned. The default implementation works for HTTP, since there is a known end to HTTP responses (either Content-Length or 0 sized chunk). This commit teaches sshv1peer to use our just-introduced "cappedreader" class for wrapping a file object to limit the number of bytes that can be read. We do this by introducing an argument to specify whether the response is framed. If set, we return a cappedreader instance instead of the raw pipe. _call() always has framed responses. So we set this argument unconditionally and then .read() the entirety of the result. Strictly speaking, we don't need to use cappedreader in this case and can inline frame decoding/read logic. But I like when things are consistent. The overhead should be negligible. _callstream() and _callcompressable() are special: whether framing is used depends on the specific command. So, we define a set of commands that have framed responses. It currently only contains "batch". As a result of this change, the one-off implementation of _submitbatch() in sshv1peer can be removed since it is now safe to .read() the response's file object until end of stream. cappedreader takes care of not overrunning the frame. Differential Revision: https://phab.mercurial-scm.org/D2380
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 21 Feb 2018 08:35:48 -0800
parents a34d5ef53c2e
children 066e6a9d52bb
comparison
equal deleted inserted replaced
36366:a34d5ef53c2e 36367:043e77f3be09
347 self._pipeo = stdin 347 self._pipeo = stdin
348 self._pipei = stdout 348 self._pipei = stdout
349 self._pipee = stderr 349 self._pipee = stderr
350 self._caps = caps 350 self._caps = caps
351 351
352 # Commands that have a "framed" response where the first line of the
353 # response contains the length of that response.
354 _FRAMED_COMMANDS = {
355 'batch',
356 }
357
352 # Begin of _basepeer interface. 358 # Begin of _basepeer interface.
353 359
354 @util.propertycache 360 @util.propertycache
355 def ui(self): 361 def ui(self):
356 return self._ui 362 return self._ui
389 def _cleanup(self): 395 def _cleanup(self):
390 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee) 396 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
391 397
392 __del__ = _cleanup 398 __del__ = _cleanup
393 399
394 def _submitbatch(self, req): 400 def _sendrequest(self, cmd, args, framed=False):
395 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
396 available = self._getamount()
397 # TODO this response parsing is probably suboptimal for large
398 # batches with large responses.
399 toread = min(available, 1024)
400 work = rsp.read(toread)
401 available -= toread
402 chunk = work
403 while chunk:
404 while ';' in work:
405 one, work = work.split(';', 1)
406 yield wireproto.unescapearg(one)
407 toread = min(available, 1024)
408 chunk = rsp.read(toread)
409 available -= toread
410 work += chunk
411 yield wireproto.unescapearg(work)
412
413 def _sendrequest(self, cmd, args):
414 if (self.ui.debugflag 401 if (self.ui.debugflag
415 and self.ui.configbool('devel', 'debug.peer-request')): 402 and self.ui.configbool('devel', 'debug.peer-request')):
416 dbg = self.ui.debug 403 dbg = self.ui.debug
417 line = 'devel-peer-request: %s\n' 404 line = 'devel-peer-request: %s\n'
418 dbg(line % cmd) 405 dbg(line % cmd)
442 self._pipeo.write(dv) 429 self._pipeo.write(dv)
443 else: 430 else:
444 self._pipeo.write(v) 431 self._pipeo.write(v)
445 self._pipeo.flush() 432 self._pipeo.flush()
446 433
434 # We know exactly how many bytes are in the response. So return a proxy
435 # around the raw output stream that allows reading exactly this many
436 # bytes. Callers then can read() without fear of overrunning the
437 # response.
438 if framed:
439 amount = self._getamount()
440 return util.cappedreader(self._pipei, amount)
441
447 return self._pipei 442 return self._pipei
448 443
449 def _callstream(self, cmd, **args): 444 def _callstream(self, cmd, **args):
450 args = pycompat.byteskwargs(args) 445 args = pycompat.byteskwargs(args)
451 return self._sendrequest(cmd, args) 446 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
452 447
453 def _callcompressable(self, cmd, **args): 448 def _callcompressable(self, cmd, **args):
454 args = pycompat.byteskwargs(args) 449 args = pycompat.byteskwargs(args)
455 return self._sendrequest(cmd, args) 450 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
456 451
457 def _call(self, cmd, **args): 452 def _call(self, cmd, **args):
458 args = pycompat.byteskwargs(args) 453 args = pycompat.byteskwargs(args)
459 self._sendrequest(cmd, args) 454 return self._sendrequest(cmd, args, framed=True).read()
460 return self._readframed()
461 455
462 def _callpush(self, cmd, fp, **args): 456 def _callpush(self, cmd, fp, **args):
463 r = self._call(cmd, **args) 457 r = self._call(cmd, **args)
464 if r: 458 if r:
465 return '', r 459 return '', r