Mercurial > public > mercurial-scm > hg-stable
diff mercurial/wireprotoframing.py @ 40129:57782791b7e9
wireprotov2: handle stream encoding settings frames
Like what we just did for the server reactor, we teach the client
reactor to handle stream encoding settings frames. The code is
very similar.
We define a method on the stream class to handle processing the data
within the decoded frames. However, it doesn't yet do anything useful.
Differential Revision: https://phab.mercurial-scm.org/D4918
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Thu, 04 Oct 2018 16:44:21 -0700 |
parents | 080419fa4fe4 |
children | 293835e0fff7 |
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py Thu Oct 04 15:43:21 2018 -0700 +++ b/mercurial/wireprotoframing.py Thu Oct 04 16:44:21 2018 -0700 @@ -668,6 +668,13 @@ return makeframe(requestid, self.streamid, streamflags, typeid, flags, payload) + def setdecoder(self, name, extraobjs): + """Set the decoder for this stream. + + Receives the stream profile name and any additional CBOR objects + decoded from the stream encoding settings frame payloads. + """ + def ensureserverstream(stream): if stream.streamid % 2: raise error.ProgrammingError('server should only write to even ' @@ -1367,6 +1374,7 @@ self._pendingrequests = collections.deque() self._activerequests = {} self._incomingstreams = {} + self._streamsettingsdecoders = {} def callcommand(self, name, args, datafh=None, redirect=None): """Request that a command be executed. @@ -1484,6 +1492,9 @@ if frame.streamflags & STREAM_FLAG_END_STREAM: del self._incomingstreams[frame.streamid] + if frame.typeid == FRAME_TYPE_STREAM_SETTINGS: + return self._onstreamsettingsframe(frame) + if frame.requestid not in self._activerequests: return 'error', { 'message': (_('received frame for inactive request ID: %d') % @@ -1505,6 +1516,64 @@ return meth(request, frame) + def _onstreamsettingsframe(self, frame): + assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS + + more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION + eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS + + if more and eos: + return 'error', { + 'message': (_('stream encoding settings frame cannot have both ' + 'continuation and end of stream flags set')), + } + + if not more and not eos: + return 'error', { + 'message': _('stream encoding settings frame must have ' + 'continuation or end of stream flag set'), + } + + if frame.streamid not in self._streamsettingsdecoders: + decoder = cborutil.bufferingdecoder() + self._streamsettingsdecoders[frame.streamid] = decoder + + decoder = self._streamsettingsdecoders[frame.streamid] + + try: + decoder.decode(frame.payload) + except Exception as e: + return 'error', { + 'message': (_('error decoding CBOR from stream encoding ' + 'settings frame: %s') % + stringutil.forcebytestr(e)), + } + + if more: + return 'noop', {} + + assert eos + + decoded = decoder.getavailable() + del self._streamsettingsdecoders[frame.streamid] + + if not decoded: + return 'error', { + 'message': _('stream encoding settings frame did not contain ' + 'CBOR data'), + } + + try: + self._incomingstreams[frame.streamid].setdecoder(decoded[0], + decoded[1:]) + except Exception as e: + return 'error', { + 'message': (_('error setting stream decoder: %s') % + stringutil.forcebytestr(e)), + } + + return 'noop', {} + def _oncommandresponseframe(self, request, frame): if frame.flags & FLAG_COMMAND_RESPONSE_EOS: request.state = 'received'