diff mercurial/wireprotoframing.py @ 37728:564a3eec6e63

wireprotov2: add support for more response types This adds types to represent error and generator responses from server commands. Differential Revision: https://phab.mercurial-scm.org/D3388
author Gregory Szorc <gregory.szorc@gmail.com>
date Sun, 15 Apr 2018 10:37:29 -0700
parents 0c184ca594bb
children 36f487a332ad
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py	Sat Apr 14 15:38:11 2018 -0700
+++ b/mercurial/wireprotoframing.py	Sun Apr 15 10:37:29 2018 -0700
@@ -386,6 +386,56 @@
         if done:
             break
 
+def createbytesresponseframesfromgen(stream, requestid, gen,
+                                     maxframesize=DEFAULT_MAX_FRAME_SIZE):
+    overall = cbor.dumps({b'status': b'ok'}, canonical=True)
+
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                           flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+                           payload=overall)
+
+    cb = util.chunkbuffer(gen)
+
+    flags = 0
+
+    while True:
+        chunk = cb.read(maxframesize)
+        if not chunk:
+            break
+
+        yield stream.makeframe(requestid=requestid,
+                               typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                               flags=flags,
+                               payload=chunk)
+
+        flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
+
+    flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
+    flags |= FLAG_COMMAND_RESPONSE_EOS
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                           flags=flags,
+                           payload=b'')
+
+def createcommanderrorresponse(stream, requestid, message, args=None):
+    m = {
+        b'status': b'error',
+        b'error': {
+            b'message': message,
+        }
+    }
+
+    if args:
+        m[b'error'][b'args'] = args
+
+    overall = cbor.dumps(m, canonical=True)
+
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                           flags=FLAG_COMMAND_RESPONSE_EOS,
+                           payload=overall)
+
 def createerrorframe(stream, requestid, msg, errtype):
     # TODO properly handle frame size limits.
     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -634,6 +684,19 @@
                 'framegen': result,
             }
 
+    def oncommandresponsereadygen(self, stream, requestid, gen):
+        """Signal that a bytes response is ready, with data as a generator."""
+        ensureserverstream(stream)
+
+        def sendframes():
+            for frame in createbytesresponseframesfromgen(stream, requestid,
+                                                          gen):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        return self._handlesendframes(sendframes())
+
     def oninputeof(self):
         """Signals that end of input has been received.
 
@@ -655,13 +718,39 @@
             'framegen': makegen(),
         }
 
+    def _handlesendframes(self, framegen):
+        if self._deferoutput:
+            self._bufferedframegens.append(framegen)
+            return 'noop', {}
+        else:
+            return 'sendframes', {
+                'framegen': framegen,
+            }
+
     def onservererror(self, stream, requestid, msg):
         ensureserverstream(stream)
 
-        return 'sendframes', {
-            'framegen': createerrorframe(stream, requestid, msg,
-                                         errtype='server'),
-        }
+        def sendframes():
+            for frame in createerrorframe(stream, requestid, msg,
+                                          errtype='server'):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        return self._handlesendframes(sendframes())
+
+    def oncommanderror(self, stream, requestid, message, args=None):
+        """Called when a command encountered an error before sending output."""
+        ensureserverstream(stream)
+
+        def sendframes():
+            for frame in createcommanderrorresponse(stream, requestid, message,
+                                                    args):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        return self._handlesendframes(sendframes())
 
     def makeoutputstream(self):
         """Create a stream to be used for sending data to the client."""