Mercurial > public > mercurial-scm > hg-stable
diff mercurial/wireprotoframing.py @ 43076:2372284d9457
formatting: blacken the codebase
This is using my patch to black
(https://github.com/psf/black/pull/826) so we don't un-wrap collection
literals.
Done with:
hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S
# skip-blame mass-reformatting only
# no-check-commit reformats foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D6971
author | Augie Fackler <augie@google.com> |
---|---|
date | Sun, 06 Oct 2019 09:45:02 -0400 |
parents | 2c55716f8a1c |
children | 687b865b95ad |
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py Sat Oct 05 10:29:34 2019 -0400 +++ b/mercurial/wireprotoframing.py Sun Oct 06 09:45:02 2019 -0400 @@ -15,9 +15,7 @@ import struct from .i18n import _ -from .thirdparty import ( - attr, -) +from .thirdparty import attr from . import ( encoding, error, @@ -121,6 +119,7 @@ ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH') + def humanflags(mapping, value): """Convert a numeric flags value to a human value, using a mapping table.""" namemap = {v: k for k, v in mapping.iteritems()} @@ -133,6 +132,7 @@ return b'|'.join(flags) + @attr.s(slots=True) class frameheader(object): """Represents the data in a frame header.""" @@ -144,6 +144,7 @@ typeid = attr.ib() flags = attr.ib() + @attr.s(slots=True, repr=False) class frame(object): """Represents a parsed frame.""" @@ -163,11 +164,19 @@ typename = name break - return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; ' - 'type=%s; flags=%s)' % ( - len(self.payload), self.requestid, self.streamid, - humanflags(STREAM_FLAGS, self.streamflags), typename, - humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags))) + return ( + 'frame(size=%d; request=%d; stream=%d; streamflags=%s; ' + 'type=%s; flags=%s)' + % ( + len(self.payload), + self.requestid, + self.streamid, + humanflags(STREAM_FLAGS, self.streamflags), + typename, + humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags), + ) + ) + def makeframe(requestid, streamid, streamflags, typeid, flags, payload): """Assemble a frame into a byte array.""" @@ -189,6 +198,7 @@ return frame + def makeframefromhumanstring(s): """Create a frame from a human readable string @@ -238,15 +248,22 @@ finalflags |= int(flag) if payload.startswith(b'cbor:'): - payload = b''.join(cborutil.streamencode( - stringutil.evalpythonliteral(payload[5:]))) + payload = b''.join( + cborutil.streamencode(stringutil.evalpythonliteral(payload[5:])) + ) else: payload = stringutil.unescapestr(payload) - return makeframe(requestid=requestid, streamid=streamid, - streamflags=finalstreamflags, typeid=frametype, - flags=finalflags, payload=payload) + return makeframe( + requestid=requestid, + streamid=streamid, + streamflags=finalstreamflags, + typeid=frametype, + flags=finalflags, + payload=payload, + ) + def parseheader(data): """Parse a unified framing protocol frame header from a buffer. @@ -265,11 +282,13 @@ requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3) typeflags = data[7] - frametype = (typeflags & 0xf0) >> 4 - frameflags = typeflags & 0x0f + frametype = (typeflags & 0xF0) >> 4 + frameflags = typeflags & 0x0F - return frameheader(framelength, requestid, streamid, streamflags, - frametype, frameflags) + return frameheader( + framelength, requestid, streamid, streamflags, frametype, frameflags + ) + def readframe(fh): """Read a unified framing protocol frame from a file object. @@ -286,22 +305,34 @@ return None if readcount != FRAME_HEADER_SIZE: - raise error.Abort(_('received incomplete frame: got %d bytes: %s') % - (readcount, header)) + raise error.Abort( + _('received incomplete frame: got %d bytes: %s') + % (readcount, header) + ) h = parseheader(header) payload = fh.read(h.length) if len(payload) != h.length: - raise error.Abort(_('frame length error: expected %d; got %d') % - (h.length, len(payload))) + raise error.Abort( + _('frame length error: expected %d; got %d') + % (h.length, len(payload)) + ) + + return frame( + h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload + ) + - return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, - payload) - -def createcommandframes(stream, requestid, cmd, args, datafh=None, - maxframesize=DEFAULT_MAX_FRAME_SIZE, - redirect=None): +def createcommandframes( + stream, + requestid, + cmd, + args, + datafh=None, + maxframesize=DEFAULT_MAX_FRAME_SIZE, + redirect=None, +): """Create frames necessary to transmit a request to run a command. This is a generator of bytearrays. Each item represents a frame @@ -331,16 +362,18 @@ if datafh: flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA - payload = data[offset:offset + maxframesize] + payload = data[offset : offset + maxframesize] offset += len(payload) if len(payload) == maxframesize and offset < len(data): flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_REQUEST, - flags=flags, - payload=payload) + yield stream.makeframe( + requestid=requestid, + typeid=FRAME_TYPE_COMMAND_REQUEST, + flags=flags, + payload=payload, + ) if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES): break @@ -357,14 +390,17 @@ assert datafh.read(1) == b'' done = True - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_DATA, - flags=flags, - payload=data) + yield stream.makeframe( + requestid=requestid, + typeid=FRAME_TYPE_COMMAND_DATA, + flags=flags, + payload=data, + ) if done: break + def createcommandresponseokframe(stream, requestid): overall = b''.join(cborutil.streamencode({b'status': b'ok'})) @@ -377,20 +413,24 @@ else: encoded = False - return stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=overall, - encoded=encoded) + return stream.makeframe( + requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_CONTINUATION, + payload=overall, + encoded=encoded, + ) -def createcommandresponseeosframes(stream, requestid, - maxframesize=DEFAULT_MAX_FRAME_SIZE): + +def createcommandresponseeosframes( + stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE +): """Create an empty payload frame representing command end-of-stream.""" payload = stream.flush() offset = 0 while True: - chunk = payload[offset:offset + maxframesize] + chunk = payload[offset : offset + maxframesize] offset += len(chunk) done = offset == len(payload) @@ -400,26 +440,31 @@ else: flags = FLAG_COMMAND_RESPONSE_CONTINUATION - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=flags, - payload=chunk, - encoded=payload != b'') + yield stream.makeframe( + requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=flags, + payload=chunk, + encoded=payload != b'', + ) if done: break + def createalternatelocationresponseframe(stream, requestid, location): data = { b'status': b'redirect', - b'location': { - b'url': location.url, - b'mediatype': location.mediatype, - } + b'location': {b'url': location.url, b'mediatype': location.mediatype,}, } - for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts', - r'servercadercerts'): + for a in ( + r'size', + r'fullhashes', + r'fullhashseed', + r'serverdercerts', + r'servercadercerts', + ): value = getattr(location, a) if value is not None: data[b'location'][pycompat.bytestr(a)] = value @@ -432,48 +477,52 @@ else: encoded = False - return stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=payload, - encoded=encoded) + return stream.makeframe( + requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_CONTINUATION, + payload=payload, + encoded=encoded, + ) + def createcommanderrorresponse(stream, requestid, message, args=None): # TODO should this be using a list of {'msg': ..., 'args': {}} so atom # formatting works consistently? - m = { - b'status': b'error', - b'error': { - b'message': message, - } - } + m = {b'status': b'error', b'error': {b'message': message,}} if args: m[b'error'][b'args'] = args overall = b''.join(cborutil.streamencode(m)) - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_EOS, - payload=overall) + yield stream.makeframe( + requestid=requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_EOS, + payload=overall, + ) + def createerrorframe(stream, requestid, msg, errtype): # TODO properly handle frame size limits. assert len(msg) <= DEFAULT_MAX_FRAME_SIZE - payload = b''.join(cborutil.streamencode({ - b'type': errtype, - b'message': [{b'msg': msg}], - })) + payload = b''.join( + cborutil.streamencode({b'type': errtype, b'message': [{b'msg': msg}],}) + ) - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_ERROR_RESPONSE, - flags=0, - payload=payload) + yield stream.makeframe( + requestid=requestid, + typeid=FRAME_TYPE_ERROR_RESPONSE, + flags=0, + payload=payload, + ) -def createtextoutputframe(stream, requestid, atoms, - maxframesize=DEFAULT_MAX_FRAME_SIZE): + +def createtextoutputframe( + stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE +): """Create a text output frame to render text to people. ``atoms`` is a 3-tuple of (formatting string, args, labels). @@ -504,8 +553,9 @@ args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args] # Labels must be ASCII. - labels = [l.decode(r'ascii', r'strict').encode(r'ascii') - for l in labels] + labels = [ + l.decode(r'ascii', r'strict').encode(r'ascii') for l in labels + ] atom = {b'msg': formatting} if args: @@ -520,10 +570,13 @@ if len(payload) > maxframesize: raise ValueError('cannot encode data in a single frame') - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_TEXT_OUTPUT, - flags=0, - payload=payload) + yield stream.makeframe( + requestid=requestid, + typeid=FRAME_TYPE_TEXT_OUTPUT, + flags=0, + payload=payload, + ) + class bufferingcommandresponseemitter(object): """Helper object to emit command response frames intelligently. @@ -536,6 +589,7 @@ So it might make sense to implement this functionality at the stream level. """ + def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE): self._stream = stream self._requestid = requestid @@ -581,7 +635,7 @@ # Now emit frames for the big chunk. offset = 0 while True: - chunk = data[offset:offset + self._maxsize] + chunk = data[offset : offset + self._maxsize] offset += len(chunk) yield self._stream.makeframe( @@ -589,7 +643,8 @@ typeid=FRAME_TYPE_COMMAND_RESPONSE, flags=FLAG_COMMAND_RESPONSE_CONTINUATION, payload=chunk, - encoded=True) + encoded=True, + ) if offset == len(data): return @@ -625,13 +680,17 @@ typeid=FRAME_TYPE_COMMAND_RESPONSE, flags=FLAG_COMMAND_RESPONSE_CONTINUATION, payload=payload, - encoded=True) + encoded=True, + ) + # TODO consider defining encoders/decoders using the util.compressionengine # mechanism. + class identityencoder(object): """Encoder for the "identity" stream encoding profile.""" + def __init__(self, ui): pass @@ -644,20 +703,24 @@ def finish(self): return b'' + class identitydecoder(object): """Decoder for the "identity" stream encoding profile.""" def __init__(self, ui, extraobjs): if extraobjs: - raise error.Abort(_('identity decoder received unexpected ' - 'additional values')) + raise error.Abort( + _('identity decoder received unexpected ' 'additional values') + ) def decode(self, data): return data + class zlibencoder(object): def __init__(self, ui): import zlib + self._zlib = zlib self._compressor = zlib.compressobj() @@ -674,13 +737,15 @@ self._compressor = None return res + class zlibdecoder(object): def __init__(self, ui, extraobjs): import zlib if extraobjs: - raise error.Abort(_('zlib decoder received unexpected ' - 'additional values')) + raise error.Abort( + _('zlib decoder received unexpected ' 'additional values') + ) self._decompressor = zlib.decompressobj() @@ -692,6 +757,7 @@ return self._decompressor.decompress(data) + class zstdbaseencoder(object): def __init__(self, level): from . import zstd @@ -714,38 +780,46 @@ self._compressor = None return res + class zstd8mbencoder(zstdbaseencoder): def __init__(self, ui): super(zstd8mbencoder, self).__init__(3) + class zstdbasedecoder(object): def __init__(self, maxwindowsize): from . import zstd + dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize) self._decompressor = dctx.decompressobj() def decode(self, data): return self._decompressor.decompress(data) + class zstd8mbdecoder(zstdbasedecoder): def __init__(self, ui, extraobjs): if extraobjs: - raise error.Abort(_('zstd8mb decoder received unexpected ' - 'additional values')) + raise error.Abort( + _('zstd8mb decoder received unexpected ' 'additional values') + ) super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576) + # We lazily populate this to avoid excessive module imports when importing # this module. STREAM_ENCODERS = {} STREAM_ENCODERS_ORDER = [] + def populatestreamencoders(): if STREAM_ENCODERS: return try: from . import zstd + zstd.__version__ except ImportError: zstd = None @@ -761,6 +835,7 @@ STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder) STREAM_ENCODERS_ORDER.append(b'identity') + class stream(object): """Represents a logical unidirectional series of frames.""" @@ -778,8 +853,10 @@ streamflags |= STREAM_FLAG_BEGIN_STREAM self._active = True - return makeframe(requestid, self.streamid, streamflags, typeid, flags, - payload) + return makeframe( + requestid, self.streamid, streamflags, typeid, flags, payload + ) + class inputstream(stream): """Represents a stream used for receiving data.""" @@ -813,6 +890,7 @@ return self._decoder.flush() + class outputstream(stream): """Represents a stream used for sending data.""" @@ -851,8 +929,7 @@ self._encoder.finish() - def makeframe(self, requestid, typeid, flags, payload, - encoded=False): + def makeframe(self, requestid, typeid, flags, payload, encoded=False): """Create a frame to be sent out over this stream. Only returns the frame instance. Does not actually send it. @@ -866,16 +943,20 @@ if not self.streamsettingssent: raise error.ProgrammingError( b'attempting to send encoded frame without sending stream ' - b'settings') + b'settings' + ) streamflags |= STREAM_FLAG_ENCODING_APPLIED - if (typeid == FRAME_TYPE_STREAM_SETTINGS - and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS): + if ( + typeid == FRAME_TYPE_STREAM_SETTINGS + and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS + ): self.streamsettingssent = True - return makeframe(requestid, self.streamid, streamflags, typeid, flags, - payload) + return makeframe( + requestid, self.streamid, streamflags, typeid, flags, payload + ) def makestreamsettingsframe(self, requestid): """Create a stream settings frame for this stream. @@ -887,19 +968,27 @@ return None payload = b''.join(cborutil.streamencode(self._encodername)) - return self.makeframe(requestid, FRAME_TYPE_STREAM_SETTINGS, - FLAG_STREAM_ENCODING_SETTINGS_EOS, payload) + return self.makeframe( + requestid, + FRAME_TYPE_STREAM_SETTINGS, + FLAG_STREAM_ENCODING_SETTINGS_EOS, + payload, + ) + def ensureserverstream(stream): if stream.streamid % 2: - raise error.ProgrammingError('server should only write to even ' - 'numbered streams; %d is not even' % - stream.streamid) + raise error.ProgrammingError( + 'server should only write to even ' + 'numbered streams; %d is not even' % stream.streamid + ) + DEFAULT_PROTOCOL_SETTINGS = { 'contentencodings': [b'identity'], } + class serverreactor(object): """Holds state of a server handling frame-based protocol requests. @@ -1006,23 +1095,28 @@ if not frame.streamid % 2: self._state = 'errored' return self._makeerrorresult( - _('received frame with even numbered stream ID: %d') % - frame.streamid) + _('received frame with even numbered stream ID: %d') + % frame.streamid + ) if frame.streamid not in self._incomingstreams: if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: self._state = 'errored' return self._makeerrorresult( - _('received frame on unknown inactive stream without ' - 'beginning of stream flag set')) + _( + 'received frame on unknown inactive stream without ' + 'beginning of stream flag set' + ) + ) self._incomingstreams[frame.streamid] = inputstream(frame.streamid) if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: # TODO handle decoding frames self._state = 'errored' - raise error.ProgrammingError('support for decoding stream payloads ' - 'not yet implemented') + raise error.ProgrammingError( + 'support for decoding stream payloads ' 'not yet implemented' + ) if frame.streamflags & STREAM_FLAG_END_STREAM: del self._incomingstreams[frame.streamid] @@ -1080,20 +1174,25 @@ if emitted: for frame in createcommandresponseeosframes( - stream, requestid): + stream, requestid + ): yield frame break except error.WireprotoCommandError as e: for frame in createcommanderrorresponse( - stream, requestid, e.message, e.messageargs): + stream, requestid, e.message, e.messageargs + ): yield frame break except Exception as e: for frame in createerrorframe( - stream, requestid, '%s' % stringutil.forcebytestr(e), - errtype='server'): + stream, + requestid, + '%s' % stringutil.forcebytestr(e), + errtype='server', + ): yield frame @@ -1106,14 +1205,16 @@ if emitted: raise error.ProgrammingError( 'alternatelocationresponse seen after initial ' - 'output object') + 'output object' + ) frame = stream.makestreamsettingsframe(requestid) if frame: yield frame yield createalternatelocationresponseframe( - stream, requestid, o) + stream, requestid, o + ) alternatelocationsent = True emitted = True @@ -1121,7 +1222,8 @@ if alternatelocationsent: raise error.ProgrammingError( - 'object follows alternatelocationresponse') + 'object follows alternatelocationresponse' + ) if not emitted: # Frame is optional. @@ -1147,9 +1249,11 @@ yield frame elif isinstance( - o, wireprototypes.indefinitebytestringresponse): + o, wireprototypes.indefinitebytestringresponse + ): for chunk in cborutil.streamencodebytestringfromiter( - o.chunks): + o.chunks + ): for frame in emitter.send(chunk): yield frame @@ -1161,9 +1265,9 @@ yield frame except Exception as e: - for frame in createerrorframe(stream, requestid, - '%s' % e, - errtype='server'): + for frame in createerrorframe( + stream, requestid, '%s' % e, errtype='server' + ): yield frame break @@ -1189,25 +1293,22 @@ for frame in gen: yield frame - return 'sendframes', { - 'framegen': makegen(), - } + return 'sendframes', {'framegen': makegen(),} def _handlesendframes(self, framegen): if self._deferoutput: self._bufferedframegens.append(framegen) return 'noop', {} else: - return 'sendframes', { - 'framegen': framegen, - } + return 'sendframes', {'framegen': framegen,} def onservererror(self, stream, requestid, msg): ensureserverstream(stream) def sendframes(): - for frame in createerrorframe(stream, requestid, msg, - errtype='server'): + for frame in createerrorframe( + stream, requestid, msg, errtype='server' + ): yield frame self._activecommands.remove(requestid) @@ -1219,8 +1320,9 @@ ensureserverstream(stream) def sendframes(): - for frame in createcommanderrorresponse(stream, requestid, message, - args): + for frame in createcommanderrorresponse( + stream, requestid, message, args + ): yield frame self._activecommands.remove(requestid) @@ -1250,17 +1352,16 @@ return s def _makeerrorresult(self, msg): - return 'error', { - 'message': msg, - } + return 'error', {'message': msg,} def _makeruncommandresult(self, requestid): entry = self._receivingcommands[requestid] if not entry['requestdone']: self._state = 'errored' - raise error.ProgrammingError('should not be called without ' - 'requestdone set') + raise error.ProgrammingError( + 'should not be called without ' 'requestdone set' + ) del self._receivingcommands[requestid] @@ -1276,7 +1377,8 @@ if b'name' not in request: self._state = 'errored' return self._makeerrorresult( - _('command request missing "name" field')) + _('command request missing "name" field') + ) if b'args' not in request: request[b'args'] = {} @@ -1284,18 +1386,19 @@ assert requestid not in self._activecommands self._activecommands.add(requestid) - return 'runcommand', { - 'requestid': requestid, - 'command': request[b'name'], - 'args': request[b'args'], - 'redirect': request.get(b'redirect'), - 'data': entry['data'].getvalue() if entry['data'] else None, - } + return ( + 'runcommand', + { + 'requestid': requestid, + 'command': request[b'name'], + 'args': request[b'args'], + 'redirect': request.get(b'redirect'), + 'data': entry['data'].getvalue() if entry['data'] else None, + }, + ) def _makewantframeresult(self): - return 'wantframe', { - 'state': self._state, - } + return 'wantframe', {'state': self._state,} def _validatecommandrequestframe(self, frame): new = frame.flags & FLAG_COMMAND_REQUEST_NEW @@ -1304,14 +1407,20 @@ if new and continuation: self._state = 'errored' return self._makeerrorresult( - _('received command request frame with both new and ' - 'continuation flags set')) + _( + 'received command request frame with both new and ' + 'continuation flags set' + ) + ) if not new and not continuation: self._state = 'errored' return self._makeerrorresult( - _('received command request frame with neither new nor ' - 'continuation flags set')) + _( + 'received command request frame with neither new nor ' + 'continuation flags set' + ) + ) def _onframeinitial(self, frame): # Called when we receive a frame when in the "initial" state. @@ -1327,8 +1436,12 @@ else: self._state = 'errored' return self._makeerrorresult( - _('expected sender protocol settings or command request ' - 'frame; got %d') % frame.typeid) + _( + 'expected sender protocol settings or command request ' + 'frame; got %d' + ) + % frame.typeid + ) def _onframeprotocolsettings(self, frame): assert self._state == 'protocol-settings-receiving' @@ -1337,8 +1450,9 @@ if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: self._state = 'errored' return self._makeerrorresult( - _('expected sender protocol settings frame; got %d') % - frame.typeid) + _('expected sender protocol settings frame; got %d') + % frame.typeid + ) more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS @@ -1346,14 +1460,20 @@ if more and eos: self._state = 'errored' return self._makeerrorresult( - _('sender protocol settings frame cannot have both ' - 'continuation and end of stream flags set')) + _( + 'sender protocol settings frame cannot have both ' + 'continuation and end of stream flags set' + ) + ) if not more and not eos: self._state = 'errored' return self._makeerrorresult( - _('sender protocol settings frame must have continuation or ' - 'end of stream flag set')) + _( + 'sender protocol settings frame must have continuation or ' + 'end of stream flag set' + ) + ) # TODO establish limits for maximum amount of data that can be # buffered. @@ -1363,7 +1483,8 @@ self._state = 'errored' return self._makeerrorresult( _('error decoding CBOR from sender protocol settings frame: %s') - % stringutil.forcebytestr(e)) + % stringutil.forcebytestr(e) + ) if more: return self._makewantframeresult() @@ -1376,12 +1497,16 @@ if not decoded: self._state = 'errored' return self._makeerrorresult( - _('sender protocol settings frame did not contain CBOR data')) + _('sender protocol settings frame did not contain CBOR data') + ) elif len(decoded) > 1: self._state = 'errored' return self._makeerrorresult( - _('sender protocol settings frame contained multiple CBOR ' - 'values')) + _( + 'sender protocol settings frame contained multiple CBOR ' + 'values' + ) + ) d = decoded[0] @@ -1398,7 +1523,8 @@ if frame.typeid != FRAME_TYPE_COMMAND_REQUEST: self._state = 'errored' return self._makeerrorresult( - _('expected command request frame; got %d') % frame.typeid) + _('expected command request frame; got %d') % frame.typeid + ) res = self._validatecommandrequestframe(frame) if res: @@ -1407,12 +1533,14 @@ if frame.requestid in self._receivingcommands: self._state = 'errored' return self._makeerrorresult( - _('request with ID %d already received') % frame.requestid) + _('request with ID %d already received') % frame.requestid + ) if frame.requestid in self._activecommands: self._state = 'errored' return self._makeerrorresult( - _('request with ID %d is already active') % frame.requestid) + _('request with ID %d is already active') % frame.requestid + ) new = frame.flags & FLAG_COMMAND_REQUEST_NEW moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES @@ -1421,7 +1549,8 @@ if not new: self._state = 'errored' return self._makeerrorresult( - _('received command request frame without new flag set')) + _('received command request frame without new flag set') + ) payload = util.bytesio() payload.write(frame.payload) @@ -1456,14 +1585,16 @@ if frame.requestid in self._activecommands: self._state = 'errored' return self._makeerrorresult( - _('received frame for request that is still active: %d') % - frame.requestid) + _('received frame for request that is still active: %d') + % frame.requestid + ) if frame.requestid not in self._receivingcommands: self._state = 'errored' return self._makeerrorresult( - _('received frame for request that is not receiving: %d') % - frame.requestid) + _('received frame for request that is not receiving: %d') + % frame.requestid + ) entry = self._receivingcommands[frame.requestid] @@ -1474,13 +1605,17 @@ if entry['requestdone']: self._state = 'errored' return self._makeerrorresult( - _('received command request frame when request frames ' - 'were supposedly done')) + _( + 'received command request frame when request frames ' + 'were supposedly done' + ) + ) if expectingdata != entry['expectingdata']: self._state = 'errored' return self._makeerrorresult( - _('mismatch between expect data flag and previous frame')) + _('mismatch between expect data flag and previous frame') + ) entry['payload'].write(frame.payload) @@ -1495,9 +1630,13 @@ elif frame.typeid == FRAME_TYPE_COMMAND_DATA: if not entry['expectingdata']: self._state = 'errored' - return self._makeerrorresult(_( - 'received command data frame for request that is not ' - 'expecting data: %d') % frame.requestid) + return self._makeerrorresult( + _( + 'received command data frame for request that is not ' + 'expecting data: %d' + ) + % frame.requestid + ) if entry['data'] is None: entry['data'] = util.bytesio() @@ -1505,8 +1644,9 @@ return self._handlecommanddataframe(frame, entry) else: self._state = 'errored' - return self._makeerrorresult(_( - 'received unexpected frame type: %d') % frame.typeid) + return self._makeerrorresult( + _('received unexpected frame type: %d') % frame.typeid + ) def _handlecommanddataframe(self, frame, entry): assert frame.typeid == FRAME_TYPE_COMMAND_DATA @@ -1521,12 +1661,14 @@ return self._makeruncommandresult(frame.requestid) else: self._state = 'errored' - return self._makeerrorresult(_('command data frame without ' - 'flags')) + return self._makeerrorresult( + _('command data frame without ' 'flags') + ) def _onframeerrored(self, frame): return self._makeerrorresult(_('server already errored')) + class commandrequest(object): """Represents a request to run a command.""" @@ -1538,6 +1680,7 @@ self.redirect = redirect self.state = 'pending' + class clientreactor(object): """Holds state of a client issuing frame-based protocol requests. @@ -1584,8 +1727,14 @@ is expected to follow or we're at the end of the response stream, respectively. """ - def __init__(self, ui, hasmultiplesend=False, buffersends=True, - clientcontentencoders=None): + + def __init__( + self, + ui, + hasmultiplesend=False, + buffersends=True, + clientcontentencoders=None, + ): """Create a new instance. ``hasmultiplesend`` indicates whether multiple sends are supported @@ -1634,24 +1783,28 @@ requestid = self._nextrequestid self._nextrequestid += 2 - request = commandrequest(requestid, name, args, datafh=datafh, - redirect=redirect) + request = commandrequest( + requestid, name, args, datafh=datafh, redirect=redirect + ) if self._buffersends: self._pendingrequests.append(request) return request, 'noop', {} else: if not self._cansend: - raise error.ProgrammingError('sends cannot be performed on ' - 'this instance') + raise error.ProgrammingError( + 'sends cannot be performed on ' 'this instance' + ) if not self._hasmultiplesend: self._cansend = False self._canissuecommands = False - return request, 'sendframes', { - 'framegen': self._makecommandframes(request), - } + return ( + request, + 'sendframes', + {'framegen': self._makecommandframes(request),}, + ) def flushcommands(self): """Request that all queued commands be sent. @@ -1667,8 +1820,9 @@ return 'noop', {} if not self._cansend: - raise error.ProgrammingError('sends cannot be performed on this ' - 'instance') + raise error.ProgrammingError( + 'sends cannot be performed on this ' 'instance' + ) # If the instance only allows sending once, mark that we have fired # our one shot. @@ -1682,9 +1836,7 @@ for frame in self._makecommandframes(request): yield frame - return 'sendframes', { - 'framegen': makeframes(), - } + return 'sendframes', {'framegen': makeframes(),} def _makecommandframes(self, request): """Emit frames to issue a command request. @@ -1698,22 +1850,27 @@ if not self._protocolsettingssent and self._clientcontentencoders: self._protocolsettingssent = True - payload = b''.join(cborutil.streamencode({ - b'contentencodings': self._clientcontentencoders, - })) + payload = b''.join( + cborutil.streamencode( + {b'contentencodings': self._clientcontentencoders,} + ) + ) yield self._outgoingstream.makeframe( requestid=request.requestid, typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS, flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS, - payload=payload) + payload=payload, + ) - res = createcommandframes(self._outgoingstream, - request.requestid, - request.name, - request.args, - datafh=request.datafh, - redirect=request.redirect) + res = createcommandframes( + self._outgoingstream, + request.requestid, + request.name, + request.args, + datafh=request.datafh, + redirect=request.redirect, + ) for frame in res: yield frame @@ -1727,21 +1884,29 @@ caller needs to take as a result of receiving this frame. """ if frame.streamid % 2: - return 'error', { - 'message': ( - _('received frame with odd numbered stream ID: %d') % - frame.streamid), - } + return ( + 'error', + { + 'message': ( + _('received frame with odd numbered stream ID: %d') + % frame.streamid + ), + }, + ) if frame.streamid not in self._incomingstreams: if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: - return 'error', { - 'message': _('received frame on unknown stream ' - 'without beginning of stream flag set'), - } + return ( + 'error', + { + 'message': _( + 'received frame on unknown stream ' + 'without beginning of stream flag set' + ), + }, + ) - self._incomingstreams[frame.streamid] = inputstream( - frame.streamid) + self._incomingstreams[frame.streamid] = inputstream(frame.streamid) stream = self._incomingstreams[frame.streamid] @@ -1758,10 +1923,15 @@ return self._onstreamsettingsframe(frame) if frame.requestid not in self._activerequests: - return 'error', { - 'message': (_('received frame for inactive request ID: %d') % - frame.requestid), - } + return ( + 'error', + { + 'message': ( + _('received frame for inactive request ID: %d') + % frame.requestid + ), + }, + ) request = self._activerequests[frame.requestid] request.state = 'receiving' @@ -1773,8 +1943,9 @@ meth = handlers.get(frame.typeid) if not meth: - raise error.ProgrammingError('unhandled frame type: %d' % - frame.typeid) + raise error.ProgrammingError( + 'unhandled frame type: %d' % frame.typeid + ) return meth(request, frame) @@ -1785,16 +1956,28 @@ eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS if more and eos: - return 'error', { - 'message': (_('stream encoding settings frame cannot have both ' - 'continuation and end of stream flags set')), - } + return ( + 'error', + { + 'message': ( + _( + 'stream encoding settings frame cannot have both ' + 'continuation and end of stream flags set' + ) + ), + }, + ) if not more and not eos: - return 'error', { - 'message': _('stream encoding settings frame must have ' - 'continuation or end of stream flag set'), - } + return ( + 'error', + { + 'message': _( + 'stream encoding settings frame must have ' + 'continuation or end of stream flag set' + ), + }, + ) if frame.streamid not in self._streamsettingsdecoders: decoder = cborutil.bufferingdecoder() @@ -1805,11 +1988,18 @@ try: decoder.decode(frame.payload) except Exception as e: - return 'error', { - 'message': (_('error decoding CBOR from stream encoding ' - 'settings frame: %s') % - stringutil.forcebytestr(e)), - } + return ( + 'error', + { + 'message': ( + _( + 'error decoding CBOR from stream encoding ' + 'settings frame: %s' + ) + % stringutil.forcebytestr(e) + ), + }, + ) if more: return 'noop', {} @@ -1820,20 +2010,30 @@ del self._streamsettingsdecoders[frame.streamid] if not decoded: - return 'error', { - 'message': _('stream encoding settings frame did not contain ' - 'CBOR data'), - } + return ( + 'error', + { + 'message': _( + 'stream encoding settings frame did not contain ' + 'CBOR data' + ), + }, + ) try: - self._incomingstreams[frame.streamid].setdecoder(self._ui, - decoded[0], - decoded[1:]) + self._incomingstreams[frame.streamid].setdecoder( + self._ui, decoded[0], decoded[1:] + ) except Exception as e: - return 'error', { - 'message': (_('error setting stream decoder: %s') % - stringutil.forcebytestr(e)), - } + return ( + 'error', + { + 'message': ( + _('error setting stream decoder: %s') + % stringutil.forcebytestr(e) + ), + }, + ) return 'noop', {} @@ -1842,12 +2042,15 @@ request.state = 'received' del self._activerequests[request.requestid] - return 'responsedata', { - 'request': request, - 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION, - 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS, - 'data': frame.payload, - } + return ( + 'responsedata', + { + 'request': request, + 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION, + 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS, + 'data': frame.payload, + }, + ) def _onerrorresponseframe(self, request, frame): request.state = 'errored' @@ -1856,8 +2059,7 @@ # The payload should be a CBOR map. m = cborutil.decodeall(frame.payload)[0] - return 'error', { - 'request': request, - 'type': m['type'], - 'message': m['message'], - } + return ( + 'error', + {'request': request, 'type': m['type'], 'message': m['message'],}, + )