mercurial/wireprotoframing.py
changeset 43077 687b865b95ad
parent 43076 2372284d9457
child 43089 c59eb1560c44
equal deleted inserted replaced
43076:2372284d9457 43077:687b865b95ad
   125     namemap = {v: k for k, v in mapping.iteritems()}
   125     namemap = {v: k for k, v in mapping.iteritems()}
   126     flags = []
   126     flags = []
   127     val = 1
   127     val = 1
   128     while value >= val:
   128     while value >= val:
   129         if value & val:
   129         if value & val:
   130             flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
   130             flags.append(namemap.get(val, b'<unknown 0x%02x>' % val))
   131         val <<= 1
   131         val <<= 1
   132 
   132 
   133     return b'|'.join(flags)
   133     return b'|'.join(flags)
   134 
   134 
   135 
   135 
   156     flags = attr.ib()
   156     flags = attr.ib()
   157     payload = attr.ib()
   157     payload = attr.ib()
   158 
   158 
   159     @encoding.strmethod
   159     @encoding.strmethod
   160     def __repr__(self):
   160     def __repr__(self):
   161         typename = '<unknown 0x%02x>' % self.typeid
   161         typename = b'<unknown 0x%02x>' % self.typeid
   162         for name, value in FRAME_TYPES.iteritems():
   162         for name, value in FRAME_TYPES.iteritems():
   163             if value == self.typeid:
   163             if value == self.typeid:
   164                 typename = name
   164                 typename = name
   165                 break
   165                 break
   166 
   166 
   167         return (
   167         return (
   168             'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
   168             b'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
   169             'type=%s; flags=%s)'
   169             b'type=%s; flags=%s)'
   170             % (
   170             % (
   171                 len(self.payload),
   171                 len(self.payload),
   172                 self.requestid,
   172                 self.requestid,
   173                 self.streamid,
   173                 self.streamid,
   174                 humanflags(STREAM_FLAGS, self.streamflags),
   174                 humanflags(STREAM_FLAGS, self.streamflags),
   304     if readcount == 0:
   304     if readcount == 0:
   305         return None
   305         return None
   306 
   306 
   307     if readcount != FRAME_HEADER_SIZE:
   307     if readcount != FRAME_HEADER_SIZE:
   308         raise error.Abort(
   308         raise error.Abort(
   309             _('received incomplete frame: got %d bytes: %s')
   309             _(b'received incomplete frame: got %d bytes: %s')
   310             % (readcount, header)
   310             % (readcount, header)
   311         )
   311         )
   312 
   312 
   313     h = parseheader(header)
   313     h = parseheader(header)
   314 
   314 
   315     payload = fh.read(h.length)
   315     payload = fh.read(h.length)
   316     if len(payload) != h.length:
   316     if len(payload) != h.length:
   317         raise error.Abort(
   317         raise error.Abort(
   318             _('frame length error: expected %d; got %d')
   318             _(b'frame length error: expected %d; got %d')
   319             % (h.length, len(payload))
   319             % (h.length, len(payload))
   320         )
   320         )
   321 
   321 
   322     return frame(
   322     return frame(
   323         h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload
   323         h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload
   536 
   536 
   537     for (formatting, args, labels) in atoms:
   537     for (formatting, args, labels) in atoms:
   538         # TODO look for localstr, other types here?
   538         # TODO look for localstr, other types here?
   539 
   539 
   540         if not isinstance(formatting, bytes):
   540         if not isinstance(formatting, bytes):
   541             raise ValueError('must use bytes formatting strings')
   541             raise ValueError(b'must use bytes formatting strings')
   542         for arg in args:
   542         for arg in args:
   543             if not isinstance(arg, bytes):
   543             if not isinstance(arg, bytes):
   544                 raise ValueError('must use bytes for arguments')
   544                 raise ValueError(b'must use bytes for arguments')
   545         for label in labels:
   545         for label in labels:
   546             if not isinstance(label, bytes):
   546             if not isinstance(label, bytes):
   547                 raise ValueError('must use bytes for labels')
   547                 raise ValueError(b'must use bytes for labels')
   548 
   548 
   549         # Formatting string must be ASCII.
   549         # Formatting string must be ASCII.
   550         formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
   550         formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
   551 
   551 
   552         # Arguments must be UTF-8.
   552         # Arguments must be UTF-8.
   566         atomdicts.append(atom)
   566         atomdicts.append(atom)
   567 
   567 
   568     payload = b''.join(cborutil.streamencode(atomdicts))
   568     payload = b''.join(cborutil.streamencode(atomdicts))
   569 
   569 
   570     if len(payload) > maxframesize:
   570     if len(payload) > maxframesize:
   571         raise ValueError('cannot encode data in a single frame')
   571         raise ValueError(b'cannot encode data in a single frame')
   572 
   572 
   573     yield stream.makeframe(
   573     yield stream.makeframe(
   574         requestid=requestid,
   574         requestid=requestid,
   575         typeid=FRAME_TYPE_TEXT_OUTPUT,
   575         typeid=FRAME_TYPE_TEXT_OUTPUT,
   576         flags=0,
   576         flags=0,
   708     """Decoder for the "identity" stream encoding profile."""
   708     """Decoder for the "identity" stream encoding profile."""
   709 
   709 
   710     def __init__(self, ui, extraobjs):
   710     def __init__(self, ui, extraobjs):
   711         if extraobjs:
   711         if extraobjs:
   712             raise error.Abort(
   712             raise error.Abort(
   713                 _('identity decoder received unexpected ' 'additional values')
   713                 _(b'identity decoder received unexpected ' b'additional values')
   714             )
   714             )
   715 
   715 
   716     def decode(self, data):
   716     def decode(self, data):
   717         return data
   717         return data
   718 
   718 
   742     def __init__(self, ui, extraobjs):
   742     def __init__(self, ui, extraobjs):
   743         import zlib
   743         import zlib
   744 
   744 
   745         if extraobjs:
   745         if extraobjs:
   746             raise error.Abort(
   746             raise error.Abort(
   747                 _('zlib decoder received unexpected ' 'additional values')
   747                 _(b'zlib decoder received unexpected ' b'additional values')
   748             )
   748             )
   749 
   749 
   750         self._decompressor = zlib.decompressobj()
   750         self._decompressor = zlib.decompressobj()
   751 
   751 
   752     def decode(self, data):
   752     def decode(self, data):
   799 
   799 
   800 class zstd8mbdecoder(zstdbasedecoder):
   800 class zstd8mbdecoder(zstdbasedecoder):
   801     def __init__(self, ui, extraobjs):
   801     def __init__(self, ui, extraobjs):
   802         if extraobjs:
   802         if extraobjs:
   803             raise error.Abort(
   803             raise error.Abort(
   804                 _('zstd8mb decoder received unexpected ' 'additional values')
   804                 _(b'zstd8mb decoder received unexpected ' b'additional values')
   805             )
   805             )
   806 
   806 
   807         super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
   807         super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
   808 
   808 
   809 
   809 
   870 
   870 
   871         Receives the stream profile name and any additional CBOR objects
   871         Receives the stream profile name and any additional CBOR objects
   872         decoded from the stream encoding settings frame payloads.
   872         decoded from the stream encoding settings frame payloads.
   873         """
   873         """
   874         if name not in STREAM_ENCODERS:
   874         if name not in STREAM_ENCODERS:
   875             raise error.Abort(_('unknown stream decoder: %s') % name)
   875             raise error.Abort(_(b'unknown stream decoder: %s') % name)
   876 
   876 
   877         self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
   877         self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
   878 
   878 
   879     def decode(self, data):
   879     def decode(self, data):
   880         # Default is identity decoder. We don't bother instantiating one
   880         # Default is identity decoder. We don't bother instantiating one
   904         """Set the encoder for this stream.
   904         """Set the encoder for this stream.
   905 
   905 
   906         Receives the stream profile name.
   906         Receives the stream profile name.
   907         """
   907         """
   908         if name not in STREAM_ENCODERS:
   908         if name not in STREAM_ENCODERS:
   909             raise error.Abort(_('unknown stream encoder: %s') % name)
   909             raise error.Abort(_(b'unknown stream encoder: %s') % name)
   910 
   910 
   911         self._encoder = STREAM_ENCODERS[name][0](ui)
   911         self._encoder = STREAM_ENCODERS[name][0](ui)
   912         self._encodername = name
   912         self._encodername = name
   913 
   913 
   914     def encode(self, data):
   914     def encode(self, data):
   977 
   977 
   978 
   978 
   979 def ensureserverstream(stream):
   979 def ensureserverstream(stream):
   980     if stream.streamid % 2:
   980     if stream.streamid % 2:
   981         raise error.ProgrammingError(
   981         raise error.ProgrammingError(
   982             'server should only write to even '
   982             b'server should only write to even '
   983             'numbered streams; %d is not even' % stream.streamid
   983             b'numbered streams; %d is not even' % stream.streamid
   984         )
   984         )
   985 
   985 
   986 
   986 
   987 DEFAULT_PROTOCOL_SETTINGS = {
   987 DEFAULT_PROTOCOL_SETTINGS = {
   988     'contentencodings': [b'identity'],
   988     b'contentencodings': [b'identity'],
   989 }
   989 }
   990 
   990 
   991 
   991 
   992 class serverreactor(object):
   992 class serverreactor(object):
   993     """Holds state of a server handling frame-based protocol requests.
   993     """Holds state of a server handling frame-based protocol requests.
  1064         send those frames. This is useful for half-duplex transports where the
  1064         send those frames. This is useful for half-duplex transports where the
  1065         sender cannot receive until all data has been transmitted.
  1065         sender cannot receive until all data has been transmitted.
  1066         """
  1066         """
  1067         self._ui = ui
  1067         self._ui = ui
  1068         self._deferoutput = deferoutput
  1068         self._deferoutput = deferoutput
  1069         self._state = 'initial'
  1069         self._state = b'initial'
  1070         self._nextoutgoingstreamid = 2
  1070         self._nextoutgoingstreamid = 2
  1071         self._bufferedframegens = []
  1071         self._bufferedframegens = []
  1072         # stream id -> stream instance for all active streams from the client.
  1072         # stream id -> stream instance for all active streams from the client.
  1073         self._incomingstreams = {}
  1073         self._incomingstreams = {}
  1074         self._outgoingstreams = {}
  1074         self._outgoingstreams = {}
  1091 
  1091 
  1092         Returns a dict with an ``action`` key that details what action,
  1092         Returns a dict with an ``action`` key that details what action,
  1093         if any, the consumer should take next.
  1093         if any, the consumer should take next.
  1094         """
  1094         """
  1095         if not frame.streamid % 2:
  1095         if not frame.streamid % 2:
  1096             self._state = 'errored'
  1096             self._state = b'errored'
  1097             return self._makeerrorresult(
  1097             return self._makeerrorresult(
  1098                 _('received frame with even numbered stream ID: %d')
  1098                 _(b'received frame with even numbered stream ID: %d')
  1099                 % frame.streamid
  1099                 % frame.streamid
  1100             )
  1100             )
  1101 
  1101 
  1102         if frame.streamid not in self._incomingstreams:
  1102         if frame.streamid not in self._incomingstreams:
  1103             if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
  1103             if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
  1104                 self._state = 'errored'
  1104                 self._state = b'errored'
  1105                 return self._makeerrorresult(
  1105                 return self._makeerrorresult(
  1106                     _(
  1106                     _(
  1107                         'received frame on unknown inactive stream without '
  1107                         b'received frame on unknown inactive stream without '
  1108                         'beginning of stream flag set'
  1108                         b'beginning of stream flag set'
  1109                     )
  1109                     )
  1110                 )
  1110                 )
  1111 
  1111 
  1112             self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
  1112             self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
  1113 
  1113 
  1114         if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
  1114         if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
  1115             # TODO handle decoding frames
  1115             # TODO handle decoding frames
  1116             self._state = 'errored'
  1116             self._state = b'errored'
  1117             raise error.ProgrammingError(
  1117             raise error.ProgrammingError(
  1118                 'support for decoding stream payloads ' 'not yet implemented'
  1118                 b'support for decoding stream payloads ' b'not yet implemented'
  1119             )
  1119             )
  1120 
  1120 
  1121         if frame.streamflags & STREAM_FLAG_END_STREAM:
  1121         if frame.streamflags & STREAM_FLAG_END_STREAM:
  1122             del self._incomingstreams[frame.streamid]
  1122             del self._incomingstreams[frame.streamid]
  1123 
  1123 
  1124         handlers = {
  1124         handlers = {
  1125             'initial': self._onframeinitial,
  1125             b'initial': self._onframeinitial,
  1126             'protocol-settings-receiving': self._onframeprotocolsettings,
  1126             b'protocol-settings-receiving': self._onframeprotocolsettings,
  1127             'idle': self._onframeidle,
  1127             b'idle': self._onframeidle,
  1128             'command-receiving': self._onframecommandreceiving,
  1128             b'command-receiving': self._onframecommandreceiving,
  1129             'errored': self._onframeerrored,
  1129             b'errored': self._onframeerrored,
  1130         }
  1130         }
  1131 
  1131 
  1132         meth = handlers.get(self._state)
  1132         meth = handlers.get(self._state)
  1133         if not meth:
  1133         if not meth:
  1134             raise error.ProgrammingError('unhandled state: %s' % self._state)
  1134             raise error.ProgrammingError(b'unhandled state: %s' % self._state)
  1135 
  1135 
  1136         return meth(frame)
  1136         return meth(frame)
  1137 
  1137 
  1138     def oncommandresponsereadyobjects(self, stream, requestid, objs):
  1138     def oncommandresponsereadyobjects(self, stream, requestid, objs):
  1139         """Signal that objects are ready to be sent to the client.
  1139         """Signal that objects are ready to be sent to the client.
  1188 
  1188 
  1189                 except Exception as e:
  1189                 except Exception as e:
  1190                     for frame in createerrorframe(
  1190                     for frame in createerrorframe(
  1191                         stream,
  1191                         stream,
  1192                         requestid,
  1192                         requestid,
  1193                         '%s' % stringutil.forcebytestr(e),
  1193                         b'%s' % stringutil.forcebytestr(e),
  1194                         errtype='server',
  1194                         errtype=b'server',
  1195                     ):
  1195                     ):
  1196 
  1196 
  1197                         yield frame
  1197                         yield frame
  1198 
  1198 
  1199                     break
  1199                     break
  1202                     # Alternate location responses can only be the first and
  1202                     # Alternate location responses can only be the first and
  1203                     # only object in the output stream.
  1203                     # only object in the output stream.
  1204                     if isinstance(o, wireprototypes.alternatelocationresponse):
  1204                     if isinstance(o, wireprototypes.alternatelocationresponse):
  1205                         if emitted:
  1205                         if emitted:
  1206                             raise error.ProgrammingError(
  1206                             raise error.ProgrammingError(
  1207                                 'alternatelocationresponse seen after initial '
  1207                                 b'alternatelocationresponse seen after initial '
  1208                                 'output object'
  1208                                 b'output object'
  1209                             )
  1209                             )
  1210 
  1210 
  1211                         frame = stream.makestreamsettingsframe(requestid)
  1211                         frame = stream.makestreamsettingsframe(requestid)
  1212                         if frame:
  1212                         if frame:
  1213                             yield frame
  1213                             yield frame
  1220                         emitted = True
  1220                         emitted = True
  1221                         continue
  1221                         continue
  1222 
  1222 
  1223                     if alternatelocationsent:
  1223                     if alternatelocationsent:
  1224                         raise error.ProgrammingError(
  1224                         raise error.ProgrammingError(
  1225                             'object follows alternatelocationresponse'
  1225                             b'object follows alternatelocationresponse'
  1226                         )
  1226                         )
  1227 
  1227 
  1228                     if not emitted:
  1228                     if not emitted:
  1229                         # Frame is optional.
  1229                         # Frame is optional.
  1230                         frame = stream.makestreamsettingsframe(requestid)
  1230                         frame = stream.makestreamsettingsframe(requestid)
  1264                             for frame in emitter.send(chunk):
  1264                             for frame in emitter.send(chunk):
  1265                                 yield frame
  1265                                 yield frame
  1266 
  1266 
  1267                 except Exception as e:
  1267                 except Exception as e:
  1268                     for frame in createerrorframe(
  1268                     for frame in createerrorframe(
  1269                         stream, requestid, '%s' % e, errtype='server'
  1269                         stream, requestid, b'%s' % e, errtype=b'server'
  1270                     ):
  1270                     ):
  1271                         yield frame
  1271                         yield frame
  1272 
  1272 
  1273                     break
  1273                     break
  1274 
  1274 
  1283         completed.
  1283         completed.
  1284         """
  1284         """
  1285         # TODO should we do anything about in-flight commands?
  1285         # TODO should we do anything about in-flight commands?
  1286 
  1286 
  1287         if not self._deferoutput or not self._bufferedframegens:
  1287         if not self._deferoutput or not self._bufferedframegens:
  1288             return 'noop', {}
  1288             return b'noop', {}
  1289 
  1289 
  1290         # If we buffered all our responses, emit those.
  1290         # If we buffered all our responses, emit those.
  1291         def makegen():
  1291         def makegen():
  1292             for gen in self._bufferedframegens:
  1292             for gen in self._bufferedframegens:
  1293                 for frame in gen:
  1293                 for frame in gen:
  1294                     yield frame
  1294                     yield frame
  1295 
  1295 
  1296         return 'sendframes', {'framegen': makegen(),}
  1296         return b'sendframes', {b'framegen': makegen(),}
  1297 
  1297 
  1298     def _handlesendframes(self, framegen):
  1298     def _handlesendframes(self, framegen):
  1299         if self._deferoutput:
  1299         if self._deferoutput:
  1300             self._bufferedframegens.append(framegen)
  1300             self._bufferedframegens.append(framegen)
  1301             return 'noop', {}
  1301             return b'noop', {}
  1302         else:
  1302         else:
  1303             return 'sendframes', {'framegen': framegen,}
  1303             return b'sendframes', {b'framegen': framegen,}
  1304 
  1304 
  1305     def onservererror(self, stream, requestid, msg):
  1305     def onservererror(self, stream, requestid, msg):
  1306         ensureserverstream(stream)
  1306         ensureserverstream(stream)
  1307 
  1307 
  1308         def sendframes():
  1308         def sendframes():
  1309             for frame in createerrorframe(
  1309             for frame in createerrorframe(
  1310                 stream, requestid, msg, errtype='server'
  1310                 stream, requestid, msg, errtype=b'server'
  1311             ):
  1311             ):
  1312                 yield frame
  1312                 yield frame
  1313 
  1313 
  1314             self._activecommands.remove(requestid)
  1314             self._activecommands.remove(requestid)
  1315 
  1315 
  1343         self._outgoingstreams[streamid] = s
  1343         self._outgoingstreams[streamid] = s
  1344 
  1344 
  1345         # Always use the *server's* preferred encoder over the client's,
  1345         # Always use the *server's* preferred encoder over the client's,
  1346         # as servers have more to lose from sub-optimal encoders being used.
  1346         # as servers have more to lose from sub-optimal encoders being used.
  1347         for name in STREAM_ENCODERS_ORDER:
  1347         for name in STREAM_ENCODERS_ORDER:
  1348             if name in self._sendersettings['contentencodings']:
  1348             if name in self._sendersettings[b'contentencodings']:
  1349                 s.setencoder(self._ui, name)
  1349                 s.setencoder(self._ui, name)
  1350                 break
  1350                 break
  1351 
  1351 
  1352         return s
  1352         return s
  1353 
  1353 
  1354     def _makeerrorresult(self, msg):
  1354     def _makeerrorresult(self, msg):
  1355         return 'error', {'message': msg,}
  1355         return b'error', {b'message': msg,}
  1356 
  1356 
  1357     def _makeruncommandresult(self, requestid):
  1357     def _makeruncommandresult(self, requestid):
  1358         entry = self._receivingcommands[requestid]
  1358         entry = self._receivingcommands[requestid]
  1359 
  1359 
  1360         if not entry['requestdone']:
  1360         if not entry[b'requestdone']:
  1361             self._state = 'errored'
  1361             self._state = b'errored'
  1362             raise error.ProgrammingError(
  1362             raise error.ProgrammingError(
  1363                 'should not be called without ' 'requestdone set'
  1363                 b'should not be called without ' b'requestdone set'
  1364             )
  1364             )
  1365 
  1365 
  1366         del self._receivingcommands[requestid]
  1366         del self._receivingcommands[requestid]
  1367 
  1367 
  1368         if self._receivingcommands:
  1368         if self._receivingcommands:
  1369             self._state = 'command-receiving'
  1369             self._state = b'command-receiving'
  1370         else:
  1370         else:
  1371             self._state = 'idle'
  1371             self._state = b'idle'
  1372 
  1372 
  1373         # Decode the payloads as CBOR.
  1373         # Decode the payloads as CBOR.
  1374         entry['payload'].seek(0)
  1374         entry[b'payload'].seek(0)
  1375         request = cborutil.decodeall(entry['payload'].getvalue())[0]
  1375         request = cborutil.decodeall(entry[b'payload'].getvalue())[0]
  1376 
  1376 
  1377         if b'name' not in request:
  1377         if b'name' not in request:
  1378             self._state = 'errored'
  1378             self._state = b'errored'
  1379             return self._makeerrorresult(
  1379             return self._makeerrorresult(
  1380                 _('command request missing "name" field')
  1380                 _(b'command request missing "name" field')
  1381             )
  1381             )
  1382 
  1382 
  1383         if b'args' not in request:
  1383         if b'args' not in request:
  1384             request[b'args'] = {}
  1384             request[b'args'] = {}
  1385 
  1385 
  1386         assert requestid not in self._activecommands
  1386         assert requestid not in self._activecommands
  1387         self._activecommands.add(requestid)
  1387         self._activecommands.add(requestid)
  1388 
  1388 
  1389         return (
  1389         return (
  1390             'runcommand',
  1390             b'runcommand',
  1391             {
  1391             {
  1392                 'requestid': requestid,
  1392                 b'requestid': requestid,
  1393                 'command': request[b'name'],
  1393                 b'command': request[b'name'],
  1394                 'args': request[b'args'],
  1394                 b'args': request[b'args'],
  1395                 'redirect': request.get(b'redirect'),
  1395                 b'redirect': request.get(b'redirect'),
  1396                 'data': entry['data'].getvalue() if entry['data'] else None,
  1396                 b'data': entry[b'data'].getvalue() if entry[b'data'] else None,
  1397             },
  1397             },
  1398         )
  1398         )
  1399 
  1399 
  1400     def _makewantframeresult(self):
  1400     def _makewantframeresult(self):
  1401         return 'wantframe', {'state': self._state,}
  1401         return b'wantframe', {b'state': self._state,}
  1402 
  1402 
  1403     def _validatecommandrequestframe(self, frame):
  1403     def _validatecommandrequestframe(self, frame):
  1404         new = frame.flags & FLAG_COMMAND_REQUEST_NEW
  1404         new = frame.flags & FLAG_COMMAND_REQUEST_NEW
  1405         continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
  1405         continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
  1406 
  1406 
  1407         if new and continuation:
  1407         if new and continuation:
  1408             self._state = 'errored'
  1408             self._state = b'errored'
  1409             return self._makeerrorresult(
  1409             return self._makeerrorresult(
  1410                 _(
  1410                 _(
  1411                     'received command request frame with both new and '
  1411                     b'received command request frame with both new and '
  1412                     'continuation flags set'
  1412                     b'continuation flags set'
  1413                 )
  1413                 )
  1414             )
  1414             )
  1415 
  1415 
  1416         if not new and not continuation:
  1416         if not new and not continuation:
  1417             self._state = 'errored'
  1417             self._state = b'errored'
  1418             return self._makeerrorresult(
  1418             return self._makeerrorresult(
  1419                 _(
  1419                 _(
  1420                     'received command request frame with neither new nor '
  1420                     b'received command request frame with neither new nor '
  1421                     'continuation flags set'
  1421                     b'continuation flags set'
  1422                 )
  1422                 )
  1423             )
  1423             )
  1424 
  1424 
  1425     def _onframeinitial(self, frame):
  1425     def _onframeinitial(self, frame):
  1426         # Called when we receive a frame when in the "initial" state.
  1426         # Called when we receive a frame when in the "initial" state.
  1427         if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
  1427         if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
  1428             self._state = 'protocol-settings-receiving'
  1428             self._state = b'protocol-settings-receiving'
  1429             self._protocolsettingsdecoder = cborutil.bufferingdecoder()
  1429             self._protocolsettingsdecoder = cborutil.bufferingdecoder()
  1430             return self._onframeprotocolsettings(frame)
  1430             return self._onframeprotocolsettings(frame)
  1431 
  1431 
  1432         elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
  1432         elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
  1433             self._state = 'idle'
  1433             self._state = b'idle'
  1434             return self._onframeidle(frame)
  1434             return self._onframeidle(frame)
  1435 
  1435 
  1436         else:
  1436         else:
  1437             self._state = 'errored'
  1437             self._state = b'errored'
  1438             return self._makeerrorresult(
  1438             return self._makeerrorresult(
  1439                 _(
  1439                 _(
  1440                     'expected sender protocol settings or command request '
  1440                     b'expected sender protocol settings or command request '
  1441                     'frame; got %d'
  1441                     b'frame; got %d'
  1442                 )
  1442                 )
  1443                 % frame.typeid
  1443                 % frame.typeid
  1444             )
  1444             )
  1445 
  1445 
  1446     def _onframeprotocolsettings(self, frame):
  1446     def _onframeprotocolsettings(self, frame):
  1447         assert self._state == 'protocol-settings-receiving'
  1447         assert self._state == b'protocol-settings-receiving'
  1448         assert self._protocolsettingsdecoder is not None
  1448         assert self._protocolsettingsdecoder is not None
  1449 
  1449 
  1450         if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
  1450         if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
  1451             self._state = 'errored'
  1451             self._state = b'errored'
  1452             return self._makeerrorresult(
  1452             return self._makeerrorresult(
  1453                 _('expected sender protocol settings frame; got %d')
  1453                 _(b'expected sender protocol settings frame; got %d')
  1454                 % frame.typeid
  1454                 % frame.typeid
  1455             )
  1455             )
  1456 
  1456 
  1457         more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
  1457         more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
  1458         eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
  1458         eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
  1459 
  1459 
  1460         if more and eos:
  1460         if more and eos:
  1461             self._state = 'errored'
  1461             self._state = b'errored'
  1462             return self._makeerrorresult(
  1462             return self._makeerrorresult(
  1463                 _(
  1463                 _(
  1464                     'sender protocol settings frame cannot have both '
  1464                     b'sender protocol settings frame cannot have both '
  1465                     'continuation and end of stream flags set'
  1465                     b'continuation and end of stream flags set'
  1466                 )
  1466                 )
  1467             )
  1467             )
  1468 
  1468 
  1469         if not more and not eos:
  1469         if not more and not eos:
  1470             self._state = 'errored'
  1470             self._state = b'errored'
  1471             return self._makeerrorresult(
  1471             return self._makeerrorresult(
  1472                 _(
  1472                 _(
  1473                     'sender protocol settings frame must have continuation or '
  1473                     b'sender protocol settings frame must have continuation or '
  1474                     'end of stream flag set'
  1474                     b'end of stream flag set'
  1475                 )
  1475                 )
  1476             )
  1476             )
  1477 
  1477 
  1478         # TODO establish limits for maximum amount of data that can be
  1478         # TODO establish limits for maximum amount of data that can be
  1479         # buffered.
  1479         # buffered.
  1480         try:
  1480         try:
  1481             self._protocolsettingsdecoder.decode(frame.payload)
  1481             self._protocolsettingsdecoder.decode(frame.payload)
  1482         except Exception as e:
  1482         except Exception as e:
  1483             self._state = 'errored'
  1483             self._state = b'errored'
  1484             return self._makeerrorresult(
  1484             return self._makeerrorresult(
  1485                 _('error decoding CBOR from sender protocol settings frame: %s')
  1485                 _(
       
  1486                     b'error decoding CBOR from sender protocol settings frame: %s'
       
  1487                 )
  1486                 % stringutil.forcebytestr(e)
  1488                 % stringutil.forcebytestr(e)
  1487             )
  1489             )
  1488 
  1490 
  1489         if more:
  1491         if more:
  1490             return self._makewantframeresult()
  1492             return self._makewantframeresult()
  1493 
  1495 
  1494         decoded = self._protocolsettingsdecoder.getavailable()
  1496         decoded = self._protocolsettingsdecoder.getavailable()
  1495         self._protocolsettingsdecoder = None
  1497         self._protocolsettingsdecoder = None
  1496 
  1498 
  1497         if not decoded:
  1499         if not decoded:
  1498             self._state = 'errored'
  1500             self._state = b'errored'
  1499             return self._makeerrorresult(
  1501             return self._makeerrorresult(
  1500                 _('sender protocol settings frame did not contain CBOR data')
  1502                 _(b'sender protocol settings frame did not contain CBOR data')
  1501             )
  1503             )
  1502         elif len(decoded) > 1:
  1504         elif len(decoded) > 1:
  1503             self._state = 'errored'
  1505             self._state = b'errored'
  1504             return self._makeerrorresult(
  1506             return self._makeerrorresult(
  1505                 _(
  1507                 _(
  1506                     'sender protocol settings frame contained multiple CBOR '
  1508                     b'sender protocol settings frame contained multiple CBOR '
  1507                     'values'
  1509                     b'values'
  1508                 )
  1510                 )
  1509             )
  1511             )
  1510 
  1512 
  1511         d = decoded[0]
  1513         d = decoded[0]
  1512 
  1514 
  1513         if b'contentencodings' in d:
  1515         if b'contentencodings' in d:
  1514             self._sendersettings['contentencodings'] = d[b'contentencodings']
  1516             self._sendersettings[b'contentencodings'] = d[b'contentencodings']
  1515 
  1517 
  1516         self._state = 'idle'
  1518         self._state = b'idle'
  1517 
  1519 
  1518         return self._makewantframeresult()
  1520         return self._makewantframeresult()
  1519 
  1521 
  1520     def _onframeidle(self, frame):
  1522     def _onframeidle(self, frame):
  1521         # The only frame type that should be received in this state is a
  1523         # The only frame type that should be received in this state is a
  1522         # command request.
  1524         # command request.
  1523         if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
  1525         if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
  1524             self._state = 'errored'
  1526             self._state = b'errored'
  1525             return self._makeerrorresult(
  1527             return self._makeerrorresult(
  1526                 _('expected command request frame; got %d') % frame.typeid
  1528                 _(b'expected command request frame; got %d') % frame.typeid
  1527             )
  1529             )
  1528 
  1530 
  1529         res = self._validatecommandrequestframe(frame)
  1531         res = self._validatecommandrequestframe(frame)
  1530         if res:
  1532         if res:
  1531             return res
  1533             return res
  1532 
  1534 
  1533         if frame.requestid in self._receivingcommands:
  1535         if frame.requestid in self._receivingcommands:
  1534             self._state = 'errored'
  1536             self._state = b'errored'
  1535             return self._makeerrorresult(
  1537             return self._makeerrorresult(
  1536                 _('request with ID %d already received') % frame.requestid
  1538                 _(b'request with ID %d already received') % frame.requestid
  1537             )
  1539             )
  1538 
  1540 
  1539         if frame.requestid in self._activecommands:
  1541         if frame.requestid in self._activecommands:
  1540             self._state = 'errored'
  1542             self._state = b'errored'
  1541             return self._makeerrorresult(
  1543             return self._makeerrorresult(
  1542                 _('request with ID %d is already active') % frame.requestid
  1544                 _(b'request with ID %d is already active') % frame.requestid
  1543             )
  1545             )
  1544 
  1546 
  1545         new = frame.flags & FLAG_COMMAND_REQUEST_NEW
  1547         new = frame.flags & FLAG_COMMAND_REQUEST_NEW
  1546         moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
  1548         moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
  1547         expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
  1549         expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
  1548 
  1550 
  1549         if not new:
  1551         if not new:
  1550             self._state = 'errored'
  1552             self._state = b'errored'
  1551             return self._makeerrorresult(
  1553             return self._makeerrorresult(
  1552                 _('received command request frame without new flag set')
  1554                 _(b'received command request frame without new flag set')
  1553             )
  1555             )
  1554 
  1556 
  1555         payload = util.bytesio()
  1557         payload = util.bytesio()
  1556         payload.write(frame.payload)
  1558         payload.write(frame.payload)
  1557 
  1559 
  1558         self._receivingcommands[frame.requestid] = {
  1560         self._receivingcommands[frame.requestid] = {
  1559             'payload': payload,
  1561             b'payload': payload,
  1560             'data': None,
  1562             b'data': None,
  1561             'requestdone': not moreframes,
  1563             b'requestdone': not moreframes,
  1562             'expectingdata': bool(expectingdata),
  1564             b'expectingdata': bool(expectingdata),
  1563         }
  1565         }
  1564 
  1566 
  1565         # This is the final frame for this request. Dispatch it.
  1567         # This is the final frame for this request. Dispatch it.
  1566         if not moreframes and not expectingdata:
  1568         if not moreframes and not expectingdata:
  1567             return self._makeruncommandresult(frame.requestid)
  1569             return self._makeruncommandresult(frame.requestid)
  1568 
  1570 
  1569         assert moreframes or expectingdata
  1571         assert moreframes or expectingdata
  1570         self._state = 'command-receiving'
  1572         self._state = b'command-receiving'
  1571         return self._makewantframeresult()
  1573         return self._makewantframeresult()
  1572 
  1574 
  1573     def _onframecommandreceiving(self, frame):
  1575     def _onframecommandreceiving(self, frame):
  1574         if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
  1576         if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
  1575             # Process new command requests as such.
  1577             # Process new command requests as such.
  1581                 return res
  1583                 return res
  1582 
  1584 
  1583         # All other frames should be related to a command that is currently
  1585         # All other frames should be related to a command that is currently
  1584         # receiving but is not active.
  1586         # receiving but is not active.
  1585         if frame.requestid in self._activecommands:
  1587         if frame.requestid in self._activecommands:
  1586             self._state = 'errored'
  1588             self._state = b'errored'
  1587             return self._makeerrorresult(
  1589             return self._makeerrorresult(
  1588                 _('received frame for request that is still active: %d')
  1590                 _(b'received frame for request that is still active: %d')
  1589                 % frame.requestid
  1591                 % frame.requestid
  1590             )
  1592             )
  1591 
  1593 
  1592         if frame.requestid not in self._receivingcommands:
  1594         if frame.requestid not in self._receivingcommands:
  1593             self._state = 'errored'
  1595             self._state = b'errored'
  1594             return self._makeerrorresult(
  1596             return self._makeerrorresult(
  1595                 _('received frame for request that is not receiving: %d')
  1597                 _(b'received frame for request that is not receiving: %d')
  1596                 % frame.requestid
  1598                 % frame.requestid
  1597             )
  1599             )
  1598 
  1600 
  1599         entry = self._receivingcommands[frame.requestid]
  1601         entry = self._receivingcommands[frame.requestid]
  1600 
  1602 
  1601         if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
  1603         if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
  1602             moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
  1604             moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
  1603             expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
  1605             expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
  1604 
  1606 
  1605             if entry['requestdone']:
  1607             if entry[b'requestdone']:
  1606                 self._state = 'errored'
  1608                 self._state = b'errored'
  1607                 return self._makeerrorresult(
  1609                 return self._makeerrorresult(
  1608                     _(
  1610                     _(
  1609                         'received command request frame when request frames '
  1611                         b'received command request frame when request frames '
  1610                         'were supposedly done'
  1612                         b'were supposedly done'
  1611                     )
  1613                     )
  1612                 )
  1614                 )
  1613 
  1615 
  1614             if expectingdata != entry['expectingdata']:
  1616             if expectingdata != entry[b'expectingdata']:
  1615                 self._state = 'errored'
  1617                 self._state = b'errored'
  1616                 return self._makeerrorresult(
  1618                 return self._makeerrorresult(
  1617                     _('mismatch between expect data flag and previous frame')
  1619                     _(b'mismatch between expect data flag and previous frame')
  1618                 )
  1620                 )
  1619 
  1621 
  1620             entry['payload'].write(frame.payload)
  1622             entry[b'payload'].write(frame.payload)
  1621 
  1623 
  1622             if not moreframes:
  1624             if not moreframes:
  1623                 entry['requestdone'] = True
  1625                 entry[b'requestdone'] = True
  1624 
  1626 
  1625             if not moreframes and not expectingdata:
  1627             if not moreframes and not expectingdata:
  1626                 return self._makeruncommandresult(frame.requestid)
  1628                 return self._makeruncommandresult(frame.requestid)
  1627 
  1629 
  1628             return self._makewantframeresult()
  1630             return self._makewantframeresult()
  1629 
  1631 
  1630         elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
  1632         elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
  1631             if not entry['expectingdata']:
  1633             if not entry[b'expectingdata']:
  1632                 self._state = 'errored'
  1634                 self._state = b'errored'
  1633                 return self._makeerrorresult(
  1635                 return self._makeerrorresult(
  1634                     _(
  1636                     _(
  1635                         'received command data frame for request that is not '
  1637                         b'received command data frame for request that is not '
  1636                         'expecting data: %d'
  1638                         b'expecting data: %d'
  1637                     )
  1639                     )
  1638                     % frame.requestid
  1640                     % frame.requestid
  1639                 )
  1641                 )
  1640 
  1642 
  1641             if entry['data'] is None:
  1643             if entry[b'data'] is None:
  1642                 entry['data'] = util.bytesio()
  1644                 entry[b'data'] = util.bytesio()
  1643 
  1645 
  1644             return self._handlecommanddataframe(frame, entry)
  1646             return self._handlecommanddataframe(frame, entry)
  1645         else:
  1647         else:
  1646             self._state = 'errored'
  1648             self._state = b'errored'
  1647             return self._makeerrorresult(
  1649             return self._makeerrorresult(
  1648                 _('received unexpected frame type: %d') % frame.typeid
  1650                 _(b'received unexpected frame type: %d') % frame.typeid
  1649             )
  1651             )
  1650 
  1652 
  1651     def _handlecommanddataframe(self, frame, entry):
  1653     def _handlecommanddataframe(self, frame, entry):
  1652         assert frame.typeid == FRAME_TYPE_COMMAND_DATA
  1654         assert frame.typeid == FRAME_TYPE_COMMAND_DATA
  1653 
  1655 
  1654         # TODO support streaming data instead of buffering it.
  1656         # TODO support streaming data instead of buffering it.
  1655         entry['data'].write(frame.payload)
  1657         entry[b'data'].write(frame.payload)
  1656 
  1658 
  1657         if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
  1659         if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
  1658             return self._makewantframeresult()
  1660             return self._makewantframeresult()
  1659         elif frame.flags & FLAG_COMMAND_DATA_EOS:
  1661         elif frame.flags & FLAG_COMMAND_DATA_EOS:
  1660             entry['data'].seek(0)
  1662             entry[b'data'].seek(0)
  1661             return self._makeruncommandresult(frame.requestid)
  1663             return self._makeruncommandresult(frame.requestid)
  1662         else:
  1664         else:
  1663             self._state = 'errored'
  1665             self._state = b'errored'
  1664             return self._makeerrorresult(
  1666             return self._makeerrorresult(
  1665                 _('command data frame without ' 'flags')
  1667                 _(b'command data frame without ' b'flags')
  1666             )
  1668             )
  1667 
  1669 
  1668     def _onframeerrored(self, frame):
  1670     def _onframeerrored(self, frame):
  1669         return self._makeerrorresult(_('server already errored'))
  1671         return self._makeerrorresult(_(b'server already errored'))
  1670 
  1672 
  1671 
  1673 
  1672 class commandrequest(object):
  1674 class commandrequest(object):
  1673     """Represents a request to run a command."""
  1675     """Represents a request to run a command."""
  1674 
  1676 
  1676         self.requestid = requestid
  1678         self.requestid = requestid
  1677         self.name = name
  1679         self.name = name
  1678         self.args = args
  1680         self.args = args
  1679         self.datafh = datafh
  1681         self.datafh = datafh
  1680         self.redirect = redirect
  1682         self.redirect = redirect
  1681         self.state = 'pending'
  1683         self.state = b'pending'
  1682 
  1684 
  1683 
  1685 
  1684 class clientreactor(object):
  1686 class clientreactor(object):
  1685     """Holds state of a client issuing frame-based protocol requests.
  1687     """Holds state of a client issuing frame-based protocol requests.
  1686 
  1688 
  1776         and an optional file object containing the raw data for the command.
  1778         and an optional file object containing the raw data for the command.
  1777 
  1779 
  1778         Returns a 3-tuple of (request, action, action data).
  1780         Returns a 3-tuple of (request, action, action data).
  1779         """
  1781         """
  1780         if not self._canissuecommands:
  1782         if not self._canissuecommands:
  1781             raise error.ProgrammingError('cannot issue new commands')
  1783             raise error.ProgrammingError(b'cannot issue new commands')
  1782 
  1784 
  1783         requestid = self._nextrequestid
  1785         requestid = self._nextrequestid
  1784         self._nextrequestid += 2
  1786         self._nextrequestid += 2
  1785 
  1787 
  1786         request = commandrequest(
  1788         request = commandrequest(
  1787             requestid, name, args, datafh=datafh, redirect=redirect
  1789             requestid, name, args, datafh=datafh, redirect=redirect
  1788         )
  1790         )
  1789 
  1791 
  1790         if self._buffersends:
  1792         if self._buffersends:
  1791             self._pendingrequests.append(request)
  1793             self._pendingrequests.append(request)
  1792             return request, 'noop', {}
  1794             return request, b'noop', {}
  1793         else:
  1795         else:
  1794             if not self._cansend:
  1796             if not self._cansend:
  1795                 raise error.ProgrammingError(
  1797                 raise error.ProgrammingError(
  1796                     'sends cannot be performed on ' 'this instance'
  1798                     b'sends cannot be performed on ' b'this instance'
  1797                 )
  1799                 )
  1798 
  1800 
  1799             if not self._hasmultiplesend:
  1801             if not self._hasmultiplesend:
  1800                 self._cansend = False
  1802                 self._cansend = False
  1801                 self._canissuecommands = False
  1803                 self._canissuecommands = False
  1802 
  1804 
  1803             return (
  1805             return (
  1804                 request,
  1806                 request,
  1805                 'sendframes',
  1807                 b'sendframes',
  1806                 {'framegen': self._makecommandframes(request),},
  1808                 {b'framegen': self._makecommandframes(request),},
  1807             )
  1809             )
  1808 
  1810 
  1809     def flushcommands(self):
  1811     def flushcommands(self):
  1810         """Request that all queued commands be sent.
  1812         """Request that all queued commands be sent.
  1811 
  1813 
  1815 
  1817 
  1816         If instances aren't configured for multiple sends, no new command
  1818         If instances aren't configured for multiple sends, no new command
  1817         requests are allowed after this is called.
  1819         requests are allowed after this is called.
  1818         """
  1820         """
  1819         if not self._pendingrequests:
  1821         if not self._pendingrequests:
  1820             return 'noop', {}
  1822             return b'noop', {}
  1821 
  1823 
  1822         if not self._cansend:
  1824         if not self._cansend:
  1823             raise error.ProgrammingError(
  1825             raise error.ProgrammingError(
  1824                 'sends cannot be performed on this ' 'instance'
  1826                 b'sends cannot be performed on this ' b'instance'
  1825             )
  1827             )
  1826 
  1828 
  1827         # If the instance only allows sending once, mark that we have fired
  1829         # If the instance only allows sending once, mark that we have fired
  1828         # our one shot.
  1830         # our one shot.
  1829         if not self._hasmultiplesend:
  1831         if not self._hasmultiplesend:
  1834             while self._pendingrequests:
  1836             while self._pendingrequests:
  1835                 request = self._pendingrequests.popleft()
  1837                 request = self._pendingrequests.popleft()
  1836                 for frame in self._makecommandframes(request):
  1838                 for frame in self._makecommandframes(request):
  1837                     yield frame
  1839                     yield frame
  1838 
  1840 
  1839         return 'sendframes', {'framegen': makeframes(),}
  1841         return b'sendframes', {b'framegen': makeframes(),}
  1840 
  1842 
  1841     def _makecommandframes(self, request):
  1843     def _makecommandframes(self, request):
  1842         """Emit frames to issue a command request.
  1844         """Emit frames to issue a command request.
  1843 
  1845 
  1844         As a side-effect, update request accounting to reflect its changed
  1846         As a side-effect, update request accounting to reflect its changed
  1845         state.
  1847         state.
  1846         """
  1848         """
  1847         self._activerequests[request.requestid] = request
  1849         self._activerequests[request.requestid] = request
  1848         request.state = 'sending'
  1850         request.state = b'sending'
  1849 
  1851 
  1850         if not self._protocolsettingssent and self._clientcontentencoders:
  1852         if not self._protocolsettingssent and self._clientcontentencoders:
  1851             self._protocolsettingssent = True
  1853             self._protocolsettingssent = True
  1852 
  1854 
  1853             payload = b''.join(
  1855             payload = b''.join(
  1873         )
  1875         )
  1874 
  1876 
  1875         for frame in res:
  1877         for frame in res:
  1876             yield frame
  1878             yield frame
  1877 
  1879 
  1878         request.state = 'sent'
  1880         request.state = b'sent'
  1879 
  1881 
  1880     def onframerecv(self, frame):
  1882     def onframerecv(self, frame):
  1881         """Process a frame that has been received off the wire.
  1883         """Process a frame that has been received off the wire.
  1882 
  1884 
  1883         Returns a 2-tuple of (action, meta) describing further action the
  1885         Returns a 2-tuple of (action, meta) describing further action the
  1884         caller needs to take as a result of receiving this frame.
  1886         caller needs to take as a result of receiving this frame.
  1885         """
  1887         """
  1886         if frame.streamid % 2:
  1888         if frame.streamid % 2:
  1887             return (
  1889             return (
  1888                 'error',
  1890                 b'error',
  1889                 {
  1891                 {
  1890                     'message': (
  1892                     b'message': (
  1891                         _('received frame with odd numbered stream ID: %d')
  1893                         _(b'received frame with odd numbered stream ID: %d')
  1892                         % frame.streamid
  1894                         % frame.streamid
  1893                     ),
  1895                     ),
  1894                 },
  1896                 },
  1895             )
  1897             )
  1896 
  1898 
  1897         if frame.streamid not in self._incomingstreams:
  1899         if frame.streamid not in self._incomingstreams:
  1898             if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
  1900             if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
  1899                 return (
  1901                 return (
  1900                     'error',
  1902                     b'error',
  1901                     {
  1903                     {
  1902                         'message': _(
  1904                         b'message': _(
  1903                             'received frame on unknown stream '
  1905                             b'received frame on unknown stream '
  1904                             'without beginning of stream flag set'
  1906                             b'without beginning of stream flag set'
  1905                         ),
  1907                         ),
  1906                     },
  1908                     },
  1907                 )
  1909                 )
  1908 
  1910 
  1909             self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
  1911             self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
  1922         if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
  1924         if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
  1923             return self._onstreamsettingsframe(frame)
  1925             return self._onstreamsettingsframe(frame)
  1924 
  1926 
  1925         if frame.requestid not in self._activerequests:
  1927         if frame.requestid not in self._activerequests:
  1926             return (
  1928             return (
  1927                 'error',
  1929                 b'error',
  1928                 {
  1930                 {
  1929                     'message': (
  1931                     b'message': (
  1930                         _('received frame for inactive request ID: %d')
  1932                         _(b'received frame for inactive request ID: %d')
  1931                         % frame.requestid
  1933                         % frame.requestid
  1932                     ),
  1934                     ),
  1933                 },
  1935                 },
  1934             )
  1936             )
  1935 
  1937 
  1936         request = self._activerequests[frame.requestid]
  1938         request = self._activerequests[frame.requestid]
  1937         request.state = 'receiving'
  1939         request.state = b'receiving'
  1938 
  1940 
  1939         handlers = {
  1941         handlers = {
  1940             FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
  1942             FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
  1941             FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
  1943             FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
  1942         }
  1944         }
  1943 
  1945 
  1944         meth = handlers.get(frame.typeid)
  1946         meth = handlers.get(frame.typeid)
  1945         if not meth:
  1947         if not meth:
  1946             raise error.ProgrammingError(
  1948             raise error.ProgrammingError(
  1947                 'unhandled frame type: %d' % frame.typeid
  1949                 b'unhandled frame type: %d' % frame.typeid
  1948             )
  1950             )
  1949 
  1951 
  1950         return meth(request, frame)
  1952         return meth(request, frame)
  1951 
  1953 
  1952     def _onstreamsettingsframe(self, frame):
  1954     def _onstreamsettingsframe(self, frame):
  1955         more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
  1957         more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
  1956         eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
  1958         eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
  1957 
  1959 
  1958         if more and eos:
  1960         if more and eos:
  1959             return (
  1961             return (
  1960                 'error',
  1962                 b'error',
  1961                 {
  1963                 {
  1962                     'message': (
  1964                     b'message': (
  1963                         _(
  1965                         _(
  1964                             'stream encoding settings frame cannot have both '
  1966                             b'stream encoding settings frame cannot have both '
  1965                             'continuation and end of stream flags set'
  1967                             b'continuation and end of stream flags set'
  1966                         )
  1968                         )
  1967                     ),
  1969                     ),
  1968                 },
  1970                 },
  1969             )
  1971             )
  1970 
  1972 
  1971         if not more and not eos:
  1973         if not more and not eos:
  1972             return (
  1974             return (
  1973                 'error',
  1975                 b'error',
  1974                 {
  1976                 {
  1975                     'message': _(
  1977                     b'message': _(
  1976                         'stream encoding settings frame must have '
  1978                         b'stream encoding settings frame must have '
  1977                         'continuation or end of stream flag set'
  1979                         b'continuation or end of stream flag set'
  1978                     ),
  1980                     ),
  1979                 },
  1981                 },
  1980             )
  1982             )
  1981 
  1983 
  1982         if frame.streamid not in self._streamsettingsdecoders:
  1984         if frame.streamid not in self._streamsettingsdecoders:
  1987 
  1989 
  1988         try:
  1990         try:
  1989             decoder.decode(frame.payload)
  1991             decoder.decode(frame.payload)
  1990         except Exception as e:
  1992         except Exception as e:
  1991             return (
  1993             return (
  1992                 'error',
  1994                 b'error',
  1993                 {
  1995                 {
  1994                     'message': (
  1996                     b'message': (
  1995                         _(
  1997                         _(
  1996                             'error decoding CBOR from stream encoding '
  1998                             b'error decoding CBOR from stream encoding '
  1997                             'settings frame: %s'
  1999                             b'settings frame: %s'
  1998                         )
  2000                         )
  1999                         % stringutil.forcebytestr(e)
  2001                         % stringutil.forcebytestr(e)
  2000                     ),
  2002                     ),
  2001                 },
  2003                 },
  2002             )
  2004             )
  2003 
  2005 
  2004         if more:
  2006         if more:
  2005             return 'noop', {}
  2007             return b'noop', {}
  2006 
  2008 
  2007         assert eos
  2009         assert eos
  2008 
  2010 
  2009         decoded = decoder.getavailable()
  2011         decoded = decoder.getavailable()
  2010         del self._streamsettingsdecoders[frame.streamid]
  2012         del self._streamsettingsdecoders[frame.streamid]
  2011 
  2013 
  2012         if not decoded:
  2014         if not decoded:
  2013             return (
  2015             return (
  2014                 'error',
  2016                 b'error',
  2015                 {
  2017                 {
  2016                     'message': _(
  2018                     b'message': _(
  2017                         'stream encoding settings frame did not contain '
  2019                         b'stream encoding settings frame did not contain '
  2018                         'CBOR data'
  2020                         b'CBOR data'
  2019                     ),
  2021                     ),
  2020                 },
  2022                 },
  2021             )
  2023             )
  2022 
  2024 
  2023         try:
  2025         try:
  2024             self._incomingstreams[frame.streamid].setdecoder(
  2026             self._incomingstreams[frame.streamid].setdecoder(
  2025                 self._ui, decoded[0], decoded[1:]
  2027                 self._ui, decoded[0], decoded[1:]
  2026             )
  2028             )
  2027         except Exception as e:
  2029         except Exception as e:
  2028             return (
  2030             return (
  2029                 'error',
  2031                 b'error',
  2030                 {
  2032                 {
  2031                     'message': (
  2033                     b'message': (
  2032                         _('error setting stream decoder: %s')
  2034                         _(b'error setting stream decoder: %s')
  2033                         % stringutil.forcebytestr(e)
  2035                         % stringutil.forcebytestr(e)
  2034                     ),
  2036                     ),
  2035                 },
  2037                 },
  2036             )
  2038             )
  2037 
  2039 
  2038         return 'noop', {}
  2040         return b'noop', {}
  2039 
  2041 
  2040     def _oncommandresponseframe(self, request, frame):
  2042     def _oncommandresponseframe(self, request, frame):
  2041         if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
  2043         if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
  2042             request.state = 'received'
  2044             request.state = b'received'
  2043             del self._activerequests[request.requestid]
  2045             del self._activerequests[request.requestid]
  2044 
  2046 
  2045         return (
  2047         return (
  2046             'responsedata',
  2048             b'responsedata',
  2047             {
  2049             {
  2048                 'request': request,
  2050                 b'request': request,
  2049                 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
  2051                 b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
  2050                 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
  2052                 b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
  2051                 'data': frame.payload,
  2053                 b'data': frame.payload,
  2052             },
  2054             },
  2053         )
  2055         )
  2054 
  2056 
  2055     def _onerrorresponseframe(self, request, frame):
  2057     def _onerrorresponseframe(self, request, frame):
  2056         request.state = 'errored'
  2058         request.state = b'errored'
  2057         del self._activerequests[request.requestid]
  2059         del self._activerequests[request.requestid]
  2058 
  2060 
  2059         # The payload should be a CBOR map.
  2061         # The payload should be a CBOR map.
  2060         m = cborutil.decodeall(frame.payload)[0]
  2062         m = cborutil.decodeall(frame.payload)[0]
  2061 
  2063 
  2062         return (
  2064         return (
  2063             'error',
  2065             b'error',
  2064             {'request': request, 'type': m['type'], 'message': m['message'],},
  2066             {
       
  2067                 b'request': request,
       
  2068                 b'type': m[b'type'],
       
  2069                 b'message': m[b'message'],
       
  2070             },
  2065         )
  2071         )