comparison mercurial/wireprotoframing.py @ 39559:07b58266bce3

wireprotov2: implement commands as a generator of objects

Previously, wire protocol version 2 inherited version 1's model of having separate types to represent the results of different wire protocol commands. As I implemented more powerful commands in future commits, I found I was using a common pattern of returning a special type to hold a generator. This meant the command function required a closure to do most of the work. That made the logic flow more difficult to follow. I also noticed that many commands were effectively a sequence of objects to be CBOR encoded. I think it makes sense to define version 2 commands as generators. This way, commands can simply emit the data structures they wish to send to the client. This eliminates the need for a closure in command functions and removes encoding from the bodies of commands. As part of this commit, the handling of response objects has been moved into the serverreactor class. This puts the reactor in the driver's seat with regard to CBOR encoding and error handling. Having error handling in the function that emits frames is particularly important because exceptions in that function can lead to things getting into a bad state: I'm fairly certain that uncaught exceptions in the frame generator were causing deadlocks. I also introduced a dedicated error type for explicit error reporting in command handlers. This will be used in subsequent commits. There's still a bit of work to be done here, especially around formalizing the error handling "protocol." I've added yet another TODO to track this so we don't forget. Test output changed because we're using generators and no longer know we are at the end of the data until we hit the end of the generator. This means we can't emit the end-of-stream flag until we've exhausted the generator. Hence the introduction of 0-sized end-of-stream frames.

Differential Revision: https://phab.mercurial-scm.org/D4472
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 05 Sep 2018 09:06:40 -0700
parents 43d92d68ac88
children 84bf6ded9317
comparison
equal deleted inserted replaced
39558:b0e0db1565d1 39559:07b58266bce3
386 if done: 386 if done:
387 break 387 break
388 388
389 def createbytesresponseframesfromgen(stream, requestid, gen, 389 def createbytesresponseframesfromgen(stream, requestid, gen,
390 maxframesize=DEFAULT_MAX_FRAME_SIZE): 390 maxframesize=DEFAULT_MAX_FRAME_SIZE):
391 overall = b''.join(cborutil.streamencode({b'status': b'ok'})) 391 """Generator of frames from a generator of byte chunks.
392 392
393 yield stream.makeframe(requestid=requestid, 393 This assumes that another frame will follow whatever this emits. i.e.
394 typeid=FRAME_TYPE_COMMAND_RESPONSE, 394 this always emits the continuation flag and never emits the end-of-stream
395 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, 395 flag.
396 payload=overall) 396 """
397
398 cb = util.chunkbuffer(gen) 397 cb = util.chunkbuffer(gen)
399 398 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
400 flags = 0
401 399
402 while True: 400 while True:
403 chunk = cb.read(maxframesize) 401 chunk = cb.read(maxframesize)
404 if not chunk: 402 if not chunk:
405 break 403 break
409 flags=flags, 407 flags=flags,
410 payload=chunk) 408 payload=chunk)
411 409
412 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION 410 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
413 411
414 flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION 412 def createcommandresponseokframe(stream, requestid):
415 flags |= FLAG_COMMAND_RESPONSE_EOS 413 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
416 yield stream.makeframe(requestid=requestid, 414
417 typeid=FRAME_TYPE_COMMAND_RESPONSE, 415 return stream.makeframe(requestid=requestid,
418 flags=flags, 416 typeid=FRAME_TYPE_COMMAND_RESPONSE,
419 payload=b'') 417 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
418 payload=overall)
419
420 def createcommandresponseeosframe(stream, requestid):
421 """Create an empty payload frame representing command end-of-stream."""
422 return stream.makeframe(requestid=requestid,
423 typeid=FRAME_TYPE_COMMAND_RESPONSE,
424 flags=FLAG_COMMAND_RESPONSE_EOS,
425 payload=b'')
420 426
421 def createcommanderrorresponse(stream, requestid, message, args=None): 427 def createcommanderrorresponse(stream, requestid, message, args=None):
422 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom 428 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
423 # formatting works consistently? 429 # formatting works consistently?
424 m = { 430 m = {
684 else: 690 else:
685 return 'sendframes', { 691 return 'sendframes', {
686 'framegen': result, 692 'framegen': result,
687 } 693 }
688 694
689 def oncommandresponsereadygen(self, stream, requestid, gen): 695 def oncommandresponsereadyobjects(self, stream, requestid, objs):
690 """Signal that a bytes response is ready, with data as a generator.""" 696 """Signal that objects are ready to be sent to the client.
697
698 ``objs`` is an iterable of objects (typically a generator) that will
699 be encoded via CBOR and added to frames, which will be sent to the
700 client.
701 """
691 ensureserverstream(stream) 702 ensureserverstream(stream)
692 703
704 # We need to take care over exception handling. Uncaught exceptions
705 # when generating frames could lead to premature end of the frame
706 # stream and the possibility of the server or client process getting
707 # in a bad state.
708 #
709 # Keep in mind that if ``objs`` is a generator, advancing it could
710 # raise exceptions that originated in e.g. wire protocol command
711 # functions. That is why we differentiate between exceptions raised
712 # when iterating versus other exceptions that occur.
713 #
714 # In all cases, when the function finishes, the request is fully
715 # handled and no new frames for it should be seen.
716
693 def sendframes(): 717 def sendframes():
694 for frame in createbytesresponseframesfromgen(stream, requestid, 718 emitted = False
695 gen): 719 while True:
696 yield frame 720 try:
721 o = next(objs)
722 except StopIteration:
723 if emitted:
724 yield createcommandresponseeosframe(stream, requestid)
725 break
726
727 except error.WireprotoCommandError as e:
728 for frame in createcommanderrorresponse(
729 stream, requestid, e.message, e.messageargs):
730 yield frame
731 break
732
733 except Exception as e:
734 for frame in createerrorframe(stream, requestid,
735 '%s' % e,
736 errtype='server'):
737 yield frame
738
739 break
740
741 try:
742 if not emitted:
743 yield createcommandresponseokframe(stream, requestid)
744 emitted = True
745
746 # TODO buffer chunks so emitted frame payloads can be
747 # larger.
748 for frame in createbytesresponseframesfromgen(
749 stream, requestid, cborutil.streamencode(o)):
750 yield frame
751 except Exception as e:
752 for frame in createerrorframe(stream, requestid,
753 '%s' % e,
754 errtype='server'):
755 yield frame
756
757 break
697 758
698 self._activecommands.remove(requestid) 759 self._activecommands.remove(requestid)
699 760
700 return self._handlesendframes(sendframes()) 761 return self._handlesendframes(sendframes())
701 762