mercurial/wireprotoframing.py
changeset 37058 c5e9c3b47366
parent 37057 2ec1fb9de638
child 37060 0a6c5cc09a88
--- a/mercurial/wireprotoframing.py	Wed Mar 14 16:51:34 2018 -0700
+++ b/mercurial/wireprotoframing.py	Wed Mar 14 16:53:30 2018 -0700
@@ -327,6 +327,23 @@
 
     noop
        Indicates no additional action is required.
+
+    Known Issues
+    ------------
+
+    There are no limits to the number of partially received commands or their
+    size. A malicious client could stream command request data and exhaust the
+    server's memory.
+
+    Partially received commands are not acted upon when end of input is
+    reached. Should the server error if it receives a partial request?
+    Should the client send a message to abort a partially transmitted request
+    to facilitate graceful shutdown?
+
+    Active requests that haven't been responded to aren't tracked. This means
+    that if we receive a command and instruct its dispatch, another command
+    with its request ID can come in over the wire and there will be a race
+    between who responds to what.
     """
 
     def __init__(self, deferoutput=False):
@@ -342,14 +359,8 @@
         self._deferoutput = deferoutput
         self._state = 'idle'
         self._bufferedframegens = []
-        self._activerequestid = None
-        self._activecommand = None
-        self._activeargs = None
-        self._activedata = None
-        self._expectingargs = None
-        self._expectingdata = None
-        self._activeargname = None
-        self._activeargchunks = None
+        # request id -> dict of commands that are actively being received.
+        self._receivingcommands = {}
 
     def onframerecv(self, requestid, frametype, frameflags, payload):
         """Process a frame that has been received off the wire.
@@ -359,8 +370,7 @@
         """
         handlers = {
             'idle': self._onframeidle,
-            'command-receiving-args': self._onframereceivingargs,
-            'command-receiving-data': self._onframereceivingdata,
+            'command-receiving': self._onframecommandreceiving,
             'errored': self._onframeerrored,
         }
 
@@ -391,6 +401,8 @@
         No more frames will be received. All pending activity should be
         completed.
         """
+        # TODO should we do anything about in-flight commands?
+
         if not self._deferoutput or not self._bufferedframegens:
             return 'noop', {}
 
@@ -414,12 +426,20 @@
             'message': msg,
         }
 
-    def _makeruncommandresult(self):
+    def _makeruncommandresult(self, requestid):
+        entry = self._receivingcommands[requestid]
+        del self._receivingcommands[requestid]
+
+        if self._receivingcommands:
+            self._state = 'command-receiving'
+        else:
+            self._state = 'idle'
+
         return 'runcommand', {
-            'requestid': self._activerequestid,
-            'command': self._activecommand,
-            'args': self._activeargs,
-            'data': self._activedata.getvalue() if self._activedata else None,
+            'requestid': requestid,
+            'command': entry['command'],
+            'args': entry['args'],
+            'data': entry['data'].getvalue() if entry['data'] else None,
         }
 
     def _makewantframeresult(self):
@@ -435,34 +455,76 @@
             return self._makeerrorresult(
                 _('expected command frame; got %d') % frametype)
 
-        self._activerequestid = requestid
-        self._activecommand = payload
-        self._activeargs = {}
-        self._activedata = None
+        if requestid in self._receivingcommands:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('request with ID %d already received') % requestid)
+
+        expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
+        expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+
+        self._receivingcommands[requestid] = {
+            'command': payload,
+            'args': {},
+            'data': None,
+            'expectingargs': expectingargs,
+            'expectingdata': expectingdata,
+        }
 
         if frameflags & FLAG_COMMAND_NAME_EOS:
-            return self._makeruncommandresult()
-
-        self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
-        self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+            return self._makeruncommandresult(requestid)
 
