--- a/mercurial/wireprotoframing.py Thu Oct 04 15:43:21 2018 -0700
+++ b/mercurial/wireprotoframing.py Thu Oct 04 16:44:21 2018 -0700
@@ -668,6 +668,13 @@
return makeframe(requestid, self.streamid, streamflags, typeid, flags,
payload)
+ def setdecoder(self, name, extraobjs):
+ """Set the decoder for this stream.
+
+ Receives the stream profile name and any additional CBOR objects
+ decoded from the stream encoding settings frame payloads.
+ """
+
def ensureserverstream(stream):
if stream.streamid % 2:
raise error.ProgrammingError('server should only write to even '
@@ -1367,6 +1374,7 @@
self._pendingrequests = collections.deque()
self._activerequests = {}
self._incomingstreams = {}
+ self._streamsettingsdecoders = {}
def callcommand(self, name, args, datafh=None, redirect=None):
"""Request that a command be executed.
@@ -1484,6 +1492,9 @@
if frame.streamflags & STREAM_FLAG_END_STREAM:
del self._incomingstreams[frame.streamid]
+ if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
+ return self._onstreamsettingsframe(frame)
+
if frame.requestid not in self._activerequests:
return 'error', {
'message': (_('received frame for inactive request ID: %d') %
@@ -1505,6 +1516,64 @@
return meth(request, frame)
+ def _onstreamsettingsframe(self, frame):
+ assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
+
+ more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
+ eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
+
+ if more and eos:
+ return 'error', {
+ 'message': (_('stream encoding settings frame cannot have both '
+ 'continuation and end of stream flags set')),
+ }
+
+ if not more and not eos:
+ return 'error', {
+ 'message': _('stream encoding settings frame must have '
+ 'continuation or end of stream flag set'),
+ }
+
+ if frame.streamid not in self._streamsettingsdecoders:
+ decoder = cborutil.bufferingdecoder()
+ self._streamsettingsdecoders[frame.streamid] = decoder
+
+ decoder = self._streamsettingsdecoders[frame.streamid]
+
+ try:
+ decoder.decode(frame.payload)
+ except Exception as e:
+ return 'error', {
+ 'message': (_('error decoding CBOR from stream encoding '
+ 'settings frame: %s') %
+ stringutil.forcebytestr(e)),
+ }
+
+ if more:
+ return 'noop', {}
+
+ assert eos
+
+ decoded = decoder.getavailable()
+ del self._streamsettingsdecoders[frame.streamid]
+
+ if not decoded:
+ return 'error', {
+ 'message': _('stream encoding settings frame did not contain '
+ 'CBOR data'),
+ }
+
+ try:
+ self._incomingstreams[frame.streamid].setdecoder(decoded[0],
+ decoded[1:])
+ except Exception as e:
+ return 'error', {
+ 'message': (_('error setting stream decoder: %s') %
+ stringutil.forcebytestr(e)),
+ }
+
+ return 'noop', {}
+
def _oncommandresponseframe(self, request, frame):
if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
request.state = 'received'