mercurial/wireprotoframing.py
changeset 37057 2ec1fb9de638
parent 37056 861e9d37e56e
child 37058 c5e9c3b47366
equal deleted inserted replaced
37056:861e9d37e56e 37057:2ec1fb9de638
    17 from . import (
    17 from . import (
    18     error,
    18     error,
    19     util,
    19     util,
    20 )
    20 )
    21 
    21 
    22 FRAME_HEADER_SIZE = 4
    22 FRAME_HEADER_SIZE = 6
    23 DEFAULT_MAX_FRAME_SIZE = 32768
    23 DEFAULT_MAX_FRAME_SIZE = 32768
    24 
    24 
    25 FRAME_TYPE_COMMAND_NAME = 0x01
    25 FRAME_TYPE_COMMAND_NAME = 0x01
    26 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
    26 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
    27 FRAME_TYPE_COMMAND_DATA = 0x03
    27 FRAME_TYPE_COMMAND_DATA = 0x03
    87     FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
    87     FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
    88 }
    88 }
    89 
    89 
    90 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
    90 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
    91 
    91 
    92 def makeframe(frametype, frameflags, payload):
    92 def makeframe(requestid, frametype, frameflags, payload):
    93     """Assemble a frame into a byte array."""
    93     """Assemble a frame into a byte array."""
    94     # TODO assert size of payload.
    94     # TODO assert size of payload.
    95     frame = bytearray(FRAME_HEADER_SIZE + len(payload))
    95     frame = bytearray(FRAME_HEADER_SIZE + len(payload))
    96 
    96 
       
    97     # 24 bits length
       
    98     # 16 bits request id
       
    99     # 4 bits type
       
   100     # 4 bits flags
       
   101 
    97     l = struct.pack(r'<I', len(payload))
   102     l = struct.pack(r'<I', len(payload))
    98     frame[0:3] = l[0:3]
   103     frame[0:3] = l[0:3]
    99     frame[3] = (frametype << 4) | frameflags
   104     struct.pack_into(r'<H', frame, 3, requestid)
   100     frame[4:] = payload
   105     frame[5] = (frametype << 4) | frameflags
       
   106     frame[6:] = payload
   101 
   107 
   102     return frame
   108     return frame
   103 
   109 
   104 def makeframefromhumanstring(s):
   110 def makeframefromhumanstring(s):
   105     """Given a string of the form: <type> <flags> <payload>, creates a frame.
   111     """Create a frame from a human readable string
       
   112 
       
   113     Strings have the form:
       
   114 
       
   115         <request-id> <type> <flags> <payload>
   106 
   116 
   107     This can be used by user-facing applications and tests for creating
   117     This can be used by user-facing applications and tests for creating
   108     frames easily without having to type out a bunch of constants.
   118     frames easily without having to type out a bunch of constants.
   109 
   119 
       
   120     Request ID is an integer.
       
   121 
   110     Frame type and flags can be specified by integer or named constant.
   122     Frame type and flags can be specified by integer or named constant.
       
   123 
   111     Flags can be delimited by `|` to bitwise OR them together.
   124     Flags can be delimited by `|` to bitwise OR them together.
   112     """
   125     """
   113     frametype, frameflags, payload = s.split(b' ', 2)
   126     requestid, frametype, frameflags, payload = s.split(b' ', 3)
       
   127 
       
   128     requestid = int(requestid)
   114 
   129 
   115     if frametype in FRAME_TYPES:
   130     if frametype in FRAME_TYPES:
   116         frametype = FRAME_TYPES[frametype]
   131         frametype = FRAME_TYPES[frametype]
   117     else:
   132     else:
   118         frametype = int(frametype)
   133         frametype = int(frametype)
   125         else:
   140         else:
   126             finalflags |= int(flag)
   141             finalflags |= int(flag)
   127 
   142 
   128     payload = util.unescapestr(payload)
   143     payload = util.unescapestr(payload)
   129 
   144 
   130     return makeframe(frametype, finalflags, payload)
   145     return makeframe(requestid, frametype, finalflags, payload)
   131 
   146 
   132 def parseheader(data):
   147 def parseheader(data):
   133     """Parse a unified framing protocol frame header from a buffer.
   148     """Parse a unified framing protocol frame header from a buffer.
   134 
   149 
   135     The header is expected to be in the buffer at offset 0 and the
   150     The header is expected to be in the buffer at offset 0 and the
   138     # 24 bits payload length (little endian)
   153     # 24 bits payload length (little endian)
   139     # 4 bits frame type
   154     # 16 bits request ID (little endian), then 4 bits frame type
   140     # 4 bits frame flags
   155     # 4 bits frame flags
   141     # ... payload
   156     # ... payload
   142     framelength = data[0] + 256 * data[1] + 16384 * data[2]
   157     framelength = data[0] + 256 * data[1] + 16384 * data[2]
   143     typeflags = data[3]
   158     requestid = struct.unpack_from(r'<H', data, 3)[0]
       
   159     typeflags = data[5]
   144 
   160 
   145     frametype = (typeflags & 0xf0) >> 4
   161     frametype = (typeflags & 0xf0) >> 4
   146     frameflags = typeflags & 0x0f
   162     frameflags = typeflags & 0x0f
   147 
   163 
   148     return frametype, frameflags, framelength
   164     return requestid, frametype, frameflags, framelength
   149 
   165 
   150 def readframe(fh):
   166 def readframe(fh):
   151     """Read a unified framing protocol frame from a file object.
   167     """Read a unified framing protocol frame from a file object.
   152 
   168 
   153     Returns a 3-tuple of (type, flags, payload) for the decoded frame or
   169     Returns a 4-tuple (requestid, type, flags, payload) for the decoded frame or
   163 
   179 
   164     if readcount != FRAME_HEADER_SIZE:
   180     if readcount != FRAME_HEADER_SIZE:
   165         raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
   181         raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
   166                           (readcount, header))
   182                           (readcount, header))
   167 
   183 
   168     frametype, frameflags, framelength = parseheader(header)
   184     requestid, frametype, frameflags, framelength = parseheader(header)
   169 
   185 
   170     payload = fh.read(framelength)
   186     payload = fh.read(framelength)
   171     if len(payload) != framelength:
   187     if len(payload) != framelength:
   172         raise error.Abort(_('frame length error: expected %d; got %d') %
   188         raise error.Abort(_('frame length error: expected %d; got %d') %
   173                           (framelength, len(payload)))
   189                           (framelength, len(payload)))
   174 
   190 
   175     return frametype, frameflags, payload
   191     return requestid, frametype, frameflags, payload
   176 
   192 
   177 def createcommandframes(cmd, args, datafh=None):
   193 def createcommandframes(requestid, cmd, args, datafh=None):
   178     """Create frames necessary to transmit a request to run a command.
   194     """Create frames necessary to transmit a request to run a command.
   179 
   195 
   180     This is a generator of bytearrays. Each item represents a frame
   196     This is a generator of bytearrays. Each item represents a frame
   181     ready to be sent over the wire to a peer.
   197     ready to be sent over the wire to a peer.
   182     """
   198     """
   187         flags |= FLAG_COMMAND_NAME_HAVE_DATA
   203         flags |= FLAG_COMMAND_NAME_HAVE_DATA
   188 
   204 
   189     if not flags:
   205     if not flags:
   190         flags |= FLAG_COMMAND_NAME_EOS
   206         flags |= FLAG_COMMAND_NAME_EOS
   191 
   207 
   192     yield makeframe(FRAME_TYPE_COMMAND_NAME, flags, cmd)
   208     yield makeframe(requestid, FRAME_TYPE_COMMAND_NAME, flags, cmd)
   193 
   209 
   194     for i, k in enumerate(sorted(args)):
   210     for i, k in enumerate(sorted(args)):
   195         v = args[k]
   211         v = args[k]
   196         last = i == len(args) - 1
   212         last = i == len(args) - 1
   197 
   213 
   203         payload[offset:offset + len(k)] = k
   219         payload[offset:offset + len(k)] = k
   204         offset += len(k)
   220         offset += len(k)
   205         payload[offset:offset + len(v)] = v
   221         payload[offset:offset + len(v)] = v
   206 
   222 
   207         flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
   223         flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
   208         yield makeframe(FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
   224         yield makeframe(requestid, FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
   209 
   225 
   210     if datafh:
   226     if datafh:
   211         while True:
   227         while True:
   212             data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
   228             data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
   213 
   229 
   217             else:
   233             else:
   218                 flags = FLAG_COMMAND_DATA_EOS
   234                 flags = FLAG_COMMAND_DATA_EOS
   219                 assert datafh.read(1) == b''
   235                 assert datafh.read(1) == b''
   220                 done = True
   236                 done = True
   221 
   237 
   222             yield makeframe(FRAME_TYPE_COMMAND_DATA, flags, data)
   238             yield makeframe(requestid, FRAME_TYPE_COMMAND_DATA, flags, data)
   223 
   239 
   224             if done:
   240             if done:
   225                 break
   241                 break
   226 
   242 
   227 def createbytesresponseframesfrombytes(data,
   243 def createbytesresponseframesfrombytes(requestid, data,
   228                                        maxframesize=DEFAULT_MAX_FRAME_SIZE):
   244                                        maxframesize=DEFAULT_MAX_FRAME_SIZE):
   229     """Create a raw frame to send a bytes response from static bytes input.
   245     """Create a raw frame to send a bytes response from static bytes input.
   230 
   246 
   231     Returns a generator of bytearrays.
   247     Returns a generator of bytearrays.
   232     """
   248     """
   233 
   249 
   234     # Simple case of a single frame.
   250     # Simple case of a single frame.
   235     if len(data) <= maxframesize:
   251     if len(data) <= maxframesize:
   236         yield makeframe(FRAME_TYPE_BYTES_RESPONSE,
   252         yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE,
   237                         FLAG_BYTES_RESPONSE_EOS, data)
   253                         FLAG_BYTES_RESPONSE_EOS, data)
   238         return
   254         return
   239 
   255 
   240     offset = 0
   256     offset = 0
   241     while True:
   257     while True:
   246         if done:
   262         if done:
   247             flags = FLAG_BYTES_RESPONSE_EOS
   263             flags = FLAG_BYTES_RESPONSE_EOS
   248         else:
   264         else:
   249             flags = FLAG_BYTES_RESPONSE_CONTINUATION
   265             flags = FLAG_BYTES_RESPONSE_CONTINUATION
   250 
   266 
   251         yield makeframe(FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
   267         yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
   252 
   268 
   253         if done:
   269         if done:
   254             break
   270             break
   255 
   271 
   256 def createerrorframe(msg, protocol=False, application=False):
   272 def createerrorframe(requestid, msg, protocol=False, application=False):
   257     # TODO properly handle frame size limits.
   273     # TODO properly handle frame size limits.
   258     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
   274     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
   259 
   275 
   260     flags = 0
   276     flags = 0
   261     if protocol:
   277     if protocol:
   262         flags |= FLAG_ERROR_RESPONSE_PROTOCOL
   278         flags |= FLAG_ERROR_RESPONSE_PROTOCOL
   263     if application:
   279     if application:
   264         flags |= FLAG_ERROR_RESPONSE_APPLICATION
   280         flags |= FLAG_ERROR_RESPONSE_APPLICATION
   265 
   281 
   266     yield makeframe(FRAME_TYPE_ERROR_RESPONSE, flags, msg)
   282     yield makeframe(requestid, FRAME_TYPE_ERROR_RESPONSE, flags, msg)
   267 
   283 
   268 class serverreactor(object):
   284 class serverreactor(object):
   269     """Holds state of a server handling frame-based protocol requests.
   285     """Holds state of a server handling frame-based protocol requests.
   270 
   286 
   271     This class is the "brain" of the unified frame-based protocol server
   287     This class is the "brain" of the unified frame-based protocol server
   324         sender cannot receive until all data has been transmitted.
   340         sender cannot receive until all data has been transmitted.
   325         """
   341         """
   326         self._deferoutput = deferoutput
   342         self._deferoutput = deferoutput
   327         self._state = 'idle'
   343         self._state = 'idle'
   328         self._bufferedframegens = []
   344         self._bufferedframegens = []
       
   345         self._activerequestid = None
   329         self._activecommand = None
   346         self._activecommand = None
   330         self._activeargs = None
   347         self._activeargs = None
   331         self._activedata = None
   348         self._activedata = None
   332         self._expectingargs = None
   349         self._expectingargs = None
   333         self._expectingdata = None
   350         self._expectingdata = None
   334         self._activeargname = None
   351         self._activeargname = None
   335         self._activeargchunks = None
   352         self._activeargchunks = None
   336 
   353 
   337     def onframerecv(self, frametype, frameflags, payload):
   354     def onframerecv(self, requestid, frametype, frameflags, payload):
   338         """Process a frame that has been received off the wire.
   355         """Process a frame that has been received off the wire.
   339 
   356 
   340         Returns a dict with an ``action`` key that details what action,
   357         Returns a dict with an ``action`` key that details what action,
   341         if any, the consumer should take next.
   358         if any, the consumer should take next.
   342         """
   359         """
   349 
   366 
   350         meth = handlers.get(self._state)
   367         meth = handlers.get(self._state)
   351         if not meth:
   368         if not meth:
   352             raise error.ProgrammingError('unhandled state: %s' % self._state)
   369             raise error.ProgrammingError('unhandled state: %s' % self._state)
   353 
   370 
   354         return meth(frametype, frameflags, payload)
   371         return meth(requestid, frametype, frameflags, payload)
   355 
   372 
   356     def onbytesresponseready(self, data):
   373     def onbytesresponseready(self, requestid, data):
   357         """Signal that a bytes response is ready to be sent to the client.
   374         """Signal that a bytes response is ready to be sent to the client.
   358 
   375 
   359         The raw bytes response is passed as an argument.
   376         The raw bytes response is passed as an argument.
   360         """
   377         """
   361         framegen = createbytesresponseframesfrombytes(data)
   378         framegen = createbytesresponseframesfrombytes(requestid, data)
   362 
   379 
   363         if self._deferoutput:
   380         if self._deferoutput:
   364             self._bufferedframegens.append(framegen)
   381             self._bufferedframegens.append(framegen)
   365             return 'noop', {}
   382             return 'noop', {}
   366         else:
   383         else:
   385 
   402 
   386         return 'sendframes', {
   403         return 'sendframes', {
   387             'framegen': makegen(),
   404             'framegen': makegen(),
   388         }
   405         }
   389 
   406 
   390     def onapplicationerror(self, msg):
   407     def onapplicationerror(self, requestid, msg):
   391         return 'sendframes', {
   408         return 'sendframes', {
   392             'framegen': createerrorframe(msg, application=True),
   409             'framegen': createerrorframe(requestid, msg, application=True),
   393         }
   410         }
   394 
   411 
   395     def _makeerrorresult(self, msg):
   412     def _makeerrorresult(self, msg):
   396         return 'error', {
   413         return 'error', {
   397             'message': msg,
   414             'message': msg,
   398         }
   415         }
   399 
   416 
   400     def _makeruncommandresult(self):
   417     def _makeruncommandresult(self):
   401         return 'runcommand', {
   418         return 'runcommand', {
       
   419             'requestid': self._activerequestid,
   402             'command': self._activecommand,
   420             'command': self._activecommand,
   403             'args': self._activeargs,
   421             'args': self._activeargs,
   404             'data': self._activedata.getvalue() if self._activedata else None,
   422             'data': self._activedata.getvalue() if self._activedata else None,
   405         }
   423         }
   406 
   424 
   407     def _makewantframeresult(self):
   425     def _makewantframeresult(self):
   408         return 'wantframe', {
   426         return 'wantframe', {
   409             'state': self._state,
   427             'state': self._state,
   410         }
   428         }
   411 
   429 
   412     def _onframeidle(self, frametype, frameflags, payload):
   430     def _onframeidle(self, requestid, frametype, frameflags, payload):
   413         # The only frame type that should be received in this state is a
   431         # The only frame type that should be received in this state is a
   414         # command request.
   432         # command request.
   415         if frametype != FRAME_TYPE_COMMAND_NAME:
   433         if frametype != FRAME_TYPE_COMMAND_NAME:
   416             self._state = 'errored'
   434             self._state = 'errored'
   417             return self._makeerrorresult(
   435             return self._makeerrorresult(
   418                 _('expected command frame; got %d') % frametype)
   436                 _('expected command frame; got %d') % frametype)
   419 
   437 
       
   438         self._activerequestid = requestid
   420         self._activecommand = payload
   439         self._activecommand = payload
   421         self._activeargs = {}
   440         self._activeargs = {}
   422         self._activedata = None
   441         self._activedata = None
   423 
   442 
   424         if frameflags & FLAG_COMMAND_NAME_EOS:
   443         if frameflags & FLAG_COMMAND_NAME_EOS:
   437         else:
   456         else:
   438             self._state = 'errored'
   457             self._state = 'errored'
   439             return self._makeerrorresult(_('missing frame flags on '
   458             return self._makeerrorresult(_('missing frame flags on '
   440                                            'command frame'))
   459                                            'command frame'))
   441 
   460 
   442     def _onframereceivingargs(self, frametype, frameflags, payload):
   461     def _onframereceivingargs(self, requestid, frametype, frameflags, payload):
   443         if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
   462         if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
   444             self._state = 'errored'
   463             self._state = 'errored'
   445             return self._makeerrorresult(_('expected command argument '
   464             return self._makeerrorresult(_('expected command argument '
   446                                            'frame; got %d') % frametype)
   465                                            'frame; got %d') % frametype)
   447 
   466 
   490                 self._state = 'waiting'
   509                 self._state = 'waiting'
   491                 return self._makeruncommandresult()
   510                 return self._makeruncommandresult()
   492         else:
   511         else:
   493             return self._makewantframeresult()
   512             return self._makewantframeresult()
   494 
   513 
   495     def _onframereceivingdata(self, frametype, frameflags, payload):
   514     def _onframereceivingdata(self, requestid, frametype, frameflags, payload):
   496         if frametype != FRAME_TYPE_COMMAND_DATA:
   515         if frametype != FRAME_TYPE_COMMAND_DATA:
   497             self._state = 'errored'
   516             self._state = 'errored'
   498             return self._makeerrorresult(_('expected command data frame; '
   517             return self._makeerrorresult(_('expected command data frame; '
   499                                            'got %d') % frametype)
   518                                            'got %d') % frametype)
   500 
   519 
   510         else:
   529         else:
   511             self._state = 'errored'
   530             self._state = 'errored'
   512             return self._makeerrorresult(_('command data frame without '
   531             return self._makeerrorresult(_('command data frame without '
   513                                            'flags'))
   532                                            'flags'))
   514 
   533 
   515     def _onframeerrored(self, frametype, frameflags, payload):
   534     def _onframeerrored(self, requestid, frametype, frameflags, payload):
   516         return self._makeerrorresult(_('server already errored'))
   535         return self._makeerrorresult(_('server already errored'))