-        if self._expectingargs:
-            self._state = 'command-receiving-args'
-            return self._makewantframeresult()
-        elif self._expectingdata:
-            self._activedata = util.bytesio()
-            self._state = 'command-receiving-data'
+        if expectingargs or expectingdata:
+            self._state = 'command-receiving'
             return self._makewantframeresult()
         else:
             self._state = 'errored'
             return self._makeerrorresult(_('missing frame flags on '
                                            'command frame'))
 
-    def _onframereceivingargs(self, requestid, frametype, frameflags, payload):
-        if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
+    def _onframecommandreceiving(self, requestid, frametype, frameflags,
+                                 payload):
+        # It could be a new command request. Process it as such.
+        if frametype == FRAME_TYPE_COMMAND_NAME:
+            return self._onframeidle(requestid, frametype, frameflags, payload)
+
+        # All other frames should be related to a command that is currently
+        # receiving.
+        if requestid not in self._receivingcommands:
             self._state = 'errored'
-            return self._makeerrorresult(_('expected command argument '
-                                           'frame; got %d') % frametype)
+            return self._makeerrorresult(
+                _('received frame for request that is not receiving: %d') %
+                  requestid)
+
+        entry = self._receivingcommands[requestid]
+
+        if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
+            if not entry['expectingargs']:
+                self._state = 'errored'
+                return self._makeerrorresult(_(
+                    'received command argument frame for request that is not '
+                    'expecting arguments: %d') % requestid)
+
+            return self._handlecommandargsframe(requestid, entry, frametype,
+                                                frameflags, payload)
+
+        elif frametype == FRAME_TYPE_COMMAND_DATA:
+            if not entry['expectingdata']:
+                self._state = 'errored'
+                return self._makeerrorresult(_(
+                    'received command data frame for request that is not '
+                    'expecting data: %d') % requestid)
+
+            if entry['data'] is None:
+                entry['data'] = util.bytesio()
+
+            return self._handlecommanddataframe(requestid, entry, frametype,
+                                                frameflags, payload)
+
+    def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
+                                payload):
+        # The frame and state of command should have already been validated.
+        assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
 
         offset = 0
         namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
@@ -483,10 +545,6 @@
         # and wait for the next frame.
         if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
             raise error.ProgrammingError('not yet implemented')
-            self._activeargname = argname
-            self._activeargchunks = [argvalue]
-            self._state = 'command-arg-continuation'
-            return self._makewantframeresult()
 
         # Common case: the argument value is completely contained in this
         # frame.
@@ -496,36 +554,30 @@
             return self._makeerrorresult(_('malformed argument frame: '
                                            'partial argument value'))
 
-        self._activeargs[argname] = argvalue
+        entry['args'][argname] = argvalue
 
         if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
-            if self._expectingdata:
-                self._state = 'command-receiving-data'
-                self._activedata = util.bytesio()
+            if entry['expectingdata']:
                 # TODO signal request to run a command once we don't
                 # buffer data frames.
                 return self._makewantframeresult()
             else:
-                self._state = 'waiting'
-                return self._makeruncommandresult()
+                return self._makeruncommandresult(requestid)
         else:
             return self._makewantframeresult()
 
-    def _onframereceivingdata(self, requestid, frametype, frameflags, payload):
-        if frametype != FRAME_TYPE_COMMAND_DATA:
-            self._state = 'errored'
-            return self._makeerrorresult(_('expected command data frame; '
-                                           'got %d') % frametype)
+    def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
+                                payload):
+        assert frametype == FRAME_TYPE_COMMAND_DATA
 
         # TODO support streaming data instead of buffering it.
-        self._activedata.write(payload)
+        entry['data'].write(payload)
 
         if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
             return self._makewantframeresult()
         elif frameflags & FLAG_COMMAND_DATA_EOS:
-            self._activedata.seek(0)
-            self._state = 'idle'
-            return self._makeruncommandresult()
+            entry['data'].seek(0)
+            return self._makeruncommandresult(requestid)
         else:
             self._state = 'errored'
             return self._makeerrorresult(_('command data frame without '