1091 |
1091 |
1092 Returns a dict with an ``action`` key that details what action, |
1092 Returns a dict with an ``action`` key that details what action, |
1093 if any, the consumer should take next. |
1093 if any, the consumer should take next. |
1094 """ |
1094 """ |
1095 if not frame.streamid % 2: |
1095 if not frame.streamid % 2: |
1096 self._state = 'errored' |
1096 self._state = b'errored' |
1097 return self._makeerrorresult( |
1097 return self._makeerrorresult( |
1098 _('received frame with even numbered stream ID: %d') |
1098 _(b'received frame with even numbered stream ID: %d') |
1099 % frame.streamid |
1099 % frame.streamid |
1100 ) |
1100 ) |
1101 |
1101 |
1102 if frame.streamid not in self._incomingstreams: |
1102 if frame.streamid not in self._incomingstreams: |
1103 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: |
1103 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: |
1104 self._state = 'errored' |
1104 self._state = b'errored' |
1105 return self._makeerrorresult( |
1105 return self._makeerrorresult( |
1106 _( |
1106 _( |
1107 'received frame on unknown inactive stream without ' |
1107 b'received frame on unknown inactive stream without ' |
1108 'beginning of stream flag set' |
1108 b'beginning of stream flag set' |
1109 ) |
1109 ) |
1110 ) |
1110 ) |
1111 |
1111 |
1112 self._incomingstreams[frame.streamid] = inputstream(frame.streamid) |
1112 self._incomingstreams[frame.streamid] = inputstream(frame.streamid) |
1113 |
1113 |
1114 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: |
1114 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: |
1115 # TODO handle decoding frames |
1115 # TODO handle decoding frames |
1116 self._state = 'errored' |
1116 self._state = b'errored' |
1117 raise error.ProgrammingError( |
1117 raise error.ProgrammingError( |
1118 'support for decoding stream payloads ' 'not yet implemented' |
1118 b'support for decoding stream payloads ' b'not yet implemented' |
1119 ) |
1119 ) |
1120 |
1120 |
1121 if frame.streamflags & STREAM_FLAG_END_STREAM: |
1121 if frame.streamflags & STREAM_FLAG_END_STREAM: |
1122 del self._incomingstreams[frame.streamid] |
1122 del self._incomingstreams[frame.streamid] |
1123 |
1123 |
1124 handlers = { |
1124 handlers = { |
1125 'initial': self._onframeinitial, |
1125 b'initial': self._onframeinitial, |
1126 'protocol-settings-receiving': self._onframeprotocolsettings, |
1126 b'protocol-settings-receiving': self._onframeprotocolsettings, |
1127 'idle': self._onframeidle, |
1127 b'idle': self._onframeidle, |
1128 'command-receiving': self._onframecommandreceiving, |
1128 b'command-receiving': self._onframecommandreceiving, |
1129 'errored': self._onframeerrored, |
1129 b'errored': self._onframeerrored, |
1130 } |
1130 } |
1131 |
1131 |
1132 meth = handlers.get(self._state) |
1132 meth = handlers.get(self._state) |
1133 if not meth: |
1133 if not meth: |
1134 raise error.ProgrammingError('unhandled state: %s' % self._state) |
1134 raise error.ProgrammingError(b'unhandled state: %s' % self._state) |
1135 |
1135 |
1136 return meth(frame) |
1136 return meth(frame) |
1137 |
1137 |
1138 def oncommandresponsereadyobjects(self, stream, requestid, objs): |
1138 def oncommandresponsereadyobjects(self, stream, requestid, objs): |
1139 """Signal that objects are ready to be sent to the client. |
1139 """Signal that objects are ready to be sent to the client. |
1283 completed. |
1283 completed. |
1284 """ |
1284 """ |
1285 # TODO should we do anything about in-flight commands? |
1285 # TODO should we do anything about in-flight commands? |
1286 |
1286 |
1287 if not self._deferoutput or not self._bufferedframegens: |
1287 if not self._deferoutput or not self._bufferedframegens: |
1288 return 'noop', {} |
1288 return b'noop', {} |
1289 |
1289 |
1290 # If we buffered all our responses, emit those. |
1290 # If we buffered all our responses, emit those. |
1291 def makegen(): |
1291 def makegen(): |
1292 for gen in self._bufferedframegens: |
1292 for gen in self._bufferedframegens: |
1293 for frame in gen: |
1293 for frame in gen: |
1294 yield frame |
1294 yield frame |
1295 |
1295 |
1296 return 'sendframes', {'framegen': makegen(),} |
1296 return b'sendframes', {b'framegen': makegen(),} |
1297 |
1297 |
1298 def _handlesendframes(self, framegen): |
1298 def _handlesendframes(self, framegen): |
1299 if self._deferoutput: |
1299 if self._deferoutput: |
1300 self._bufferedframegens.append(framegen) |
1300 self._bufferedframegens.append(framegen) |
1301 return 'noop', {} |
1301 return b'noop', {} |
1302 else: |
1302 else: |
1303 return 'sendframes', {'framegen': framegen,} |
1303 return b'sendframes', {b'framegen': framegen,} |
1304 |
1304 |
1305 def onservererror(self, stream, requestid, msg): |
1305 def onservererror(self, stream, requestid, msg): |
1306 ensureserverstream(stream) |
1306 ensureserverstream(stream) |
1307 |
1307 |
1308 def sendframes(): |
1308 def sendframes(): |
1309 for frame in createerrorframe( |
1309 for frame in createerrorframe( |
1310 stream, requestid, msg, errtype='server' |
1310 stream, requestid, msg, errtype=b'server' |
1311 ): |
1311 ): |
1312 yield frame |
1312 yield frame |
1313 |
1313 |
1314 self._activecommands.remove(requestid) |
1314 self._activecommands.remove(requestid) |
1315 |
1315 |
1343 self._outgoingstreams[streamid] = s |
1343 self._outgoingstreams[streamid] = s |
1344 |
1344 |
1345 # Always use the *server's* preferred encoder over the client's, |
1345 # Always use the *server's* preferred encoder over the client's, |
1346 # as servers have more to lose from sub-optimal encoders being used. |
1346 # as servers have more to lose from sub-optimal encoders being used. |
1347 for name in STREAM_ENCODERS_ORDER: |
1347 for name in STREAM_ENCODERS_ORDER: |
1348 if name in self._sendersettings['contentencodings']: |
1348 if name in self._sendersettings[b'contentencodings']: |
1349 s.setencoder(self._ui, name) |
1349 s.setencoder(self._ui, name) |
1350 break |
1350 break |
1351 |
1351 |
1352 return s |
1352 return s |
1353 |
1353 |
1354 def _makeerrorresult(self, msg): |
1354 def _makeerrorresult(self, msg): |
1355 return 'error', {'message': msg,} |
1355 return b'error', {b'message': msg,} |
1356 |
1356 |
1357 def _makeruncommandresult(self, requestid): |
1357 def _makeruncommandresult(self, requestid): |
1358 entry = self._receivingcommands[requestid] |
1358 entry = self._receivingcommands[requestid] |
1359 |
1359 |
1360 if not entry['requestdone']: |
1360 if not entry[b'requestdone']: |
1361 self._state = 'errored' |
1361 self._state = b'errored' |
1362 raise error.ProgrammingError( |
1362 raise error.ProgrammingError( |
1363 'should not be called without ' 'requestdone set' |
1363 b'should not be called without ' b'requestdone set' |
1364 ) |
1364 ) |
1365 |
1365 |
1366 del self._receivingcommands[requestid] |
1366 del self._receivingcommands[requestid] |
1367 |
1367 |
1368 if self._receivingcommands: |
1368 if self._receivingcommands: |
1369 self._state = 'command-receiving' |
1369 self._state = b'command-receiving' |
1370 else: |
1370 else: |
1371 self._state = 'idle' |
1371 self._state = b'idle' |
1372 |
1372 |
1373 # Decode the payloads as CBOR. |
1373 # Decode the payloads as CBOR. |
1374 entry['payload'].seek(0) |
1374 entry[b'payload'].seek(0) |
1375 request = cborutil.decodeall(entry['payload'].getvalue())[0] |
1375 request = cborutil.decodeall(entry[b'payload'].getvalue())[0] |
1376 |
1376 |
1377 if b'name' not in request: |
1377 if b'name' not in request: |
1378 self._state = 'errored' |
1378 self._state = b'errored' |
1379 return self._makeerrorresult( |
1379 return self._makeerrorresult( |
1380 _('command request missing "name" field') |
1380 _(b'command request missing "name" field') |
1381 ) |
1381 ) |
1382 |
1382 |
1383 if b'args' not in request: |
1383 if b'args' not in request: |
1384 request[b'args'] = {} |
1384 request[b'args'] = {} |
1385 |
1385 |
1386 assert requestid not in self._activecommands |
1386 assert requestid not in self._activecommands |
1387 self._activecommands.add(requestid) |
1387 self._activecommands.add(requestid) |
1388 |
1388 |
1389 return ( |
1389 return ( |
1390 'runcommand', |
1390 b'runcommand', |
1391 { |
1391 { |
1392 'requestid': requestid, |
1392 b'requestid': requestid, |
1393 'command': request[b'name'], |
1393 b'command': request[b'name'], |
1394 'args': request[b'args'], |
1394 b'args': request[b'args'], |
1395 'redirect': request.get(b'redirect'), |
1395 b'redirect': request.get(b'redirect'), |
1396 'data': entry['data'].getvalue() if entry['data'] else None, |
1396 b'data': entry[b'data'].getvalue() if entry[b'data'] else None, |
1397 }, |
1397 }, |
1398 ) |
1398 ) |
1399 |
1399 |
1400 def _makewantframeresult(self): |
1400 def _makewantframeresult(self): |
1401 return 'wantframe', {'state': self._state,} |
1401 return b'wantframe', {b'state': self._state,} |
1402 |
1402 |
1403 def _validatecommandrequestframe(self, frame): |
1403 def _validatecommandrequestframe(self, frame): |
1404 new = frame.flags & FLAG_COMMAND_REQUEST_NEW |
1404 new = frame.flags & FLAG_COMMAND_REQUEST_NEW |
1405 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION |
1405 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION |
1406 |
1406 |
1407 if new and continuation: |
1407 if new and continuation: |
1408 self._state = 'errored' |
1408 self._state = b'errored' |
1409 return self._makeerrorresult( |
1409 return self._makeerrorresult( |
1410 _( |
1410 _( |
1411 'received command request frame with both new and ' |
1411 b'received command request frame with both new and ' |
1412 'continuation flags set' |
1412 b'continuation flags set' |
1413 ) |
1413 ) |
1414 ) |
1414 ) |
1415 |
1415 |
1416 if not new and not continuation: |
1416 if not new and not continuation: |
1417 self._state = 'errored' |
1417 self._state = b'errored' |
1418 return self._makeerrorresult( |
1418 return self._makeerrorresult( |
1419 _( |
1419 _( |
1420 'received command request frame with neither new nor ' |
1420 b'received command request frame with neither new nor ' |
1421 'continuation flags set' |
1421 b'continuation flags set' |
1422 ) |
1422 ) |
1423 ) |
1423 ) |
1424 |
1424 |
1425 def _onframeinitial(self, frame): |
1425 def _onframeinitial(self, frame): |
1426 # Called when we receive a frame when in the "initial" state. |
1426 # Called when we receive a frame when in the "initial" state. |
1427 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: |
1427 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: |
1428 self._state = 'protocol-settings-receiving' |
1428 self._state = b'protocol-settings-receiving' |
1429 self._protocolsettingsdecoder = cborutil.bufferingdecoder() |
1429 self._protocolsettingsdecoder = cborutil.bufferingdecoder() |
1430 return self._onframeprotocolsettings(frame) |
1430 return self._onframeprotocolsettings(frame) |
1431 |
1431 |
1432 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST: |
1432 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST: |
1433 self._state = 'idle' |
1433 self._state = b'idle' |
1434 return self._onframeidle(frame) |
1434 return self._onframeidle(frame) |
1435 |
1435 |
1436 else: |
1436 else: |
1437 self._state = 'errored' |
1437 self._state = b'errored' |
1438 return self._makeerrorresult( |
1438 return self._makeerrorresult( |
1439 _( |
1439 _( |
1440 'expected sender protocol settings or command request ' |
1440 b'expected sender protocol settings or command request ' |
1441 'frame; got %d' |
1441 b'frame; got %d' |
1442 ) |
1442 ) |
1443 % frame.typeid |
1443 % frame.typeid |
1444 ) |
1444 ) |
1445 |
1445 |
1446 def _onframeprotocolsettings(self, frame): |
1446 def _onframeprotocolsettings(self, frame): |
1447 assert self._state == 'protocol-settings-receiving' |
1447 assert self._state == b'protocol-settings-receiving' |
1448 assert self._protocolsettingsdecoder is not None |
1448 assert self._protocolsettingsdecoder is not None |
1449 |
1449 |
1450 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: |
1450 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: |
1451 self._state = 'errored' |
1451 self._state = b'errored' |
1452 return self._makeerrorresult( |
1452 return self._makeerrorresult( |
1453 _('expected sender protocol settings frame; got %d') |
1453 _(b'expected sender protocol settings frame; got %d') |
1454 % frame.typeid |
1454 % frame.typeid |
1455 ) |
1455 ) |
1456 |
1456 |
1457 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION |
1457 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION |
1458 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS |
1458 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS |
1459 |
1459 |
1460 if more and eos: |
1460 if more and eos: |
1461 self._state = 'errored' |
1461 self._state = b'errored' |
1462 return self._makeerrorresult( |
1462 return self._makeerrorresult( |
1463 _( |
1463 _( |
1464 'sender protocol settings frame cannot have both ' |
1464 b'sender protocol settings frame cannot have both ' |
1465 'continuation and end of stream flags set' |
1465 b'continuation and end of stream flags set' |
1466 ) |
1466 ) |
1467 ) |
1467 ) |
1468 |
1468 |
1469 if not more and not eos: |
1469 if not more and not eos: |
1470 self._state = 'errored' |
1470 self._state = b'errored' |
1471 return self._makeerrorresult( |
1471 return self._makeerrorresult( |
1472 _( |
1472 _( |
1473 'sender protocol settings frame must have continuation or ' |
1473 b'sender protocol settings frame must have continuation or ' |
1474 'end of stream flag set' |
1474 b'end of stream flag set' |
1475 ) |
1475 ) |
1476 ) |
1476 ) |
1477 |
1477 |
1478 # TODO establish limits for maximum amount of data that can be |
1478 # TODO establish limits for maximum amount of data that can be |
1479 # buffered. |
1479 # buffered. |
1480 try: |
1480 try: |
1481 self._protocolsettingsdecoder.decode(frame.payload) |
1481 self._protocolsettingsdecoder.decode(frame.payload) |
1482 except Exception as e: |
1482 except Exception as e: |
1483 self._state = 'errored' |
1483 self._state = b'errored' |
1484 return self._makeerrorresult( |
1484 return self._makeerrorresult( |
1485 _('error decoding CBOR from sender protocol settings frame: %s') |
1485 _( |
|
1486 b'error decoding CBOR from sender protocol settings frame: %s' |
|
1487 ) |
1486 % stringutil.forcebytestr(e) |
1488 % stringutil.forcebytestr(e) |
1487 ) |
1489 ) |
1488 |
1490 |
1489 if more: |
1491 if more: |
1490 return self._makewantframeresult() |
1492 return self._makewantframeresult() |
1493 |
1495 |
1494 decoded = self._protocolsettingsdecoder.getavailable() |
1496 decoded = self._protocolsettingsdecoder.getavailable() |
1495 self._protocolsettingsdecoder = None |
1497 self._protocolsettingsdecoder = None |
1496 |
1498 |
1497 if not decoded: |
1499 if not decoded: |
1498 self._state = 'errored' |
1500 self._state = b'errored' |
1499 return self._makeerrorresult( |
1501 return self._makeerrorresult( |
1500 _('sender protocol settings frame did not contain CBOR data') |
1502 _(b'sender protocol settings frame did not contain CBOR data') |
1501 ) |
1503 ) |
1502 elif len(decoded) > 1: |
1504 elif len(decoded) > 1: |
1503 self._state = 'errored' |
1505 self._state = b'errored' |
1504 return self._makeerrorresult( |
1506 return self._makeerrorresult( |
1505 _( |
1507 _( |
1506 'sender protocol settings frame contained multiple CBOR ' |
1508 b'sender protocol settings frame contained multiple CBOR ' |
1507 'values' |
1509 b'values' |
1508 ) |
1510 ) |
1509 ) |
1511 ) |
1510 |
1512 |
1511 d = decoded[0] |
1513 d = decoded[0] |
1512 |
1514 |
1513 if b'contentencodings' in d: |
1515 if b'contentencodings' in d: |
1514 self._sendersettings['contentencodings'] = d[b'contentencodings'] |
1516 self._sendersettings[b'contentencodings'] = d[b'contentencodings'] |
1515 |
1517 |
1516 self._state = 'idle' |
1518 self._state = b'idle' |
1517 |
1519 |
1518 return self._makewantframeresult() |
1520 return self._makewantframeresult() |
1519 |
1521 |
1520 def _onframeidle(self, frame): |
1522 def _onframeidle(self, frame): |
1521 # The only frame type that should be received in this state is a |
1523 # The only frame type that should be received in this state is a |
1522 # command request. |
1524 # command request. |
1523 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST: |
1525 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST: |
1524 self._state = 'errored' |
1526 self._state = b'errored' |
1525 return self._makeerrorresult( |
1527 return self._makeerrorresult( |
1526 _('expected command request frame; got %d') % frame.typeid |
1528 _(b'expected command request frame; got %d') % frame.typeid |
1527 ) |
1529 ) |
1528 |
1530 |
1529 res = self._validatecommandrequestframe(frame) |
1531 res = self._validatecommandrequestframe(frame) |
1530 if res: |
1532 if res: |
1531 return res |
1533 return res |
1532 |
1534 |
1533 if frame.requestid in self._receivingcommands: |
1535 if frame.requestid in self._receivingcommands: |
1534 self._state = 'errored' |
1536 self._state = b'errored' |
1535 return self._makeerrorresult( |
1537 return self._makeerrorresult( |
1536 _('request with ID %d already received') % frame.requestid |
1538 _(b'request with ID %d already received') % frame.requestid |
1537 ) |
1539 ) |
1538 |
1540 |
1539 if frame.requestid in self._activecommands: |
1541 if frame.requestid in self._activecommands: |
1540 self._state = 'errored' |
1542 self._state = b'errored' |
1541 return self._makeerrorresult( |
1543 return self._makeerrorresult( |
1542 _('request with ID %d is already active') % frame.requestid |
1544 _(b'request with ID %d is already active') % frame.requestid |
1543 ) |
1545 ) |
1544 |
1546 |
1545 new = frame.flags & FLAG_COMMAND_REQUEST_NEW |
1547 new = frame.flags & FLAG_COMMAND_REQUEST_NEW |
1546 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES |
1548 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES |
1547 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA |
1549 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA |
1548 |
1550 |
1549 if not new: |
1551 if not new: |
1550 self._state = 'errored' |
1552 self._state = b'errored' |
1551 return self._makeerrorresult( |
1553 return self._makeerrorresult( |
1552 _('received command request frame without new flag set') |
1554 _(b'received command request frame without new flag set') |
1553 ) |
1555 ) |
1554 |
1556 |
1555 payload = util.bytesio() |
1557 payload = util.bytesio() |
1556 payload.write(frame.payload) |
1558 payload.write(frame.payload) |
1557 |
1559 |
1558 self._receivingcommands[frame.requestid] = { |
1560 self._receivingcommands[frame.requestid] = { |
1559 'payload': payload, |
1561 b'payload': payload, |
1560 'data': None, |
1562 b'data': None, |
1561 'requestdone': not moreframes, |
1563 b'requestdone': not moreframes, |
1562 'expectingdata': bool(expectingdata), |
1564 b'expectingdata': bool(expectingdata), |
1563 } |
1565 } |
1564 |
1566 |
1565 # This is the final frame for this request. Dispatch it. |
1567 # This is the final frame for this request. Dispatch it. |
1566 if not moreframes and not expectingdata: |
1568 if not moreframes and not expectingdata: |
1567 return self._makeruncommandresult(frame.requestid) |
1569 return self._makeruncommandresult(frame.requestid) |
1568 |
1570 |
1569 assert moreframes or expectingdata |
1571 assert moreframes or expectingdata |
1570 self._state = 'command-receiving' |
1572 self._state = b'command-receiving' |
1571 return self._makewantframeresult() |
1573 return self._makewantframeresult() |
1572 |
1574 |
1573 def _onframecommandreceiving(self, frame): |
1575 def _onframecommandreceiving(self, frame): |
1574 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST: |
1576 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST: |
1575 # Process new command requests as such. |
1577 # Process new command requests as such. |
1581 return res |
1583 return res |
1582 |
1584 |
1583 # All other frames should be related to a command that is currently |
1585 # All other frames should be related to a command that is currently |
1584 # receiving but is not active. |
1586 # receiving but is not active. |
1585 if frame.requestid in self._activecommands: |
1587 if frame.requestid in self._activecommands: |
1586 self._state = 'errored' |
1588 self._state = b'errored' |
1587 return self._makeerrorresult( |
1589 return self._makeerrorresult( |
1588 _('received frame for request that is still active: %d') |
1590 _(b'received frame for request that is still active: %d') |
1589 % frame.requestid |
1591 % frame.requestid |
1590 ) |
1592 ) |
1591 |
1593 |
1592 if frame.requestid not in self._receivingcommands: |
1594 if frame.requestid not in self._receivingcommands: |
1593 self._state = 'errored' |
1595 self._state = b'errored' |
1594 return self._makeerrorresult( |
1596 return self._makeerrorresult( |
1595 _('received frame for request that is not receiving: %d') |
1597 _(b'received frame for request that is not receiving: %d') |
1596 % frame.requestid |
1598 % frame.requestid |
1597 ) |
1599 ) |
1598 |
1600 |
1599 entry = self._receivingcommands[frame.requestid] |
1601 entry = self._receivingcommands[frame.requestid] |
1600 |
1602 |
1601 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST: |
1603 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST: |
1602 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES |
1604 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES |
1603 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA) |
1605 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA) |
1604 |
1606 |
1605 if entry['requestdone']: |
1607 if entry[b'requestdone']: |
1606 self._state = 'errored' |
1608 self._state = b'errored' |
1607 return self._makeerrorresult( |
1609 return self._makeerrorresult( |
1608 _( |
1610 _( |
1609 'received command request frame when request frames ' |
1611 b'received command request frame when request frames ' |
1610 'were supposedly done' |
1612 b'were supposedly done' |
1611 ) |
1613 ) |
1612 ) |
1614 ) |
1613 |
1615 |
1614 if expectingdata != entry['expectingdata']: |
1616 if expectingdata != entry[b'expectingdata']: |
1615 self._state = 'errored' |
1617 self._state = b'errored' |
1616 return self._makeerrorresult( |
1618 return self._makeerrorresult( |
1617 _('mismatch between expect data flag and previous frame') |
1619 _(b'mismatch between expect data flag and previous frame') |
1618 ) |
1620 ) |
1619 |
1621 |
1620 entry['payload'].write(frame.payload) |
1622 entry[b'payload'].write(frame.payload) |
1621 |
1623 |
1622 if not moreframes: |
1624 if not moreframes: |
1623 entry['requestdone'] = True |
1625 entry[b'requestdone'] = True |
1624 |
1626 |
1625 if not moreframes and not expectingdata: |
1627 if not moreframes and not expectingdata: |
1626 return self._makeruncommandresult(frame.requestid) |
1628 return self._makeruncommandresult(frame.requestid) |
1627 |
1629 |
1628 return self._makewantframeresult() |
1630 return self._makewantframeresult() |
1629 |
1631 |
1630 elif frame.typeid == FRAME_TYPE_COMMAND_DATA: |
1632 elif frame.typeid == FRAME_TYPE_COMMAND_DATA: |
1631 if not entry['expectingdata']: |
1633 if not entry[b'expectingdata']: |
1632 self._state = 'errored' |
1634 self._state = b'errored' |
1633 return self._makeerrorresult( |
1635 return self._makeerrorresult( |
1634 _( |
1636 _( |
1635 'received command data frame for request that is not ' |
1637 b'received command data frame for request that is not ' |
1636 'expecting data: %d' |
1638 b'expecting data: %d' |
1637 ) |
1639 ) |
1638 % frame.requestid |
1640 % frame.requestid |
1639 ) |
1641 ) |
1640 |
1642 |
1641 if entry['data'] is None: |
1643 if entry[b'data'] is None: |
1642 entry['data'] = util.bytesio() |
1644 entry[b'data'] = util.bytesio() |
1643 |
1645 |
1644 return self._handlecommanddataframe(frame, entry) |
1646 return self._handlecommanddataframe(frame, entry) |
1645 else: |
1647 else: |
1646 self._state = 'errored' |
1648 self._state = b'errored' |
1647 return self._makeerrorresult( |
1649 return self._makeerrorresult( |
1648 _('received unexpected frame type: %d') % frame.typeid |
1650 _(b'received unexpected frame type: %d') % frame.typeid |
1649 ) |
1651 ) |
1650 |
1652 |
1651 def _handlecommanddataframe(self, frame, entry): |
1653 def _handlecommanddataframe(self, frame, entry): |
1652 assert frame.typeid == FRAME_TYPE_COMMAND_DATA |
1654 assert frame.typeid == FRAME_TYPE_COMMAND_DATA |
1653 |
1655 |
1654 # TODO support streaming data instead of buffering it. |
1656 # TODO support streaming data instead of buffering it. |
1655 entry['data'].write(frame.payload) |
1657 entry[b'data'].write(frame.payload) |
1656 |
1658 |
1657 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION: |
1659 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION: |
1658 return self._makewantframeresult() |
1660 return self._makewantframeresult() |
1659 elif frame.flags & FLAG_COMMAND_DATA_EOS: |
1661 elif frame.flags & FLAG_COMMAND_DATA_EOS: |
1660 entry['data'].seek(0) |
1662 entry[b'data'].seek(0) |
1661 return self._makeruncommandresult(frame.requestid) |
1663 return self._makeruncommandresult(frame.requestid) |
1662 else: |
1664 else: |
1663 self._state = 'errored' |
1665 self._state = b'errored' |
1664 return self._makeerrorresult( |
1666 return self._makeerrorresult( |
1665 _('command data frame without ' 'flags') |
1667 _(b'command data frame without ' b'flags') |
1666 ) |
1668 ) |
1667 |
1669 |
1668 def _onframeerrored(self, frame): |
1670 def _onframeerrored(self, frame): |
1669 return self._makeerrorresult(_('server already errored')) |
1671 return self._makeerrorresult(_(b'server already errored')) |
1670 |
1672 |
1671 |
1673 |
1672 class commandrequest(object): |
1674 class commandrequest(object): |
1673 """Represents a request to run a command.""" |
1675 """Represents a request to run a command.""" |
1674 |
1676 |
1776 and an optional file object containing the raw data for the command. |
1778 and an optional file object containing the raw data for the command. |
1777 |
1779 |
1778 Returns a 3-tuple of (request, action, action data). |
1780 Returns a 3-tuple of (request, action, action data). |
1779 """ |
1781 """ |
1780 if not self._canissuecommands: |
1782 if not self._canissuecommands: |
1781 raise error.ProgrammingError('cannot issue new commands') |
1783 raise error.ProgrammingError(b'cannot issue new commands') |
1782 |
1784 |
1783 requestid = self._nextrequestid |
1785 requestid = self._nextrequestid |
1784 self._nextrequestid += 2 |
1786 self._nextrequestid += 2 |
1785 |
1787 |
1786 request = commandrequest( |
1788 request = commandrequest( |
1787 requestid, name, args, datafh=datafh, redirect=redirect |
1789 requestid, name, args, datafh=datafh, redirect=redirect |
1788 ) |
1790 ) |
1789 |
1791 |
1790 if self._buffersends: |
1792 if self._buffersends: |
1791 self._pendingrequests.append(request) |
1793 self._pendingrequests.append(request) |
1792 return request, 'noop', {} |
1794 return request, b'noop', {} |
1793 else: |
1795 else: |
1794 if not self._cansend: |
1796 if not self._cansend: |
1795 raise error.ProgrammingError( |
1797 raise error.ProgrammingError( |
1796 'sends cannot be performed on ' 'this instance' |
1798 b'sends cannot be performed on ' b'this instance' |
1797 ) |
1799 ) |
1798 |
1800 |
1799 if not self._hasmultiplesend: |
1801 if not self._hasmultiplesend: |
1800 self._cansend = False |
1802 self._cansend = False |
1801 self._canissuecommands = False |
1803 self._canissuecommands = False |
1802 |
1804 |
1803 return ( |
1805 return ( |
1804 request, |
1806 request, |
1805 'sendframes', |
1807 b'sendframes', |
1806 {'framegen': self._makecommandframes(request),}, |
1808 {b'framegen': self._makecommandframes(request),}, |
1807 ) |
1809 ) |
1808 |
1810 |
1809 def flushcommands(self): |
1811 def flushcommands(self): |
1810 """Request that all queued commands be sent. |
1812 """Request that all queued commands be sent. |
1811 |
1813 |
1987 |
1989 |
1988 try: |
1990 try: |
1989 decoder.decode(frame.payload) |
1991 decoder.decode(frame.payload) |
1990 except Exception as e: |
1992 except Exception as e: |
1991 return ( |
1993 return ( |
1992 'error', |
1994 b'error', |
1993 { |
1995 { |
1994 'message': ( |
1996 b'message': ( |
1995 _( |
1997 _( |
1996 'error decoding CBOR from stream encoding ' |
1998 b'error decoding CBOR from stream encoding ' |
1997 'settings frame: %s' |
1999 b'settings frame: %s' |
1998 ) |
2000 ) |
1999 % stringutil.forcebytestr(e) |
2001 % stringutil.forcebytestr(e) |
2000 ), |
2002 ), |
2001 }, |
2003 }, |
2002 ) |
2004 ) |
2003 |
2005 |
2004 if more: |
2006 if more: |
2005 return 'noop', {} |
2007 return b'noop', {} |
2006 |
2008 |
2007 assert eos |
2009 assert eos |
2008 |
2010 |
2009 decoded = decoder.getavailable() |
2011 decoded = decoder.getavailable() |
2010 del self._streamsettingsdecoders[frame.streamid] |
2012 del self._streamsettingsdecoders[frame.streamid] |
2011 |
2013 |
2012 if not decoded: |
2014 if not decoded: |
2013 return ( |
2015 return ( |
2014 'error', |
2016 b'error', |
2015 { |
2017 { |
2016 'message': _( |
2018 b'message': _( |
2017 'stream encoding settings frame did not contain ' |
2019 b'stream encoding settings frame did not contain ' |
2018 'CBOR data' |
2020 b'CBOR data' |
2019 ), |
2021 ), |
2020 }, |
2022 }, |
2021 ) |
2023 ) |
2022 |
2024 |
2023 try: |
2025 try: |
2024 self._incomingstreams[frame.streamid].setdecoder( |
2026 self._incomingstreams[frame.streamid].setdecoder( |
2025 self._ui, decoded[0], decoded[1:] |
2027 self._ui, decoded[0], decoded[1:] |
2026 ) |
2028 ) |
2027 except Exception as e: |
2029 except Exception as e: |
2028 return ( |
2030 return ( |
2029 'error', |
2031 b'error', |
2030 { |
2032 { |
2031 'message': ( |
2033 b'message': ( |
2032 _('error setting stream decoder: %s') |
2034 _(b'error setting stream decoder: %s') |
2033 % stringutil.forcebytestr(e) |
2035 % stringutil.forcebytestr(e) |
2034 ), |
2036 ), |
2035 }, |
2037 }, |
2036 ) |
2038 ) |
2037 |
2039 |
2038 return 'noop', {} |
2040 return b'noop', {} |
2039 |
2041 |
2040 def _oncommandresponseframe(self, request, frame): |
2042 def _oncommandresponseframe(self, request, frame): |
2041 if frame.flags & FLAG_COMMAND_RESPONSE_EOS: |
2043 if frame.flags & FLAG_COMMAND_RESPONSE_EOS: |
2042 request.state = 'received' |
2044 request.state = b'received' |
2043 del self._activerequests[request.requestid] |
2045 del self._activerequests[request.requestid] |
2044 |
2046 |
2045 return ( |
2047 return ( |
2046 'responsedata', |
2048 b'responsedata', |
2047 { |
2049 { |
2048 'request': request, |
2050 b'request': request, |
2049 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION, |
2051 b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION, |
2050 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS, |
2052 b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS, |
2051 'data': frame.payload, |
2053 b'data': frame.payload, |
2052 }, |
2054 }, |
2053 ) |
2055 ) |
2054 |
2056 |
2055 def _onerrorresponseframe(self, request, frame): |
2057 def _onerrorresponseframe(self, request, frame): |
2056 request.state = 'errored' |
2058 request.state = b'errored' |
2057 del self._activerequests[request.requestid] |
2059 del self._activerequests[request.requestid] |
2058 |
2060 |
2059 # The payload should be a CBOR map. |
2061 # The payload should be a CBOR map. |
2060 m = cborutil.decodeall(frame.payload)[0] |
2062 m = cborutil.decodeall(frame.payload)[0] |
2061 |
2063 |
2062 return ( |
2064 return ( |
2063 'error', |
2065 b'error', |
2064 {'request': request, 'type': m['type'], 'message': m['message'],}, |
2066 { |
|
2067 b'request': request, |
|
2068 b'type': m[b'type'], |
|
2069 b'message': m[b'message'], |
|
2070 }, |
2065 ) |
2071 ) |