Mercurial > public > mercurial-scm > hg
comparison mercurial/utils/cborutil.py @ 52693:5e09c6b5b795
typing: add type annotations to most of `mercurial/utils/cborutil.py`
These are the easy/obvious/documented ones. We'll leave the harder ones for
later.
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Sun, 12 Jan 2025 22:05:19 -0500 |
parents | 279e217d6041 |
children | 8a2091a2f974 |
comparison
equal
deleted
inserted
replaced
52692:45dc0f874b8c | 52693:5e09c6b5b795 |
---|---|
10 import struct | 10 import struct |
11 import typing | 11 import typing |
12 | 12 |
13 if typing.TYPE_CHECKING: | 13 if typing.TYPE_CHECKING: |
14 from typing import ( | 14 from typing import ( |
15 Iterable, | |
15 Iterator, | 16 Iterator, |
16 ) | 17 ) |
17 | 18 |
18 # Very short version of RFC 7049... | 19 # Very short version of RFC 7049... |
19 # | 20 # |
67 # The break ends an indefinite length item. | 68 # The break ends an indefinite length item. |
68 BREAK = b'\xff' | 69 BREAK = b'\xff' |
69 BREAK_INT = 255 | 70 BREAK_INT = 255 |
70 | 71 |
71 | 72 |
72 def encodelength(majortype, length): | 73 def encodelength(majortype: int, length: int) -> bytes: |
73 """Obtain a value encoding the major type and its length.""" | 74 """Obtain a value encoding the major type and its length.""" |
74 if length < 24: | 75 if length < 24: |
75 return ENCODED_LENGTH_1.pack(majortype << 5 | length) | 76 return ENCODED_LENGTH_1.pack(majortype << 5 | length) |
76 elif length < 256: | 77 elif length < 256: |
77 return ENCODED_LENGTH_2.pack(majortype << 5 | 24, length) | 78 return ENCODED_LENGTH_2.pack(majortype << 5 | 24, length) |
81 return ENCODED_LENGTH_4.pack(majortype << 5 | 26, length) | 82 return ENCODED_LENGTH_4.pack(majortype << 5 | 26, length) |
82 else: | 83 else: |
83 return ENCODED_LENGTH_5.pack(majortype << 5 | 27, length) | 84 return ENCODED_LENGTH_5.pack(majortype << 5 | 27, length) |
84 | 85 |
85 | 86 |
86 def streamencodebytestring(v): | 87 def streamencodebytestring(v: bytes) -> Iterator[bytes]: |
87 yield encodelength(MAJOR_TYPE_BYTESTRING, len(v)) | 88 yield encodelength(MAJOR_TYPE_BYTESTRING, len(v)) |
88 yield v | 89 yield v |
89 | 90 |
90 | 91 |
91 def streamencodebytestringfromiter(it): | 92 def streamencodebytestringfromiter(it: Iterable[bytes]) -> Iterator[bytes]: |
92 """Convert an iterator of chunks to an indefinite bytestring. | 93 """Convert an iterator of chunks to an indefinite bytestring. |
93 | 94 |
94 Given an input that is iterable and each element in the iterator is | 95 Given an input that is iterable and each element in the iterator is |
95 representable as bytes, emit an indefinite length bytestring. | 96 representable as bytes, emit an indefinite length bytestring. |
96 """ | 97 """ |
101 yield chunk | 102 yield chunk |
102 | 103 |
103 yield BREAK | 104 yield BREAK |
104 | 105 |
105 | 106 |
106 def streamencodeindefinitebytestring(source, chunksize=65536): | 107 def streamencodeindefinitebytestring( |
108 source, chunksize: int = 65536 | |
109 ) -> Iterator[bytes]: | |
107 """Given a large source buffer, emit as an indefinite length bytestring. | 110 """Given a large source buffer, emit as an indefinite length bytestring. |
108 | 111 |
109 This is a generator of chunks constituting the encoded CBOR data. | 112 This is a generator of chunks constituting the encoded CBOR data. |
110 """ | 113 """ |
111 yield BEGIN_INDEFINITE_BYTESTRING | 114 yield BEGIN_INDEFINITE_BYTESTRING |
124 break | 127 break |
125 | 128 |
126 yield BREAK | 129 yield BREAK |
127 | 130 |
128 | 131 |
129 def streamencodeint(v): | 132 def streamencodeint(v: int) -> Iterator[bytes]: |
130 if v >= 18446744073709551616 or v < -18446744073709551616: | 133 if v >= 18446744073709551616 or v < -18446744073709551616: |
131 raise ValueError(b'big integers not supported') | 134 raise ValueError(b'big integers not supported') |
132 | 135 |
133 if v >= 0: | 136 if v >= 0: |
134 yield encodelength(MAJOR_TYPE_UINT, v) | 137 yield encodelength(MAJOR_TYPE_UINT, v) |
158 | 161 |
159 def _mixedtypesortkey(v): | 162 def _mixedtypesortkey(v): |
160 return type(v).__name__, v | 163 return type(v).__name__, v |
161 | 164 |
162 | 165 |
163 def streamencodeset(s): | 166 def streamencodeset(s) -> Iterator[bytes]: |
164 # https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml defines | 167 # https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml defines |
165 # semantic tag 258 for finite sets. | 168 # semantic tag 258 for finite sets. |
166 yield encodelength(MAJOR_TYPE_SEMANTIC, SEMANTIC_TAG_FINITE_SET) | 169 yield encodelength(MAJOR_TYPE_SEMANTIC, SEMANTIC_TAG_FINITE_SET) |
167 | 170 |
168 yield from streamencodearray(sorted(s, key=_mixedtypesortkey)) | 171 yield from streamencodearray(sorted(s, key=_mixedtypesortkey)) |
169 | 172 |
170 | 173 |
171 def streamencodemap(d) -> Iterator[bytes]: | 174 def streamencodemap(d: dict) -> Iterator[bytes]: |
172 """Encode dictionary to a generator. | 175 """Encode dictionary to a generator. |
173 | 176 |
174 Does not support indefinite length dictionaries. | 177 Does not support indefinite length dictionaries. |
175 """ | 178 """ |
176 yield encodelength(MAJOR_TYPE_MAP, len(d)) | 179 yield encodelength(MAJOR_TYPE_MAP, len(d)) |
178 for key, value in sorted(d.items(), key=lambda x: _mixedtypesortkey(x[0])): | 181 for key, value in sorted(d.items(), key=lambda x: _mixedtypesortkey(x[0])): |
179 yield from streamencode(key) | 182 yield from streamencode(key) |
180 yield from streamencode(value) | 183 yield from streamencode(value) |
181 | 184 |
182 | 185 |
183 def streamencodemapfromiter(it) -> Iterator[bytes]: | 186 def streamencodemapfromiter(it: Iterable) -> Iterator[bytes]: |
184 """Given an iterable of (key, value), encode to an indefinite length map.""" | 187 """Given an iterable of (key, value), encode to an indefinite length map.""" |
185 yield BEGIN_INDEFINITE_MAP | 188 yield BEGIN_INDEFINITE_MAP |
186 | 189 |
187 for key, value in it: | 190 for key, value in it: |
188 yield from streamencode(key) | 191 yield from streamencode(key) |
189 yield from streamencode(value) | 192 yield from streamencode(value) |
190 | 193 |
191 yield BREAK | 194 yield BREAK |
192 | 195 |
193 | 196 |
194 def streamencodebool(b): | 197 def streamencodebool(b: bool) -> Iterator[bytes]: |
195 # major type 7, simple value 20 and 21. | 198 # major type 7, simple value 20 and 21. |
196 yield b'\xf5' if b else b'\xf4' | 199 yield b'\xf5' if b else b'\xf4' |
197 | 200 |
198 | 201 |
199 def streamencodenone(v): | 202 def streamencodenone(v: None) -> Iterator[bytes]: |
200 # major type 7, simple value 22. | 203 # major type 7, simple value 22. |
201 yield b'\xf6' | 204 yield b'\xf6' |
202 | 205 |
203 | 206 |
204 STREAM_ENCODERS = { | 207 STREAM_ENCODERS = { |
211 bool: streamencodebool, | 214 bool: streamencodebool, |
212 type(None): streamencodenone, | 215 type(None): streamencodenone, |
213 } | 216 } |
214 | 217 |
215 | 218 |
216 def streamencode(v): | 219 def streamencode(v) -> Iterator[bytes]: |
217 """Encode a value in a streaming manner. | 220 """Encode a value in a streaming manner. |
218 | 221 |
219 Given an input object, encode it to CBOR recursively. | 222 Given an input object, encode it to CBOR recursively. |
220 | 223 |
221 Returns a generator of CBOR encoded bytes. There is no guarantee | 224 Returns a generator of CBOR encoded bytes. There is no guarantee |
241 | 244 |
242 class CBORDecodeError(Exception): | 245 class CBORDecodeError(Exception): |
243 """Represents an error decoding CBOR.""" | 246 """Represents an error decoding CBOR.""" |
244 | 247 |
245 | 248 |
246 def _elementtointeger(b, i): | 249 def _elementtointeger(b, i: int) -> int: |
247 return b[i] | 250 return b[i] |
248 | 251 |
249 | 252 |
250 STRUCT_BIG_UBYTE = struct.Struct('>B') | 253 STRUCT_BIG_UBYTE = struct.Struct('>B') |
251 STRUCT_BIG_USHORT = struct.Struct(b'>H') | 254 STRUCT_BIG_USHORT = struct.Struct(b'>H') |
258 SPECIAL_START_MAP = 3 | 261 SPECIAL_START_MAP = 3 |
259 SPECIAL_START_SET = 4 | 262 SPECIAL_START_SET = 4 |
260 SPECIAL_INDEFINITE_BREAK = 5 | 263 SPECIAL_INDEFINITE_BREAK = 5 |
261 | 264 |
262 | 265 |
263 def decodeitem(b, offset=0): | 266 def decodeitem(b, offset: int = 0): |
264 """Decode a new CBOR value from a buffer at offset. | 267 """Decode a new CBOR value from a buffer at offset. |
265 | 268 |
266 This function attempts to decode up to one complete CBOR value | 269 This function attempts to decode up to one complete CBOR value |
267 from ``b`` starting at offset ``offset``. | 270 from ``b`` starting at offset ``offset``. |
268 | 271 |
304 elif majortype == MAJOR_TYPE_NEGINT: | 307 elif majortype == MAJOR_TYPE_NEGINT: |
305 # Negative integers are the same as UINT except inverted minus 1. | 308 # Negative integers are the same as UINT except inverted minus 1. |
306 complete, value, readcount = decodeuint(subtype, b, offset) | 309 complete, value, readcount = decodeuint(subtype, b, offset) |
307 | 310 |
308 if complete: | 311 if complete: |
312 assert value is not None # help pytype | |
309 return True, -value - 1, readcount + 1, SPECIAL_NONE | 313 return True, -value - 1, readcount + 1, SPECIAL_NONE |
310 else: | 314 else: |
311 return False, None, readcount, SPECIAL_NONE | 315 return False, None, readcount, SPECIAL_NONE |
312 | 316 |
313 elif majortype == MAJOR_TYPE_BYTESTRING: | 317 elif majortype == MAJOR_TYPE_BYTESTRING: |
412 raise CBORDecodeError(b'special type %d not allowed' % subtype) | 416 raise CBORDecodeError(b'special type %d not allowed' % subtype) |
413 else: | 417 else: |
414 assert False | 418 assert False |
415 | 419 |
416 | 420 |
417 def decodeuint(subtype, b, offset=0, allowindefinite=False): | 421 def decodeuint( |
422 subtype: int, b: bytes, offset: int = 0, allowindefinite: bool = False | |
423 ): | |
418 """Decode an unsigned integer. | 424 """Decode an unsigned integer. |
419 | 425 |
420 ``subtype`` is the lower 5 bits from the initial byte CBOR item | 426 ``subtype`` is the lower 5 bits from the initial byte CBOR item |
421 "header." ``b`` is a buffer containing bytes. ``offset`` points to | 427 "header." ``b`` is a buffer containing bytes. ``offset`` points to |
422 the index of the first byte after the byte that ``subtype`` was | 428 the index of the first byte after the byte that ``subtype`` was |
471 This behaves like a ``bytes`` but in addition has the ``isfirst`` | 477 This behaves like a ``bytes`` but in addition has the ``isfirst`` |
472 and ``islast`` attributes indicating whether this chunk is the first | 478 and ``islast`` attributes indicating whether this chunk is the first |
473 or last in an indefinite length bytestring. | 479 or last in an indefinite length bytestring. |
474 """ | 480 """ |
475 | 481 |
476 def __new__(cls, v, first=False, last=False): | 482 isfirst: bool |
483 islast: bool | |
484 | |
485 def __new__(cls, v, first: bool = False, last: bool = False): | |
477 self = bytes.__new__(cls, v) | 486 self = bytes.__new__(cls, v) |
478 self.isfirst = first | 487 self.isfirst = first |
479 self.islast = last | 488 self.islast = last |
480 | 489 |
481 return self | 490 return self |
544 _STATE_WANT_ARRAY_VALUE = 3 | 553 _STATE_WANT_ARRAY_VALUE = 3 |
545 _STATE_WANT_SET_VALUE = 4 | 554 _STATE_WANT_SET_VALUE = 4 |
546 _STATE_WANT_BYTESTRING_CHUNK_FIRST = 5 | 555 _STATE_WANT_BYTESTRING_CHUNK_FIRST = 5 |
547 _STATE_WANT_BYTESTRING_CHUNK_SUBSEQUENT = 6 | 556 _STATE_WANT_BYTESTRING_CHUNK_SUBSEQUENT = 6 |
548 | 557 |
549 def __init__(self): | 558 def __init__(self) -> None: |
550 # TODO add support for limiting size of bytestrings | 559 # TODO add support for limiting size of bytestrings |
551 # TODO add support for limiting number of keys / values in collections | 560 # TODO add support for limiting number of keys / values in collections |
552 # TODO add support for limiting size of buffered partial values | 561 # TODO add support for limiting size of buffered partial values |
553 | 562 |
554 self.decodedbytecount = 0 | 563 self.decodedbytecount = 0 |
564 | 573 |
565 # Fully decoded values available for retrieval. | 574 # Fully decoded values available for retrieval. |
566 self._decodedvalues = [] | 575 self._decodedvalues = [] |
567 | 576 |
568 @property | 577 @property |
569 def inprogress(self): | 578 def inprogress(self) -> bool: |
570 """Whether the decoder has partially decoded a value.""" | 579 """Whether the decoder has partially decoded a value.""" |
571 return self._state != self._STATE_NONE | 580 return self._state != self._STATE_NONE |
572 | 581 |
573 def decode(self, b, offset=0): | 582 def decode(self, b, offset: int = 0) -> tuple[bool, int, int]: |
574 """Attempt to decode bytes from an input buffer. | 583 """Attempt to decode bytes from an input buffer. |
575 | 584 |
576 ``b`` is a collection of bytes and ``offset`` is the byte | 585 ``b`` is a collection of bytes and ``offset`` is the byte |
577 offset within that buffer from which to begin reading data. | 586 offset within that buffer from which to begin reading data. |
578 | 587 |
983 | 992 |
984 TODO consider adding limits as to the maximum amount of data that can | 993 TODO consider adding limits as to the maximum amount of data that can |
985 be buffered. | 994 be buffered. |
986 """ | 995 """ |
987 | 996 |
988 def __init__(self): | 997 _decoder: sansiodecoder |
998 _chunks: list | |
999 _wanted: int | |
1000 | |
1001 def __init__(self) -> None: | |
989 self._decoder = sansiodecoder() | 1002 self._decoder = sansiodecoder() |
990 self._chunks = [] | 1003 self._chunks = [] |
991 self._wanted = 0 | 1004 self._wanted = 0 |
992 | 1005 |
993 def decode(self, b): | 1006 def decode(self, b) -> tuple[bool, int, int]: |
994 """Attempt to decode bytes to CBOR values. | 1007 """Attempt to decode bytes to CBOR values. |
995 | 1008 |
996 Returns a tuple with the following fields: | 1009 Returns a tuple with the following fields: |
997 | 1010 |
998 * Bool indicating whether new values are available for retrieval. | 1011 * Bool indicating whether new values are available for retrieval. |