--- a/mercurial/wireprotoframing.py Sat Apr 14 15:38:11 2018 -0700
+++ b/mercurial/wireprotoframing.py Sun Apr 15 10:37:29 2018 -0700
@@ -386,6 +386,56 @@
if done:
break
+def createbytesresponseframesfromgen(stream, requestid, gen,
+ maxframesize=DEFAULT_MAX_FRAME_SIZE):
+ overall = cbor.dumps({b'status': b'ok'}, canonical=True)
+
+ yield stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+ payload=overall)
+
+ cb = util.chunkbuffer(gen)
+
+ flags = 0
+
+ while True:
+ chunk = cb.read(maxframesize)
+ if not chunk:
+ break
+
+ yield stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=flags,
+ payload=chunk)
+
+ flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
+
+ flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
+ flags |= FLAG_COMMAND_RESPONSE_EOS
+ yield stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=flags,
+ payload=b'')
+
+def createcommanderrorresponse(stream, requestid, message, args=None):
+ m = {
+ b'status': b'error',
+ b'error': {
+ b'message': message,
+ }
+ }
+
+ if args:
+ m[b'error'][b'args'] = args
+
+ overall = cbor.dumps(m, canonical=True)
+
+ yield stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=FLAG_COMMAND_RESPONSE_EOS,
+ payload=overall)
+
def createerrorframe(stream, requestid, msg, errtype):
# TODO properly handle frame size limits.
assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -634,6 +684,19 @@
'framegen': result,
}
+ def oncommandresponsereadygen(self, stream, requestid, gen):
+ """Signal that a bytes response is ready, with data as a generator."""
+ ensureserverstream(stream)
+
+ def sendframes():
+ for frame in createbytesresponseframesfromgen(stream, requestid,
+ gen):
+ yield frame
+
+ self._activecommands.remove(requestid)
+
+ return self._handlesendframes(sendframes())
+
def oninputeof(self):
"""Signals that end of input has been received.
@@ -655,13 +718,39 @@
'framegen': makegen(),
}
+ def _handlesendframes(self, framegen):
+ if self._deferoutput:
+ self._bufferedframegens.append(framegen)
+ return 'noop', {}
+ else:
+ return 'sendframes', {
+ 'framegen': framegen,
+ }
+
def onservererror(self, stream, requestid, msg):
ensureserverstream(stream)
- return 'sendframes', {
- 'framegen': createerrorframe(stream, requestid, msg,
- errtype='server'),
- }
+ def sendframes():
+ for frame in createerrorframe(stream, requestid, msg,
+ errtype='server'):
+ yield frame
+
+ self._activecommands.remove(requestid)
+
+ return self._handlesendframes(sendframes())
+
+ def oncommanderror(self, stream, requestid, message, args=None):
+ """Called when a command encountered an error before sending output."""
+ ensureserverstream(stream)
+
+ def sendframes():
+ for frame in createcommanderrorresponse(stream, requestid, message,
+ args):
+ yield frame
+
+ self._activecommands.remove(requestid)
+
+ return self._handlesendframes(sendframes())
def makeoutputstream(self):
"""Create a stream to be used for sending data to the client."""