383 flags=flags, |
383 flags=flags, |
384 payload=chunk) |
384 payload=chunk) |
385 |
385 |
386 if done: |
386 if done: |
387 break |
387 break |
|
388 |
|
def createbytesresponseframesfromgen(stream, requestid, gen,
                                     maxframesize=DEFAULT_MAX_FRAME_SIZE):
    """Yield command response frames for data produced by a generator.

    The emitted sequence is:

    1. A frame whose payload is the canonical CBOR encoding of
       ``{b'status': b'ok'}``, flagged as a continuation.
    2. One frame per piece read from ``gen`` through ``util.chunkbuffer``,
       each piece obtained with ``read(maxframesize)``.
    3. A final frame with an empty payload carrying only the
       end-of-stream flag.

    ``stream`` must provide ``makeframe()``; ``requestid`` is passed through
    to every frame.
    """
    overall = cbor.dumps({b'status': b'ok'}, canonical=True)

    yield stream.makeframe(requestid=requestid,
                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
                           flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
                           payload=overall)

    cb = util.chunkbuffer(gen)

    # The first data frame is sent with no flags; every later data frame
    # carries the continuation flag.
    flags = 0

    while True:
        chunk = cb.read(maxframesize)
        if not chunk:
            break

        yield stream.makeframe(requestid=requestid,
                               typeid=FRAME_TYPE_COMMAND_RESPONSE,
                               flags=flags,
                               payload=chunk)

        flags |= FLAG_COMMAND_RESPONSE_CONTINUATION

    # Clear (rather than toggle) the continuation bit. When ``gen`` produced
    # no data the bit was never set, and an XOR would switch it on, making
    # the terminal frame claim both "more to come" and "end of stream".
    flags &= ~FLAG_COMMAND_RESPONSE_CONTINUATION
    flags |= FLAG_COMMAND_RESPONSE_EOS
    yield stream.makeframe(requestid=requestid,
                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
                           flags=flags,
                           payload=b'')
|
420 |
|
def createcommanderrorresponse(stream, requestid, message, args=None):
    """Yield the single response frame reporting a command error.

    The payload is the canonical CBOR encoding of a map holding
    ``b'status': b'error'`` and an ``b'error'`` map with the ``message``.
    A truthy ``args`` is stored under ``b'args'`` inside the error map.
    The frame is flagged as end-of-stream.
    """
    errorinfo = {b'message': message}
    if args:
        errorinfo[b'args'] = args

    response = {
        b'status': b'error',
        b'error': errorinfo,
    }
    encoded = cbor.dumps(response, canonical=True)

    yield stream.makeframe(requestid=requestid,
                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
                           flags=FLAG_COMMAND_RESPONSE_EOS,
                           payload=encoded)
388 |
438 |
389 def createerrorframe(stream, requestid, msg, errtype): |
439 def createerrorframe(stream, requestid, msg, errtype): |
390 # TODO properly handle frame size limits. |
440 # TODO properly handle frame size limits. |
391 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE |
441 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE |
392 |
442 |
632 else: |
682 else: |
633 return 'sendframes', { |
683 return 'sendframes', { |
634 'framegen': result, |
684 'framegen': result, |
635 } |
685 } |
636 |
686 |
|
def oncommandresponsereadygen(self, stream, requestid, gen):
    """Signal that a bytes response, supplied as a generator, is ready.

    Returns whatever ``_handlesendframes()`` returns for a frame generator
    that emits the response frames and then drops ``requestid`` from the
    active commands.
    """
    ensureserverstream(stream)

    def responsethenretire():
        responseframes = createbytesresponseframesfromgen(stream, requestid,
                                                          gen)
        for responseframe in responseframes:
            yield responseframe

        # Reached only after every response frame has been consumed.
        self._activecommands.remove(requestid)

    return self._handlesendframes(responsethenretire())
|
699 |
637 def oninputeof(self): |
700 def oninputeof(self): |
638 """Signals that end of input has been received. |
701 """Signals that end of input has been received. |
639 |
702 |
640 No more frames will be received. All pending activity should be |
703 No more frames will be received. All pending activity should be |
641 completed. |
704 completed. |
653 |
716 |
654 return 'sendframes', { |
717 return 'sendframes', { |
655 'framegen': makegen(), |
718 'framegen': makegen(), |
656 } |
719 } |
657 |
720 |
|
721 def _handlesendframes(self, framegen): |
|
722 if self._deferoutput: |
|
723 self._bufferedframegens.append(framegen) |
|
724 return 'noop', {} |
|
725 else: |
|
726 return 'sendframes', { |
|
727 'framegen': framegen, |
|
728 } |
|
729 |
def onservererror(self, stream, requestid, msg):
    """Report a server error for ``requestid`` as error frames.

    Returns whatever ``_handlesendframes()`` returns for a frame generator
    that emits the frames from ``createerrorframe(..., errtype='server')``
    and then drops ``requestid`` from the active commands.
    """
    ensureserverstream(stream)

    def errorthenretire():
        errorframes = createerrorframe(stream, requestid, msg,
                                       errtype='server')
        for errorframe in errorframes:
            yield errorframe

        # Reached only after every error frame has been consumed.
        self._activecommands.remove(requestid)

    return self._handlesendframes(errorthenretire())
|
741 |
|
def oncommanderror(self, stream, requestid, message, args=None):
    """Called when a command encountered an error before sending output.

    Returns whatever ``_handlesendframes()`` returns for a frame generator
    that emits the frames from ``createcommanderrorresponse()`` and then
    drops ``requestid`` from the active commands.
    """
    ensureserverstream(stream)

    def errorthenretire():
        errorframes = createcommanderrorresponse(stream, requestid, message,
                                                 args)
        for errorframe in errorframes:
            yield errorframe

        # Reached only after every error frame has been consumed.
        self._activecommands.remove(requestid)

    return self._handlesendframes(errorthenretire())
665 |
754 |
666 def makeoutputstream(self): |
755 def makeoutputstream(self): |
667 """Create a stream to be used for sending data to the client.""" |
756 """Create a stream to be used for sending data to the client.""" |
668 streamid = self._nextoutgoingstreamid |
757 streamid = self._nextoutgoingstreamid |
669 self._nextoutgoingstreamid += 2 |
758 self._nextoutgoingstreamid += 2 |