Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/sshpeer.py @ 36398:043e77f3be09
sshpeer: return framed file object when needed
Currently, wireproto.wirepeer has a default implementation of
_submitbatch() and sshv1peer has a very similar implementation.
The main difference is that sshv1peer is aware of the total amount
of bytes it can read whereas the default implementation reads the
stream until no more data is returned. The default implementation
works for HTTP, since there is a known end to HTTP responses (either
Content-Length or 0 sized chunk).
This commit teaches sshv1peer to use our just-introduced "cappedreader"
class for wrapping a file object to limit the number of bytes that
can be read. We do this by introducing an argument to specify whether
the response is framed. If set, we returned a cappedreader instance
instead of the raw pipe.
_call() always has framed responses. So we set this argument
unconditionally and then .read() the entirety of the result.
Strictly speaking, we don't need to use cappedreader in this case
and can inline frame decoding/read logic. But I like when things
are consistent. The overhead should be negligible.
_callstream() and _callcompressable() are special: whether framing
is used depends on the specific command. So, we define a set
of commands that have framed response. It currently only
contains "batch."
As a result of this change, the one-off implementation of
_submitbatch() in sshv1peer can be removed since it is now
safe to .read() the response's file object until end of stream.
cappedreader takes care of not overrunning the frame.
Differential Revision: https://phab.mercurial-scm.org/D2380
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 21 Feb 2018 08:35:48 -0800 |
parents | a34d5ef53c2e |
children | 066e6a9d52bb |
comparison
equal
deleted
inserted
replaced
36397:a34d5ef53c2e | 36398:043e77f3be09 |
---|---|
347 self._pipeo = stdin | 347 self._pipeo = stdin |
348 self._pipei = stdout | 348 self._pipei = stdout |
349 self._pipee = stderr | 349 self._pipee = stderr |
350 self._caps = caps | 350 self._caps = caps |
351 | 351 |
352 # Commands that have a "framed" response where the first line of the | |
353 # response contains the length of that response. | |
354 _FRAMED_COMMANDS = { | |
355 'batch', | |
356 } | |
357 | |
352 # Begin of _basepeer interface. | 358 # Begin of _basepeer interface. |
353 | 359 |
354 @util.propertycache | 360 @util.propertycache |
355 def ui(self): | 361 def ui(self): |
356 return self._ui | 362 return self._ui |
389 def _cleanup(self): | 395 def _cleanup(self): |
390 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee) | 396 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee) |
391 | 397 |
392 __del__ = _cleanup | 398 __del__ = _cleanup |
393 | 399 |
394 def _submitbatch(self, req): | 400 def _sendrequest(self, cmd, args, framed=False): |
395 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req)) | |
396 available = self._getamount() | |
397 # TODO this response parsing is probably suboptimal for large | |
398 # batches with large responses. | |
399 toread = min(available, 1024) | |
400 work = rsp.read(toread) | |
401 available -= toread | |
402 chunk = work | |
403 while chunk: | |
404 while ';' in work: | |
405 one, work = work.split(';', 1) | |
406 yield wireproto.unescapearg(one) | |
407 toread = min(available, 1024) | |
408 chunk = rsp.read(toread) | |
409 available -= toread | |
410 work += chunk | |
411 yield wireproto.unescapearg(work) | |
412 | |
413 def _sendrequest(self, cmd, args): | |
414 if (self.ui.debugflag | 401 if (self.ui.debugflag |
415 and self.ui.configbool('devel', 'debug.peer-request')): | 402 and self.ui.configbool('devel', 'debug.peer-request')): |
416 dbg = self.ui.debug | 403 dbg = self.ui.debug |
417 line = 'devel-peer-request: %s\n' | 404 line = 'devel-peer-request: %s\n' |
418 dbg(line % cmd) | 405 dbg(line % cmd) |
442 self._pipeo.write(dv) | 429 self._pipeo.write(dv) |
443 else: | 430 else: |
444 self._pipeo.write(v) | 431 self._pipeo.write(v) |
445 self._pipeo.flush() | 432 self._pipeo.flush() |
446 | 433 |
434 # We know exactly how many bytes are in the response. So return a proxy | |
435 # around the raw output stream that allows reading exactly this many | |
436 # bytes. Callers then can read() without fear of overrunning the | |
437 # response. | |
438 if framed: | |
439 amount = self._getamount() | |
440 return util.cappedreader(self._pipei, amount) | |
441 | |
447 return self._pipei | 442 return self._pipei |
448 | 443 |
449 def _callstream(self, cmd, **args): | 444 def _callstream(self, cmd, **args): |
450 args = pycompat.byteskwargs(args) | 445 args = pycompat.byteskwargs(args) |
451 return self._sendrequest(cmd, args) | 446 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS) |
452 | 447 |
453 def _callcompressable(self, cmd, **args): | 448 def _callcompressable(self, cmd, **args): |
454 args = pycompat.byteskwargs(args) | 449 args = pycompat.byteskwargs(args) |
455 return self._sendrequest(cmd, args) | 450 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS) |
456 | 451 |
457 def _call(self, cmd, **args): | 452 def _call(self, cmd, **args): |
458 args = pycompat.byteskwargs(args) | 453 args = pycompat.byteskwargs(args) |
459 self._sendrequest(cmd, args) | 454 return self._sendrequest(cmd, args, framed=True).read() |
460 return self._readframed() | |
461 | 455 |
462 def _callpush(self, cmd, fp, **args): | 456 def _callpush(self, cmd, fp, **args): |
463 r = self._call(cmd, **args) | 457 r = self._call(cmd, **args) |
464 if r: | 458 if r: |
465 return '', r | 459 return '', r |