Mercurial > public > mercurial-scm > hg
diff mercurial/wireprotoframing.py @ 37285:3ed344546d9e
wireproto: start to associate frame generation with a stream
An upcoming commit will introduce "streams" into the frame-based wire
protocol. In preparation for this invasive change, we introduce a basic
"stream" class and have all operations that create frames also operate
alongside a stream instance.
Differential Revision: https://phab.mercurial-scm.org/D2906
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 26 Mar 2018 13:57:22 -0700 |
parents | f0b6fbea00cf |
children | 9bfcbe4f4745 |
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py Mon Mar 26 13:51:22 2018 -0700 +++ b/mercurial/wireprotoframing.py Mon Mar 26 13:57:22 2018 -0700 @@ -218,7 +218,7 @@ return frame(h.requestid, h.typeid, h.flags, payload) -def createcommandframes(requestid, cmd, args, datafh=None): +def createcommandframes(stream, requestid, cmd, args, datafh=None): """Create frames necessary to transmit a request to run a command. This is a generator of bytearrays. Each item represents a frame @@ -233,8 +233,8 @@ if not flags: flags |= FLAG_COMMAND_NAME_EOS - yield makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME, - flags=flags, payload=cmd) + yield stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME, + flags=flags, payload=cmd) for i, k in enumerate(sorted(args)): v = args[k] @@ -250,10 +250,10 @@ payload[offset:offset + len(v)] = v flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0 - yield makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_ARGUMENT, - flags=flags, - payload=payload) + yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_ARGUMENT, + flags=flags, + payload=payload) if datafh: while True: @@ -267,15 +267,15 @@ assert datafh.read(1) == b'' done = True - yield makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_DATA, - flags=flags, - payload=data) + yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_COMMAND_DATA, + flags=flags, + payload=data) if done: break -def createbytesresponseframesfrombytes(requestid, data, +def createbytesresponseframesfrombytes(stream, requestid, data, maxframesize=DEFAULT_MAX_FRAME_SIZE): """Create a raw frame to send a bytes response from static bytes input. @@ -284,10 +284,10 @@ # Simple case of a single frame. if len(data) <= maxframesize: - yield makeframe(requestid=requestid, - typeid=FRAME_TYPE_BYTES_RESPONSE, - flags=FLAG_BYTES_RESPONSE_EOS, - payload=data) + yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_BYTES_RESPONSE, + flags=FLAG_BYTES_RESPONSE_EOS, + payload=data) return offset = 0 @@ -301,15 +301,15 @@ else: flags = FLAG_BYTES_RESPONSE_CONTINUATION - yield makeframe(requestid=requestid, - typeid=FRAME_TYPE_BYTES_RESPONSE, - flags=flags, - payload=chunk) + yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_BYTES_RESPONSE, + flags=flags, + payload=chunk) if done: break -def createerrorframe(requestid, msg, protocol=False, application=False): +def createerrorframe(stream, requestid, msg, protocol=False, application=False): # TODO properly handle frame size limits. assert len(msg) <= DEFAULT_MAX_FRAME_SIZE @@ -319,12 +319,12 @@ if application: flags |= FLAG_ERROR_RESPONSE_APPLICATION - yield makeframe(requestid=requestid, - typeid=FRAME_TYPE_ERROR_RESPONSE, - flags=flags, - payload=msg) + yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_ERROR_RESPONSE, + flags=flags, + payload=msg) -def createtextoutputframe(requestid, atoms): +def createtextoutputframe(stream, requestid, atoms): """Create a text output frame to render text to people. ``atoms`` is a 3-tuple of (formatting string, args, labels). @@ -390,10 +390,20 @@ if bytesleft < 0: raise ValueError('cannot encode data in a single frame') - yield makeframe(requestid=requestid, - typeid=FRAME_TYPE_TEXT_OUTPUT, - flags=0, - payload=b''.join(atomchunks)) + yield stream.makeframe(requestid=requestid, + typeid=FRAME_TYPE_TEXT_OUTPUT, + flags=0, + payload=b''.join(atomchunks)) + +class stream(object): + """Represents a logical unidirectional series of frames.""" + + def makeframe(self, requestid, typeid, flags, payload): + """Create a frame to be sent out over this stream. + + Only returns the frame instance. Does not actually send it. + """ + return makeframe(requestid, typeid, flags, payload) class serverreactor(object): """Holds state of a server handling frame-based protocol requests. @@ -498,13 +508,14 @@ return meth(frame) - def onbytesresponseready(self, requestid, data): + def onbytesresponseready(self, stream, requestid, data): """Signal that a bytes response is ready to be sent to the client. The raw bytes response is passed as an argument. """ def sendframes(): - for frame in createbytesresponseframesfrombytes(requestid, data): + for frame in createbytesresponseframesfrombytes(stream, requestid, + data): yield frame self._activecommands.remove(requestid) @@ -540,9 +551,10 @@ 'framegen': makegen(), } - def onapplicationerror(self, requestid, msg): + def onapplicationerror(self, stream, requestid, msg): return 'sendframes', { - 'framegen': createerrorframe(requestid, msg, application=True), + 'framegen': createerrorframe(stream, requestid, msg, + application=True), } def _makeerrorresult(self, msg):