comparison mercurial/wireprotoframing.py @ 37544:55b5ba8d4e68

wireproto: client reactor support for receiving frames We can now feed received frames into the client reactor and it will validate their sanity, dispatch them appropriately. The hacky HTTP peer has been updated to use the new code. No existing tests changed, somewhat proving the code works as expected. Rudimentary unit tests for the new functionality have been implemented. Differential Revision: https://phab.mercurial-scm.org/D3224
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 09 Apr 2018 16:54:20 -0700
parents 01361be9e2dc
children b9502b5f2066
comparison
equal deleted inserted replaced
37543:01361be9e2dc 37544:55b5ba8d4e68
920 self._nextrequestid = 1 920 self._nextrequestid = 1
921 # We only support a single outgoing stream for now. 921 # We only support a single outgoing stream for now.
922 self._outgoingstream = stream(1) 922 self._outgoingstream = stream(1)
923 self._pendingrequests = collections.deque() 923 self._pendingrequests = collections.deque()
924 self._activerequests = {} 924 self._activerequests = {}
925 self._incomingstreams = {}
925 926
926 def callcommand(self, name, args, datafh=None): 927 def callcommand(self, name, args, datafh=None):
927 """Request that a command be executed. 928 """Request that a command be executed.
928 929
929 Receives the command name, a dict of arguments to pass to the command, 930 Receives the command name, a dict of arguments to pass to the command,
1005 1006
1006 for frame in res: 1007 for frame in res:
1007 yield frame 1008 yield frame
1008 1009
1009 request.state = 'sent' 1010 request.state = 'sent'
1011
1012 def onframerecv(self, frame):
1013 """Process a frame that has been received off the wire.
1014
1015 Returns a 2-tuple of (action, meta) describing further action the
1016 caller needs to take as a result of receiving this frame.
1017 """
1018 if frame.streamid % 2:
1019 return 'error', {
1020 'message': (
1021 _('received frame with odd numbered stream ID: %d') %
1022 frame.streamid),
1023 }
1024
1025 if frame.streamid not in self._incomingstreams:
1026 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1027 return 'error', {
1028 'message': _('received frame on unknown stream '
1029 'without beginning of stream flag set'),
1030 }
1031
1032 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1033 raise error.ProgrammingError('support for decoding stream '
1034 'payloads not yet implemneted')
1035
1036 if frame.streamflags & STREAM_FLAG_END_STREAM:
1037 del self._incomingstreams[frame.streamid]
1038
1039 if frame.requestid not in self._activerequests:
1040 return 'error', {
1041 'message': (_('received frame for inactive request ID: %d') %
1042 frame.requestid),
1043 }
1044
1045 request = self._activerequests[frame.requestid]
1046 request.state = 'receiving'
1047
1048 handlers = {
1049 FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
1050 }
1051
1052 meth = handlers.get(frame.typeid)
1053 if not meth:
1054 raise error.ProgrammingError('unhandled frame type: %d' %
1055 frame.typeid)
1056
1057 return meth(request, frame)
1058
1059 def _onbytesresponseframe(self, request, frame):
1060 if frame.flags & FLAG_BYTES_RESPONSE_EOS:
1061 request.state = 'received'
1062 del self._activerequests[request.requestid]
1063
1064 return 'responsedata', {
1065 'request': request,
1066 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
1067 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
1068 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
1069 'data': frame.payload,
1070 }