Mercurial > public > mercurial-scm > hg-stable
changeset 52717:068398a8c9cb
typing: add some type annotations to the `wireprotoframing` module
`flushcommands()` and a few other generators got a slightly modified return type
after the pyupgrade series ending with 70a75d379daf. This fixes those, and grabs
some other low and mid hanging fruit here.
Note that there are possibly two bugs detected here- in the `inputstream` and
`outputstream` classes, the `flush()` and `finish()` methods respectively have
surprising issues. pytype flagged the latter when the return value was typed,
but it surprisingly doesn't see the lack of `flush()` on the `Decoder` class.
I wonder if either of these are the cause of the occasional odd-length string
error I see in the `hg serve` output from time to time. In any event, punt
those changes down the road for now, and mark with a TODO.
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Mon, 06 Jan 2025 20:32:31 -0500 |
parents | 1a612f9ec2c4 |
children | 061bfd699a56 |
files | mercurial/wireprotoframing.py |
diffstat | 1 files changed, 151 insertions(+), 82 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py Mon Jan 13 15:37:08 2025 -0500 +++ b/mercurial/wireprotoframing.py Mon Jan 06 20:32:31 2025 -0500 @@ -41,6 +41,13 @@ stringutil, ) +if typing.TYPE_CHECKING: + from typing import ( + Iterator, + ) + + HandleSendFramesReturnT = tuple[bytes, dict[bytes, Iterator[bytearray]]] + FRAME_HEADER_SIZE = 8 DEFAULT_MAX_FRAME_SIZE = 32768 @@ -48,7 +55,7 @@ STREAM_FLAG_END_STREAM = 0x02 STREAM_FLAG_ENCODING_APPLIED = 0x04 -STREAM_FLAGS = { +STREAM_FLAGS: dict[bytes, int] = { b'stream-begin': STREAM_FLAG_BEGIN_STREAM, b'stream-end': STREAM_FLAG_END_STREAM, b'encoded': STREAM_FLAG_ENCODING_APPLIED, @@ -63,7 +70,7 @@ FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08 FRAME_TYPE_STREAM_SETTINGS = 0x09 -FRAME_TYPES = { +FRAME_TYPES: dict[bytes, int] = { b'command-request': FRAME_TYPE_COMMAND_REQUEST, b'command-data': FRAME_TYPE_COMMAND_DATA, b'command-response': FRAME_TYPE_COMMAND_RESPONSE, @@ -79,7 +86,7 @@ FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08 -FLAGS_COMMAND_REQUEST = { +FLAGS_COMMAND_REQUEST: dict[bytes, int] = { b'new': FLAG_COMMAND_REQUEST_NEW, b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION, b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES, @@ -89,7 +96,7 @@ FLAG_COMMAND_DATA_CONTINUATION = 0x01 FLAG_COMMAND_DATA_EOS = 0x02 -FLAGS_COMMAND_DATA = { +FLAGS_COMMAND_DATA: dict[bytes, int] = { b'continuation': FLAG_COMMAND_DATA_CONTINUATION, b'eos': FLAG_COMMAND_DATA_EOS, } @@ -97,7 +104,7 @@ FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01 FLAG_COMMAND_RESPONSE_EOS = 0x02 -FLAGS_COMMAND_RESPONSE = { +FLAGS_COMMAND_RESPONSE: dict[bytes, int] = { b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION, b'eos': FLAG_COMMAND_RESPONSE_EOS, } @@ -105,7 +112,7 @@ FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02 -FLAGS_SENDER_PROTOCOL_SETTINGS = { +FLAGS_SENDER_PROTOCOL_SETTINGS: dict[bytes, int] = { b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION, b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS, } @@ -113,13 +120,13 @@ FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02 -FLAGS_STREAM_ENCODING_SETTINGS = { +FLAGS_STREAM_ENCODING_SETTINGS: dict[bytes, int] = { b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION, b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS, } # Maps frame types to their available flags. -FRAME_TYPE_FLAGS = { +FRAME_TYPE_FLAGS: dict[int, dict[bytes, int]] = { FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST, FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA, FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE, @@ -133,7 +140,7 @@ ARGUMENT_RECORD_HEADER = struct.Struct('<HH') -def humanflags(mapping, value): +def humanflags(mapping: dict[bytes, int], value: int) -> bytes: """Convert a numeric flags value to a human value, using a mapping table.""" namemap = {v: k for k, v in mapping.items()} flags = [] @@ -150,24 +157,24 @@ class frameheader: """Represents the data in a frame header.""" - length = attr.ib() - requestid = attr.ib() - streamid = attr.ib() - streamflags = attr.ib() - typeid = attr.ib() - flags = attr.ib() + length = attr.ib(type=int) + requestid = attr.ib(type=int) + streamid = attr.ib(type=int) + streamflags = attr.ib(type=int) + typeid = attr.ib(type=int) + flags = attr.ib(type=int) @attr.s(slots=True, repr=False) class frame: """Represents a parsed frame.""" - requestid = attr.ib() - streamid = attr.ib() - streamflags = attr.ib() - typeid = attr.ib() - flags = attr.ib() - payload = attr.ib() + requestid = attr.ib(type=int) + streamid = attr.ib(type=int) + streamflags = attr.ib(type=int) + typeid = attr.ib(type=int) + flags = attr.ib(type=int) + payload = attr.ib(type=bytes) @encoding.strmethod def __repr__(self): @@ -191,7 +198,14 @@ ) -def makeframe(requestid, streamid, streamflags, typeid, flags, payload): +def makeframe( + requestid: int, + streamid: int, + streamflags: int, + typeid: int, + flags: int, + payload: bytes, +) -> bytearray: """Assemble a frame into a byte array.""" # TODO assert size of payload. frame = bytearray(FRAME_HEADER_SIZE + len(payload)) @@ -212,7 +226,7 @@ return frame -def makeframefromhumanstring(s): +def makeframefromhumanstring(s: bytes) -> bytearray: """Create a frame from a human readable string Strings have the form: @@ -278,7 +292,7 @@ ) -def parseheader(data): +def parseheader(data: bytes) -> frameheader: """Parse a unified framing protocol frame header from a buffer. The header is expected to be in the buffer at offset 0 and the @@ -303,7 +317,7 @@ ) -def readframe(fh): +def readframe(fh) -> frame | None: """Read a unified framing protocol frame from a file object. Returns a 3-tuple of (type, flags, payload) for the decoded frame or @@ -338,14 +352,14 @@ def createcommandframes( - stream, - requestid, + stream: stream, + requestid: int, cmd, args, datafh=None, - maxframesize=DEFAULT_MAX_FRAME_SIZE, + maxframesize: int = DEFAULT_MAX_FRAME_SIZE, redirect=None, -): +) -> Iterator[bytearray]: """Create frames necessary to transmit a request to run a command. This is a generator of bytearrays. Each item represents a frame @@ -414,7 +428,9 @@ break -def createcommandresponseokframe(stream, requestid): +def createcommandresponseokframe( + stream: outputstream, requestid: int +) -> bytearray | None: overall = b''.join(cborutil.streamencode({b'status': b'ok'})) if stream.streamsettingssent: @@ -436,8 +452,10 @@ def createcommandresponseeosframes( - stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE -): + stream: outputstream, + requestid: int, + maxframesize: int = DEFAULT_MAX_FRAME_SIZE, +) -> Iterator[bytearray]: """Create an empty payload frame representing command end-of-stream.""" payload = stream.flush() @@ -465,7 +483,9 @@ break -def createalternatelocationresponseframe(stream, requestid, location): +def createalternatelocationresponseframe( + stream: outputstream, requestid: int, location +) -> bytearray: data = { b'status': b'redirect', b'location': { @@ -504,7 +524,9 @@ ) -def createcommanderrorresponse(stream, requestid, message, args=None): +def createcommanderrorresponse( + stream: stream, requestid: int, message: bytes, args=None +) -> Iterator[bytearray]: # TODO should this be using a list of {'msg': ..., 'args': {}} so atom # formatting works consistently? m = { @@ -527,7 +549,9 @@ ) -def createerrorframe(stream, requestid, msg, errtype): +def createerrorframe( + stream: stream, requestid: int, msg: bytes, errtype: bytes +) -> Iterator[bytearray]: # TODO properly handle frame size limits. assert len(msg) <= DEFAULT_MAX_FRAME_SIZE @@ -549,8 +573,11 @@ def createtextoutputframe( - stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE -): + stream: stream, + requestid: int, + atoms, + maxframesize: int = DEFAULT_MAX_FRAME_SIZE, +) -> Iterator[bytearray]: """Create a text output frame to render text to people. ``atoms`` is a 3-tuple of (formatting string, args, labels). @@ -616,7 +643,9 @@ level. """ - def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE): + def __init__( + self, stream, requestid: int, maxframesize: int = DEFAULT_MAX_FRAME_SIZE + ) -> None: self._stream = stream self._requestid = requestid self._maxsize = maxframesize @@ -861,7 +890,7 @@ STREAM_ENCODERS_ORDER: list[bytes] = [] -def populatestreamencoders(): +def populatestreamencoders() -> None: if STREAM_ENCODERS: return @@ -887,11 +916,16 @@ class stream: """Represents a logical unidirectional series of frames.""" - def __init__(self, streamid, active=False): + streamid: int + _active: bool + + def __init__(self, streamid: int, active: bool = False) -> None: self.streamid = streamid self._active = active - def makeframe(self, requestid, typeid, flags, payload): + def makeframe( + self, requestid: int, typeid: int, flags: int, payload: bytes + ) -> bytearray: """Create a frame to be sent out over this stream. Only returns the frame instance. Does not actually send it. @@ -909,11 +943,13 @@ class inputstream(stream): """Represents a stream used for receiving data.""" - def __init__(self, streamid, active=False): + _decoder: Decoder | None + + def __init__(self, streamid: int, active: bool = False) -> None: super().__init__(streamid, active=active) self._decoder = None - def setdecoder(self, ui, name, extraobjs): + def setdecoder(self, ui, name: bytes, extraobjs) -> None: """Set the decoder for this stream. Receives the stream profile name and any additional CBOR objects @@ -924,7 +960,7 @@ self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs) - def decode(self, data): + def decode(self, data) -> bytes: # Default is identity decoder. We don't bother instantiating one # because it is trivial. if not self._decoder: @@ -932,23 +968,29 @@ return self._decoder.decode(data) - def flush(self): + def flush(self) -> bytes: if not self._decoder: return b'' + # TODO: this looks like a bug- no decoder class defines flush(), so + # either no decoders are used, or no inputstream is flushed. return self._decoder.flush() class outputstream(stream): """Represents a stream used for sending data.""" - def __init__(self, streamid, active=False): + streamsettingssent: bool + _encoder: Encoder | None + _encodername: bytes | None + + def __init__(self, streamid: int, active: bool = False) -> None: super().__init__(streamid, active=active) self.streamsettingssent = False self._encoder = None self._encodername = None - def setencoder(self, ui, name): + def setencoder(self, ui, name: bytes) -> None: """Set the encoder for this stream. Receives the stream profile name. @@ -959,25 +1001,33 @@ self._encoder = STREAM_ENCODERS[name][0](ui) self._encodername = name - def encode(self, data): + def encode(self, data) -> bytes: if not self._encoder: return data return self._encoder.encode(data) - def flush(self): + def flush(self) -> bytes: if not self._encoder: return b'' return self._encoder.flush() - def finish(self): + # TODO: was this supposed to return the result of finish()? + def finish(self): # -> bytes: if not self._encoder: return b'' self._encoder.finish() - def makeframe(self, requestid, typeid, flags, payload, encoded=False): + def makeframe( + self, + requestid: int, + typeid: int, + flags: int, + payload: bytes, + encoded: bool = False, + ) -> bytearray: """Create a frame to be sent out over this stream. Only returns the frame instance. Does not actually send it. @@ -1006,7 +1056,7 @@ requestid, self.streamid, streamflags, typeid, flags, payload ) - def makestreamsettingsframe(self, requestid): + def makestreamsettingsframe(self, requestid: int) -> bytearray | None: """Create a stream settings frame for this stream. Returns frame data or None if no stream settings frame is needed or has @@ -1024,7 +1074,7 @@ ) -def ensureserverstream(stream): +def ensureserverstream(stream: stream) -> None: if stream.streamid % 2: raise error.ProgrammingError( b'server should only write to even ' @@ -1032,7 +1082,7 @@ ) -DEFAULT_PROTOCOL_SETTINGS = { +DEFAULT_PROTOCOL_SETTINGS: dict[bytes, list[bytes]] = { b'contentencodings': [b'identity'], } @@ -1102,7 +1152,9 @@ between who responds to what. """ - def __init__(self, ui, deferoutput=False): + _bufferedframegens: list[Iterator[bytearray]] + + def __init__(self, ui, deferoutput: bool = False) -> None: """Construct a new server reactor. ``deferoutput`` can be used to indicate that no output frames should be @@ -1134,7 +1186,7 @@ populatestreamencoders() - def onframerecv(self, frame): + def onframerecv(self, frame: frame): """Process a frame that has been received off the wire. Returns a dict with an ``action`` key that details what action, @@ -1183,7 +1235,9 @@ return meth(frame) - def oncommandresponsereadyobjects(self, stream, requestid, objs): + def oncommandresponsereadyobjects( + self, stream, requestid: int, objs + ) -> HandleSendFramesReturnT: """Signal that objects are ready to be sent to the client. ``objs`` is an iterable of objects (typically a generator) that will @@ -1322,7 +1376,7 @@ return self._handlesendframes(sendframes()) - def oninputeof(self): + def oninputeof(self) -> tuple[bytes, dict[bytes, Iterator[bytearray]]]: """Signals that end of input has been received. No more frames will be received. All pending activity should be @@ -1342,7 +1396,9 @@ b'framegen': makegen(), } - def _handlesendframes(self, framegen): + def _handlesendframes( + self, framegen: Iterator[bytearray] + ) -> HandleSendFramesReturnT: if self._deferoutput: self._bufferedframegens.append(framegen) return b'noop', {} @@ -1351,7 +1407,9 @@ b'framegen': framegen, } - def onservererror(self, stream, requestid, msg): + def onservererror( + self, stream: stream, requestid: int, msg: bytes + ) -> HandleSendFramesReturnT: ensureserverstream(stream) def sendframes(): @@ -1363,7 +1421,9 @@ return self._handlesendframes(sendframes()) - def oncommanderror(self, stream, requestid, message, args=None): + def oncommanderror( + self, stream: stream, requestid: int, message: bytes, args=None + ) -> HandleSendFramesReturnT: """Called when a command encountered an error before sending output.""" ensureserverstream(stream) @@ -1376,7 +1436,7 @@ return self._handlesendframes(sendframes()) - def makeoutputstream(self): + def makeoutputstream(self) -> outputstream: """Create a stream to be used for sending data to the client. If this is called before protocol settings frames are received, we @@ -1398,12 +1458,12 @@ return s - def _makeerrorresult(self, msg): + def _makeerrorresult(self, msg: bytes) -> tuple[bytes, dict[bytes, bytes]]: return b'error', { b'message': msg, } - def _makeruncommandresult(self, requestid): + def _makeruncommandresult(self, requestid: int): entry = self._receivingcommands[requestid] if not entry[b'requestdone']: @@ -1446,12 +1506,12 @@ }, ) - def _makewantframeresult(self): + def _makewantframeresult(self) -> tuple[bytes, dict[bytes, bytes]]: return b'wantframe', { b'state': self._state, } - def _validatecommandrequestframe(self, frame): + def _validatecommandrequestframe(self, frame: frame): new = frame.flags & FLAG_COMMAND_REQUEST_NEW continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION @@ -1473,7 +1533,7 @@ ) ) - def _onframeinitial(self, frame): + def _onframeinitial(self, frame: frame): # Called when we receive a frame when in the "initial" state. if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: self._state = b'protocol-settings-receiving' @@ -1494,7 +1554,7 @@ % frame.typeid ) - def _onframeprotocolsettings(self, frame): + def _onframeprotocolsettings(self, frame: frame): assert self._state == b'protocol-settings-receiving' assert self._protocolsettingsdecoder is not None @@ -1570,7 +1630,7 @@ return self._makewantframeresult() - def _onframeidle(self, frame): + def _onframeidle(self, frame: frame): # The only frame type that should be received in this state is a # command request. if frame.typeid != FRAME_TYPE_COMMAND_REQUEST: @@ -1623,7 +1683,7 @@ self._state = b'command-receiving' return self._makewantframeresult() - def _onframecommandreceiving(self, frame): + def _onframecommandreceiving(self, frame: frame): if frame.typeid == FRAME_TYPE_COMMAND_REQUEST: # Process new command requests as such. if frame.flags & FLAG_COMMAND_REQUEST_NEW: @@ -1701,7 +1761,7 @@ _(b'received unexpected frame type: %d') % frame.typeid ) - def _handlecommanddataframe(self, frame, entry): + def _handlecommanddataframe(self, frame: frame, entry): assert frame.typeid == FRAME_TYPE_COMMAND_DATA # TODO support streaming data instead of buffering it. @@ -1716,14 +1776,18 @@ self._state = b'errored' return self._makeerrorresult(_(b'command data frame without flags')) - def _onframeerrored(self, frame): + def _onframeerrored(self, frame: frame): return self._makeerrorresult(_(b'server already errored')) class commandrequest: """Represents a request to run a command.""" - def __init__(self, requestid, name, args, datafh=None, redirect=None): + state: bytes + + def __init__( + self, requestid: int, name, args, datafh=None, redirect=None + ) -> None: self.requestid = requestid self.name = name self.args = args @@ -1779,13 +1843,16 @@ respectively. """ + _hasmultiplesend: bool + _buffersends: bool + def __init__( self, ui, - hasmultiplesend=False, - buffersends=True, + hasmultiplesend: bool = False, + buffersends: bool = True, clientcontentencoders=None, - ): + ) -> None: """Create a new instance. ``hasmultiplesend`` indicates whether multiple sends are supported @@ -1859,7 +1926,7 @@ }, ) - def flushcommands(self): + def flushcommands(self) -> tuple[bytes, dict[bytes, Iterator[bytearray]]]: """Request that all queued commands be sent. If any commands are buffered, this will instruct the caller to send @@ -1892,7 +1959,9 @@ b'framegen': makeframes(), } - def _makecommandframes(self, request): + def _makecommandframes( + self, request: commandrequest + ) -> Iterator[bytearray]: """Emit frames to issue a command request. As a side-effect, update request accounting to reflect its changed @@ -1932,7 +2001,7 @@ request.state = b'sent' - def onframerecv(self, frame): + def onframerecv(self, frame: frame): """Process a frame that has been received off the wire. Returns a 2-tuple of (action, meta) describing further action the @@ -2004,7 +2073,7 @@ return meth(request, frame) - def _onstreamsettingsframe(self, frame): + def _onstreamsettingsframe(self, frame: frame): assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION @@ -2092,7 +2161,7 @@ return b'noop', {} - def _oncommandresponseframe(self, request, frame): + def _oncommandresponseframe(self, request: commandrequest, frame: frame): if frame.flags & FLAG_COMMAND_RESPONSE_EOS: request.state = b'received' del self._activerequests[request.requestid] @@ -2107,7 +2176,7 @@ }, ) - def _onerrorresponseframe(self, request, frame): + def _onerrorresponseframe(self, request: commandrequest, frame: frame): request.state = b'errored' del self._activerequests[request.requestid]