Mercurial > public > mercurial-scm > hg
comparison mercurial/wireprotoframing.py @ 39560:84bf6ded9317
wireprotoframing: buffer emitted data to reduce frame count
An upcoming commit introduces a wire protocol command that can emit
hundreds of thousands of small objects. Without a buffering layer,
we would emit a single, small frame for every object. Performance
profiling revealed this to be a source of significant overhead for
both client and server.
This commit introduces a very crude buffering layer so that we emit
fewer, bigger frames in such a scenario. This code will likely get
rewritten in the future to be part of the streams API, as we'll
need a similar strategy for compressing data. I don't want to think
about it too much at the moment though.
server
before: user 32.500+0.000 sys 1.160+0.000
after: user 20.230+0.010 sys 0.180+0.000
client
before: user 133.400+0.000 sys 93.120+0.000
after: user 68.370+0.000 sys 32.950+0.000
This appears to indicate we have significant overhead in the frame
processing code on both client and server. It might be worth profiling
that at some point...
Differential Revision: https://phab.mercurial-scm.org/D4473
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 29 Aug 2018 16:43:17 -0700 |
parents | 07b58266bce3 |
children | bce1c1af7518 |
comparison
equal
deleted
inserted
replaced
39559:07b58266bce3 | 39560:84bf6ded9317 |
---|---|
508 | 508 |
509 yield stream.makeframe(requestid=requestid, | 509 yield stream.makeframe(requestid=requestid, |
510 typeid=FRAME_TYPE_TEXT_OUTPUT, | 510 typeid=FRAME_TYPE_TEXT_OUTPUT, |
511 flags=0, | 511 flags=0, |
512 payload=payload) | 512 payload=payload) |
513 | |
class bufferingcommandresponseemitter(object):
    """Helper object to emit command response frames intelligently.

    Raw command response data is likely emitted in chunks much smaller
    than what can fit in a single frame. This class exists to buffer
    chunks until enough data is available to fit in a single frame.

    Buffered data only becomes a frame when a later ``send()`` call
    forces a flush, so callers must finish with ``send(None)`` to drain
    whatever is still held.

    TODO we'll need something like this when compression is supported.
    So it might make sense to implement this functionality at the stream
    level.
    """
    def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
        """Create an emitter with an empty buffer.

        ``stream`` is the object whose ``makeframe()`` is called to build
        each frame. ``requestid`` is passed through to every frame.
        ``maxframesize`` is the largest payload, in bytes, placed in a
        single frame.
        """
        self._stream = stream
        self._requestid = requestid
        self._maxsize = maxframesize
        # Chunks waiting to be coalesced and the sum of their lengths.
        self._chunks = []
        self._chunkssize = 0

    def send(self, data):
        """Send new data for emission.

        Is a generator of new frames that were derived from the new input.
        It may produce no frames at all if ``data`` was merely buffered, so
        the result must always be iterated for the call to have any effect.

        If the special input ``None`` is received, flushes all buffered
        data to frames.
        """

        if data is None:
            for frame in self._flush():
                yield frame
            return

        # There is a ton of potential to do more complicated things here.
        # Our immediate goal is to coalesce small chunks into big frames,
        # not achieve the fewest number of frames possible. So we go with
        # a simple implementation:
        #
        # * If a chunk is too large for a frame, we flush and emit frames
        #   for the new chunk.
        # * If a chunk can be buffered without total buffered size limits
        #   being exceeded, we do that.
        # * If a chunk causes us to go over our buffering limit, we flush
        #   and then buffer the new chunk.

        if len(data) > self._maxsize:
            # Flush first so previously buffered data stays ahead of the
            # big chunk in the output.
            for frame in self._flush():
                yield frame

            # Now emit frames for the big chunk.
            offset = 0
            while True:
                chunk = data[offset:offset + self._maxsize]
                offset += len(chunk)

                yield self._stream.makeframe(
                    self._requestid,
                    typeid=FRAME_TYPE_COMMAND_RESPONSE,
                    flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
                    payload=chunk)

                if offset == len(data):
                    return

        # If we don't have enough to constitute a full frame, buffer and
        # return.
        if len(data) + self._chunkssize < self._maxsize:
            self._chunks.append(data)
            self._chunkssize += len(data)
            return

        # Else flush what we have and buffer the new chunk. We could do
        # something more intelligent here, like break the chunk. Let's
        # keep things simple for now.
        for frame in self._flush():
            yield frame

        self._chunks.append(data)
        self._chunkssize = len(data)

    def _flush(self):
        """Emit buffered data as a single frame and reset the buffer.

        Is a generator yielding at most one frame. Nothing is yielded when
        no data is buffered.
        """
        payload = b''.join(self._chunks)
        assert len(payload) <= self._maxsize

        self._chunks[:] = []
        self._chunkssize = 0

        # Don't emit a frame with an empty payload: it carries no data and
        # only adds the per-frame overhead this class exists to avoid.
        if not payload:
            return

        yield self._stream.makeframe(
            self._requestid,
            typeid=FRAME_TYPE_COMMAND_RESPONSE,
            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
            payload=payload)
513 | 605 |
514 class stream(object): | 606 class stream(object): |
515 """Represents a logical unidirectional series of frames.""" | 607 """Represents a logical unidirectional series of frames.""" |
516 | 608 |
517 def __init__(self, streamid, active=False): | 609 def __init__(self, streamid, active=False): |
714 # In all cases, when the function finishes, the request is fully | 806 # In all cases, when the function finishes, the request is fully |
715 # handled and no new frames for it should be seen. | 807 # handled and no new frames for it should be seen. |
716 | 808 |
717 def sendframes(): | 809 def sendframes(): |
718 emitted = False | 810 emitted = False |
811 emitter = bufferingcommandresponseemitter(stream, requestid) | |
719 while True: | 812 while True: |
720 try: | 813 try: |
721 o = next(objs) | 814 o = next(objs) |
722 except StopIteration: | 815 except StopIteration: |
816 for frame in emitter.send(None): | |
817 yield frame | |
818 | |
723 if emitted: | 819 if emitted: |
724 yield createcommandresponseeosframe(stream, requestid) | 820 yield createcommandresponseeosframe(stream, requestid) |
725 break | 821 break |
726 | 822 |
727 except error.WireprotoCommandError as e: | 823 except error.WireprotoCommandError as e: |
741 try: | 837 try: |
742 if not emitted: | 838 if not emitted: |
743 yield createcommandresponseokframe(stream, requestid) | 839 yield createcommandresponseokframe(stream, requestid) |
744 emitted = True | 840 emitted = True |
745 | 841 |
746 # TODO buffer chunks so emitted frame payloads can be | 842 for chunk in cborutil.streamencode(o): |
747 # larger. | 843 for frame in emitter.send(chunk): |
748 for frame in createbytesresponseframesfromgen( | 844 yield frame |
749 stream, requestid, cborutil.streamencode(o)): | 845 |
750 yield frame | |
751 except Exception as e: | 846 except Exception as e: |
752 for frame in createerrorframe(stream, requestid, | 847 for frame in createerrorframe(stream, requestid, |
753 '%s' % e, | 848 '%s' % e, |
754 errtype='server'): | 849 errtype='server'): |
755 yield frame | 850 yield frame |