mercurial/wireprotoframing.py
changeset 37058 c5e9c3b47366
parent 37057 2ec1fb9de638
child 37060 0a6c5cc09a88
equal deleted inserted replaced
37057:2ec1fb9de638 37058:c5e9c3b47366
   325        Indicates that nothing of interest happened and the server is waiting on
   325        Indicates that nothing of interest happened and the server is waiting on
   326        more frames from the client before anything interesting can be done.
   326        more frames from the client before anything interesting can be done.
   327 
   327 
   328     noop
   328     noop
   329        Indicates no additional action is required.
   329        Indicates no additional action is required.
       
   330 
       
   331     Known Issues
       
   332     ------------
       
   333 
       
   334     There are no limits to the number of partially received commands or their
       
   335     size. A malicious client could stream command request data and exhaust the
       
   336     server's memory.
       
   337 
       
   338     Partially received commands are not acted upon when end of input is
       
   339     reached. Should the server error if it receives a partial request?
       
   340     Should the client send a message to abort a partially transmitted request
       
   341     to facilitate graceful shutdown?
       
   342 
       
   343     Active requests that haven't been responded to aren't tracked. This means
       
   344     that if we receive a command and instruct its dispatch, another command
       
   345     with its request ID can come in over the wire and there will be a race
       
   346     between who responds to what.
   330     """
   347     """
   331 
   348 
   332     def __init__(self, deferoutput=False):
   349     def __init__(self, deferoutput=False):
   333         """Construct a new server reactor.
   350         """Construct a new server reactor.
   334 
   351 
   340         sender cannot receive until all data has been transmitted.
   357         sender cannot receive until all data has been transmitted.
   341         """
   358         """
   342         self._deferoutput = deferoutput
   359         self._deferoutput = deferoutput
   343         self._state = 'idle'
   360         self._state = 'idle'
   344         self._bufferedframegens = []
   361         self._bufferedframegens = []
   345         self._activerequestid = None
   362         # request id -> dict of commands that are actively being received.
   346         self._activecommand = None
   363         self._receivingcommands = {}
   347         self._activeargs = None
       
   348         self._activedata = None
       
   349         self._expectingargs = None
       
   350         self._expectingdata = None
       
   351         self._activeargname = None
       
   352         self._activeargchunks = None
       
   353 
   364 
   354     def onframerecv(self, requestid, frametype, frameflags, payload):
   365     def onframerecv(self, requestid, frametype, frameflags, payload):
   355         """Process a frame that has been received off the wire.
   366         """Process a frame that has been received off the wire.
   356 
   367 
   357         Returns a dict with an ``action`` key that details what action,
   368         Returns a dict with an ``action`` key that details what action,
   358         if any, the consumer should take next.
   369         if any, the consumer should take next.
   359         """
   370         """
   360         handlers = {
   371         handlers = {
   361             'idle': self._onframeidle,
   372             'idle': self._onframeidle,
   362             'command-receiving-args': self._onframereceivingargs,
   373             'command-receiving': self._onframecommandreceiving,
   363             'command-receiving-data': self._onframereceivingdata,
       
   364             'errored': self._onframeerrored,
   374             'errored': self._onframeerrored,
   365         }
   375         }
   366 
   376 
   367         meth = handlers.get(self._state)
   377         meth = handlers.get(self._state)
   368         if not meth:
   378         if not meth:
   389         """Signals that end of input has been received.
   399         """Signals that end of input has been received.
   390 
   400 
   391         No more frames will be received. All pending activity should be
   401         No more frames will be received. All pending activity should be
   392         completed.
   402         completed.
   393         """
   403         """
       
   404         # TODO should we do anything about in-flight commands?
       
   405 
   394         if not self._deferoutput or not self._bufferedframegens:
   406         if not self._deferoutput or not self._bufferedframegens:
   395             return 'noop', {}
   407             return 'noop', {}
   396 
   408 
   397         # If we buffered all our responses, emit those.
   409         # If we buffered all our responses, emit those.
   398         def makegen():
   410         def makegen():
   412     def _makeerrorresult(self, msg):
   424     def _makeerrorresult(self, msg):
   413         return 'error', {
   425         return 'error', {
   414             'message': msg,
   426             'message': msg,
   415         }
   427         }
   416 
   428 
   417     def _makeruncommandresult(self):
   429     def _makeruncommandresult(self, requestid):
       
   430         entry = self._receivingcommands[requestid]
       
   431         del self._receivingcommands[requestid]
       
   432 
       
   433         if self._receivingcommands:
       
   434             self._state = 'command-receiving'
       
   435         else:
       
   436             self._state = 'idle'
       
   437 
   418         return 'runcommand', {
   438         return 'runcommand', {
   419             'requestid': self._activerequestid,
   439             'requestid': requestid,
   420             'command': self._activecommand,
   440             'command': entry['command'],
   421             'args': self._activeargs,
   441             'args': entry['args'],
   422             'data': self._activedata.getvalue() if self._activedata else None,
   442             'data': entry['data'].getvalue() if entry['data'] else None,
   423         }
   443         }
   424 
   444 
   425     def _makewantframeresult(self):
   445     def _makewantframeresult(self):
   426         return 'wantframe', {
   446         return 'wantframe', {
   427             'state': self._state,
   447             'state': self._state,
   433         if frametype != FRAME_TYPE_COMMAND_NAME:
   453         if frametype != FRAME_TYPE_COMMAND_NAME:
   434             self._state = 'errored'
   454             self._state = 'errored'
   435             return self._makeerrorresult(
   455             return self._makeerrorresult(
   436                 _('expected command frame; got %d') % frametype)
   456                 _('expected command frame; got %d') % frametype)
   437 
   457 
   438         self._activerequestid = requestid
   458         if requestid in self._receivingcommands:
   439         self._activecommand = payload
   459             self._state = 'errored'
   440         self._activeargs = {}
   460             return self._makeerrorresult(
   441         self._activedata = None
   461                 _('request with ID %d already received') % requestid)
       
   462 
       
   463         expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
       
   464         expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
       
   465 
       
   466         self._receivingcommands[requestid] = {
       
   467             'command': payload,
       
   468             'args': {},
       
   469             'data': None,
       
   470             'expectingargs': expectingargs,
       
   471             'expectingdata': expectingdata,
       
   472         }
   442 
   473 
   443         if frameflags & FLAG_COMMAND_NAME_EOS:
   474         if frameflags & FLAG_COMMAND_NAME_EOS:
   444             return self._makeruncommandresult()
   475             return self._makeruncommandresult(requestid)
   445 
   476 
   446         self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
   477         if expectingargs or expectingdata:
   447         self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
   478             self._state = 'command-receiving'
   448 
       
   449         if self._expectingargs:
       
   450             self._state = 'command-receiving-args'
       
   451             return self._makewantframeresult()
       
   452         elif self._expectingdata:
       
   453             self._activedata = util.bytesio()
       
   454             self._state = 'command-receiving-data'
       
   455             return self._makewantframeresult()
   479             return self._makewantframeresult()
   456         else:
   480         else:
   457             self._state = 'errored'
   481             self._state = 'errored'
   458             return self._makeerrorresult(_('missing frame flags on '
   482             return self._makeerrorresult(_('missing frame flags on '
   459                                            'command frame'))
   483                                            'command frame'))
   460 
   484 
   461     def _onframereceivingargs(self, requestid, frametype, frameflags, payload):
   485     def _onframecommandreceiving(self, requestid, frametype, frameflags,
   462         if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
   486                                  payload):
   463             self._state = 'errored'
   487         # It could be a new command request. Process it as such.
   464             return self._makeerrorresult(_('expected command argument '
   488         if frametype == FRAME_TYPE_COMMAND_NAME:
   465                                            'frame; got %d') % frametype)
   489             return self._onframeidle(requestid, frametype, frameflags, payload)
       
   490 
       
   491         # All other frames should be related to a command that is currently
       
   492         # receiving.
       
   493         if requestid not in self._receivingcommands:
       
   494             self._state = 'errored'
       
   495             return self._makeerrorresult(
       
   496                 _('received frame for request that is not receiving: %d') %
       
   497                   requestid)
       
   498 
       
   499         entry = self._receivingcommands[requestid]
       
   500 
       
   501         if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
       
   502             if not entry['expectingargs']:
       
   503                 self._state = 'errored'
       
   504                 return self._makeerrorresult(_(
       
   505                     'received command argument frame for request that is not '
       
   506                     'expecting arguments: %d') % requestid)
       
   507 
       
   508             return self._handlecommandargsframe(requestid, entry, frametype,
       
   509                                                 frameflags, payload)
       
   510 
       
   511         elif frametype == FRAME_TYPE_COMMAND_DATA:
       
   512             if not entry['expectingdata']:
       
   513                 self._state = 'errored'
       
   514                 return self._makeerrorresult(_(
       
   515                     'received command data frame for request that is not '
       
   516                     'expecting data: %d') % requestid)
       
   517 
       
   518             if entry['data'] is None:
       
   519                 entry['data'] = util.bytesio()
       
   520 
       
   521             return self._handlecommanddataframe(requestid, entry, frametype,
       
   522                                                 frameflags, payload)
       
   523 
       
   524     def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
       
   525                                 payload):
       
   526         # The frame and state of command should have already been validated.
       
   527         assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
   466 
   528 
   467         offset = 0
   529         offset = 0
   468         namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
   530         namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
   469         offset += ARGUMENT_FRAME_HEADER.size
   531         offset += ARGUMENT_FRAME_HEADER.size
   470 
   532 
   481 
   543 
   482         # Argument value spans multiple frames. Record our active state
   544         # Argument value spans multiple frames. Record our active state
   483         # and wait for the next frame.
   545         # and wait for the next frame.
   484         if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
   546         if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
   485             raise error.ProgrammingError('not yet implemented')
   547             raise error.ProgrammingError('not yet implemented')
   486             self._activeargname = argname
       
   487             self._activeargchunks = [argvalue]
       
   488             self._state = 'command-arg-continuation'
       
   489             return self._makewantframeresult()
       
   490 
   548 
   491         # Common case: the argument value is completely contained in this
   549         # Common case: the argument value is completely contained in this
   492         # frame.
   550         # frame.
   493 
   551 
   494         if len(argvalue) != valuesize:
   552         if len(argvalue) != valuesize:
   495             self._state = 'errored'
   553             self._state = 'errored'
   496             return self._makeerrorresult(_('malformed argument frame: '
   554             return self._makeerrorresult(_('malformed argument frame: '
   497                                            'partial argument value'))
   555                                            'partial argument value'))
   498 
   556 
   499         self._activeargs[argname] = argvalue
   557         entry['args'][argname] = argvalue
   500 
   558 
   501         if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
   559         if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
   502             if self._expectingdata:
   560             if entry['expectingdata']:
   503                 self._state = 'command-receiving-data'
       
   504                 self._activedata = util.bytesio()
       
   505                 # TODO signal request to run a command once we don't
   561                 # TODO signal request to run a command once we don't
   506                 # buffer data frames.
   562                 # buffer data frames.
   507                 return self._makewantframeresult()
   563                 return self._makewantframeresult()
   508             else:
   564             else:
   509                 self._state = 'waiting'
   565                 return self._makeruncommandresult(requestid)
   510                 return self._makeruncommandresult()
       
   511         else:
   566         else:
   512             return self._makewantframeresult()
   567             return self._makewantframeresult()
   513 
   568 
   514     def _onframereceivingdata(self, requestid, frametype, frameflags, payload):
   569     def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
   515         if frametype != FRAME_TYPE_COMMAND_DATA:
   570                                 payload):
   516             self._state = 'errored'
   571         assert frametype == FRAME_TYPE_COMMAND_DATA
   517             return self._makeerrorresult(_('expected command data frame; '
       
   518                                            'got %d') % frametype)
       
   519 
   572 
   520         # TODO support streaming data instead of buffering it.
   573         # TODO support streaming data instead of buffering it.
   521         self._activedata.write(payload)
   574         entry['data'].write(payload)
   522 
   575 
   523         if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
   576         if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
   524             return self._makewantframeresult()
   577             return self._makewantframeresult()
   525         elif frameflags & FLAG_COMMAND_DATA_EOS:
   578         elif frameflags & FLAG_COMMAND_DATA_EOS:
   526             self._activedata.seek(0)
   579             entry['data'].seek(0)
   527             self._state = 'idle'
   580             return self._makeruncommandresult(requestid)
   528             return self._makeruncommandresult()
       
   529         else:
   581         else:
   530             self._state = 'errored'
   582             self._state = 'errored'
   531             return self._makeerrorresult(_('command data frame without '
   583             return self._makeerrorresult(_('command data frame without '
   532                                            'flags'))
   584                                            'flags'))
   533 
   585