Mercurial > public > mercurial-scm > hg
comparison mercurial/wireprotoframing.py @ 37544:55b5ba8d4e68
wireproto: client reactor support for receiving frames
We can now feed received frames into the client reactor, and it will
validate their sanity and dispatch them appropriately.
The hacky HTTP peer has been updated to use the new code. No
existing tests changed, somewhat proving the code works as
expected.
Rudimentary unit tests for the new functionality have been
implemented.
Differential Revision: https://phab.mercurial-scm.org/D3224
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 09 Apr 2018 16:54:20 -0700 |
parents | 01361be9e2dc |
children | b9502b5f2066 |
comparison
equal
deleted
inserted
replaced
37543:01361be9e2dc | 37544:55b5ba8d4e68 |
---|---|
920 self._nextrequestid = 1 | 920 self._nextrequestid = 1 |
921 # We only support a single outgoing stream for now. | 921 # We only support a single outgoing stream for now. |
922 self._outgoingstream = stream(1) | 922 self._outgoingstream = stream(1) |
923 self._pendingrequests = collections.deque() | 923 self._pendingrequests = collections.deque() |
924 self._activerequests = {} | 924 self._activerequests = {} |
925 self._incomingstreams = {} | |
925 | 926 |
926 def callcommand(self, name, args, datafh=None): | 927 def callcommand(self, name, args, datafh=None): |
927 """Request that a command be executed. | 928 """Request that a command be executed. |
928 | 929 |
929 Receives the command name, a dict of arguments to pass to the command, | 930 Receives the command name, a dict of arguments to pass to the command, |
1005 | 1006 |
1006 for frame in res: | 1007 for frame in res: |
1007 yield frame | 1008 yield frame |
1008 | 1009 |
1009 request.state = 'sent' | 1010 request.state = 'sent' |
1011 | |
def onframerecv(self, frame):
    """Process a frame that has been received off the wire.

    The frame's stream and request are validated, incoming stream
    bookkeeping is updated, and the frame is then handed to the handler
    registered for its frame type.

    Returns a 2-tuple of (action, meta) describing further action the
    caller needs to take as a result of receiving this frame.

    An ``error`` action, whose meta dict holds a ``message`` key, is
    returned when:

    * the frame arrives on an odd numbered stream ID
    * the frame arrives on a stream that has not been seen before and
      does not carry the beginning of stream flag
    * the frame refers to a request ID that is not currently active

    Otherwise the return value is whatever the frame type handler
    returns.

    Raises ``error.ProgrammingError`` if the frame's stream has an
    encoding applied (decoding is not supported) or if there is no
    handler for the frame's type.
    """
    # Only even numbered stream IDs are accepted on the receiving side.
    # NOTE(review): presumably odd IDs are reserved for streams opened by
    # this end (the outgoing stream uses ID 1) - confirm against the spec.
    if frame.streamid % 2:
        return 'error', {
            'message': (
                _('received frame with odd numbered stream ID: %d') %
                frame.streamid),
        }

    if frame.streamid not in self._incomingstreams:
        if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
            return 'error', {
                'message': _('received frame on unknown stream '
                             'without beginning of stream flag set'),
            }

        # Record the newly opened stream. Without this, every later frame
        # on the stream (which no longer carries the begin flag) would be
        # rejected as belonging to an unknown stream, and the end of
        # stream handling below would fail when removing the entry.
        self._incomingstreams[frame.streamid] = stream(frame.streamid)

    if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
        raise error.ProgrammingError('support for decoding stream '
                                     'payloads not yet implemneted')

    # The sender has declared this to be the last frame on the stream, so
    # stop tracking it.
    if frame.streamflags & STREAM_FLAG_END_STREAM:
        del self._incomingstreams[frame.streamid]

    if frame.requestid not in self._activerequests:
        return 'error', {
            'message': (_('received frame for inactive request ID: %d') %
                        frame.requestid),
        }

    request = self._activerequests[frame.requestid]
    request.state = 'receiving'

    # Dispatch table mapping frame type to the method that handles it.
    handlers = {
        FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
    }

    meth = handlers.get(frame.typeid)
    if not meth:
        raise error.ProgrammingError('unhandled frame type: %d' %
                                     frame.typeid)

    return meth(request, frame)
1058 | |
def _onbytesresponseframe(self, request, frame):
    """Handle a bytes response frame belonging to ``request``.

    If the frame carries the end-of-stream flag, the request is marked
    as ``received`` and removed from the set of active requests.

    Returns a ``responsedata`` action whose meta dict contains the
    request, the frame payload under ``data``, and the raw (masked,
    not coerced to bool) flag values under ``expectmore``, ``eos`` and
    ``cbor``.
    """
    flags = frame.flags
    eos = flags & FLAG_BYTES_RESPONSE_EOS

    if eos:
        # Final frame of the response: retire the request.
        request.state = 'received'
        del self._activerequests[request.requestid]

    meta = {
        'request': request,
        'expectmore': flags & FLAG_BYTES_RESPONSE_CONTINUATION,
        'eos': eos,
        'cbor': flags & FLAG_BYTES_RESPONSE_CBOR,
        'data': frame.payload,
    }
    return 'responsedata', meta