diff -r 3ed344546d9e -r 9bfcbe4f4745 mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py Mon Mar 26 13:57:22 2018 -0700 +++ b/mercurial/wireprotoframing.py Mon Mar 26 11:00:16 2018 -0700 @@ -25,15 +25,26 @@ stringutil, ) -FRAME_HEADER_SIZE = 6 +FRAME_HEADER_SIZE = 8 DEFAULT_MAX_FRAME_SIZE = 32768 +STREAM_FLAG_BEGIN_STREAM = 0x01 +STREAM_FLAG_END_STREAM = 0x02 +STREAM_FLAG_ENCODING_APPLIED = 0x04 + +STREAM_FLAGS = { + b'stream-begin': STREAM_FLAG_BEGIN_STREAM, + b'stream-end': STREAM_FLAG_END_STREAM, + b'encoded': STREAM_FLAG_ENCODING_APPLIED, +} + FRAME_TYPE_COMMAND_NAME = 0x01 FRAME_TYPE_COMMAND_ARGUMENT = 0x02 FRAME_TYPE_COMMAND_DATA = 0x03 FRAME_TYPE_BYTES_RESPONSE = 0x04 FRAME_TYPE_ERROR_RESPONSE = 0x05 FRAME_TYPE_TEXT_OUTPUT = 0x06 +FRAME_TYPE_STREAM_SETTINGS = 0x08 FRAME_TYPES = { b'command-name': FRAME_TYPE_COMMAND_NAME, @@ -42,6 +53,7 @@ b'bytes-response': FRAME_TYPE_BYTES_RESPONSE, b'error-response': FRAME_TYPE_ERROR_RESPONSE, b'text-output': FRAME_TYPE_TEXT_OUTPUT, + b'stream-settings': FRAME_TYPE_STREAM_SETTINGS, } FLAG_COMMAND_NAME_EOS = 0x01 @@ -94,6 +106,7 @@ FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE, FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE, FRAME_TYPE_TEXT_OUTPUT: {}, + FRAME_TYPE_STREAM_SETTINGS: {}, } ARGUMENT_FRAME_HEADER = struct.Struct(r' + This can be used by user-facing applications and tests for creating frames easily without having to type out a bunch of constants. - Request ID is an integer. + Request ID and stream IDs are integers. - Frame type and flags can be specified by integer or named constant. + Stream flags, frame type, and flags can be specified by integer or + named constant. Flags can be delimited by `|` to bitwise OR them together. """ - requestid, frametype, frameflags, payload = s.split(b' ', 3) + fields = s.split(b' ', 5) + requestid, streamid, streamflags, frametype, frameflags, payload = fields requestid = int(requestid) + streamid = int(streamid) + + finalstreamflags = 0 + for flag in streamflags.split(b'|'): + if flag in STREAM_FLAGS: + finalstreamflags |= STREAM_FLAGS[flag] + else: + finalstreamflags |= int(flag) if frametype in FRAME_TYPES: frametype = FRAME_TYPES[frametype] @@ -169,7 +198,8 @@ payload = stringutil.unescapestr(payload) - return makeframe(requestid=requestid, typeid=frametype, + return makeframe(requestid=requestid, streamid=streamid, + streamflags=finalstreamflags, typeid=frametype, flags=finalflags, payload=payload) def parseheader(data): @@ -179,17 +209,21 @@ buffer is expected to be large enough to hold a full header. """ # 24 bits payload length (little endian) + # 16 bits request ID + # 8 bits stream ID + # 8 bits stream flags # 4 bits frame type # 4 bits frame flags # ... payload framelength = data[0] + 256 * data[1] + 16384 * data[2] - requestid = struct.unpack_from(r'> 4 frameflags = typeflags & 0x0f - return frameheader(framelength, requestid, frametype, frameflags) + return frameheader(framelength, requestid, streamid, streamflags, + frametype, frameflags) def readframe(fh): """Read a unified framing protocol frame from a file object. @@ -216,7 +250,8 @@ raise error.Abort(_('frame length error: expected %d; got %d') % (h.length, len(payload))) - return frame(h.requestid, h.typeid, h.flags, payload) + return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, + payload) def createcommandframes(stream, requestid, cmd, args, datafh=None): """Create frames necessary to transmit a request to run a command. @@ -398,12 +433,28 @@ class stream(object): """Represents a logical unidirectional series of frames.""" + def __init__(self, streamid, active=False): + self.streamid = streamid + self._active = False + def makeframe(self, requestid, typeid, flags, payload): """Create a frame to be sent out over this stream. Only returns the frame instance. Does not actually send it. """ - return makeframe(requestid, typeid, flags, payload) + streamflags = 0 + if not self._active: + streamflags |= STREAM_FLAG_BEGIN_STREAM + self._active = True + + return makeframe(requestid, self.streamid, streamflags, typeid, flags, + payload) + +def ensureserverstream(stream): + if stream.streamid % 2: + raise error.ProgrammingError('server should only write to even ' + 'numbered streams; %d is not even' % + stream.streamid) class serverreactor(object): """Holds state of a server handling frame-based protocol requests. @@ -483,6 +534,8 @@ self._deferoutput = deferoutput self._state = 'idle' self._bufferedframegens = [] + # stream id -> stream instance for all active streams from the client. + self._incomingstreams = {} # request id -> dict of commands that are actively being received. self._receivingcommands = {} # Request IDs that have been received and are actively being processed. @@ -496,6 +549,30 @@ Returns a dict with an ``action`` key that details what action, if any, the consumer should take next. """ + if not frame.streamid % 2: + self._state = 'errored' + return self._makeerrorresult( + _('received frame with even numbered stream ID: %d') % + frame.streamid) + + if frame.streamid not in self._incomingstreams: + if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: + self._state = 'errored' + return self._makeerrorresult( + _('received frame on unknown inactive stream without ' + 'beginning of stream flag set')) + + self._incomingstreams[frame.streamid] = stream(frame.streamid) + + if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: + # TODO handle decoding frames + self._state = 'errored' + raise error.ProgrammingError('support for decoding stream payloads ' + 'not yet implemented') + + if frame.streamflags & STREAM_FLAG_END_STREAM: + del self._incomingstreams[frame.streamid] + handlers = { 'idle': self._onframeidle, 'command-receiving': self._onframecommandreceiving, @@ -513,6 +590,8 @@ The raw bytes response is passed as an argument. """ + ensureserverstream(stream) + def sendframes(): for frame in createbytesresponseframesfrombytes(stream, requestid, data): @@ -552,6 +631,8 @@ } def onapplicationerror(self, stream, requestid, msg): + ensureserverstream(stream) + return 'sendframes', { 'framegen': createerrorframe(stream, requestid, msg, application=True),