comparison mercurial/utils/cborutil.py @ 52693:5e09c6b5b795

typing: add type annotations to most of `mercurial/utils/cborutil.py`. These are the easy/obvious/documented ones. We'll leave the harder ones for later.
author Matt Harbison <matt_harbison@yahoo.com>
date Sun, 12 Jan 2025 22:05:19 -0500
parents 279e217d6041
children 8a2091a2f974
comparison
equal deleted inserted replaced
52692:45dc0f874b8c 52693:5e09c6b5b795
10 import struct 10 import struct
11 import typing 11 import typing
12 12
13 if typing.TYPE_CHECKING: 13 if typing.TYPE_CHECKING:
14 from typing import ( 14 from typing import (
15 Iterable,
15 Iterator, 16 Iterator,
16 ) 17 )
17 18
18 # Very short version of RFC 7049... 19 # Very short version of RFC 7049...
19 # 20 #
67 # The break ends an indefinite length item. 68 # The break ends an indefinite length item.
68 BREAK = b'\xff' 69 BREAK = b'\xff'
69 BREAK_INT = 255 70 BREAK_INT = 255
70 71
71 72
72 def encodelength(majortype, length): 73 def encodelength(majortype: int, length: int) -> bytes:
73 """Obtain a value encoding the major type and its length.""" 74 """Obtain a value encoding the major type and its length."""
74 if length < 24: 75 if length < 24:
75 return ENCODED_LENGTH_1.pack(majortype << 5 | length) 76 return ENCODED_LENGTH_1.pack(majortype << 5 | length)
76 elif length < 256: 77 elif length < 256:
77 return ENCODED_LENGTH_2.pack(majortype << 5 | 24, length) 78 return ENCODED_LENGTH_2.pack(majortype << 5 | 24, length)
81 return ENCODED_LENGTH_4.pack(majortype << 5 | 26, length) 82 return ENCODED_LENGTH_4.pack(majortype << 5 | 26, length)
82 else: 83 else:
83 return ENCODED_LENGTH_5.pack(majortype << 5 | 27, length) 84 return ENCODED_LENGTH_5.pack(majortype << 5 | 27, length)
84 85
85 86
86 def streamencodebytestring(v): 87 def streamencodebytestring(v: bytes) -> Iterator[bytes]:
87 yield encodelength(MAJOR_TYPE_BYTESTRING, len(v)) 88 yield encodelength(MAJOR_TYPE_BYTESTRING, len(v))
88 yield v 89 yield v
89 90
90 91
91 def streamencodebytestringfromiter(it): 92 def streamencodebytestringfromiter(it: Iterable[bytes]) -> Iterator[bytes]:
92 """Convert an iterator of chunks to an indefinite bytestring. 93 """Convert an iterator of chunks to an indefinite bytestring.
93 94
94 Given an input that is iterable and each element in the iterator is 95 Given an input that is iterable and each element in the iterator is
95 representable as bytes, emit an indefinite length bytestring. 96 representable as bytes, emit an indefinite length bytestring.
96 """ 97 """
101 yield chunk 102 yield chunk
102 103
103 yield BREAK 104 yield BREAK
104 105
105 106
106 def streamencodeindefinitebytestring(source, chunksize=65536): 107 def streamencodeindefinitebytestring(
108 source, chunksize: int = 65536
109 ) -> Iterator[bytes]:
107 """Given a large source buffer, emit as an indefinite length bytestring. 110 """Given a large source buffer, emit as an indefinite length bytestring.
108 111
109 This is a generator of chunks constituting the encoded CBOR data. 112 This is a generator of chunks constituting the encoded CBOR data.
110 """ 113 """
111 yield BEGIN_INDEFINITE_BYTESTRING 114 yield BEGIN_INDEFINITE_BYTESTRING
124 break 127 break
125 128
126 yield BREAK 129 yield BREAK
127 130
128 131
129 def streamencodeint(v): 132 def streamencodeint(v: int) -> Iterator[bytes]:
130 if v >= 18446744073709551616 or v < -18446744073709551616: 133 if v >= 18446744073709551616 or v < -18446744073709551616:
131 raise ValueError(b'big integers not supported') 134 raise ValueError(b'big integers not supported')
132 135
133 if v >= 0: 136 if v >= 0:
134 yield encodelength(MAJOR_TYPE_UINT, v) 137 yield encodelength(MAJOR_TYPE_UINT, v)
158 161
159 def _mixedtypesortkey(v): 162 def _mixedtypesortkey(v):
160 return type(v).__name__, v 163 return type(v).__name__, v
161 164
162 165
163 def streamencodeset(s): 166 def streamencodeset(s) -> Iterator[bytes]:
164 # https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml defines 167 # https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml defines
165 # semantic tag 258 for finite sets. 168 # semantic tag 258 for finite sets.
166 yield encodelength(MAJOR_TYPE_SEMANTIC, SEMANTIC_TAG_FINITE_SET) 169 yield encodelength(MAJOR_TYPE_SEMANTIC, SEMANTIC_TAG_FINITE_SET)
167 170
168 yield from streamencodearray(sorted(s, key=_mixedtypesortkey)) 171 yield from streamencodearray(sorted(s, key=_mixedtypesortkey))
169 172
170 173
171 def streamencodemap(d) -> Iterator[bytes]: 174 def streamencodemap(d: dict) -> Iterator[bytes]:
172 """Encode dictionary to a generator. 175 """Encode dictionary to a generator.
173 176
174 Does not support indefinite length dictionaries. 177 Does not support indefinite length dictionaries.
175 """ 178 """
176 yield encodelength(MAJOR_TYPE_MAP, len(d)) 179 yield encodelength(MAJOR_TYPE_MAP, len(d))
178 for key, value in sorted(d.items(), key=lambda x: _mixedtypesortkey(x[0])): 181 for key, value in sorted(d.items(), key=lambda x: _mixedtypesortkey(x[0])):
179 yield from streamencode(key) 182 yield from streamencode(key)
180 yield from streamencode(value) 183 yield from streamencode(value)
181 184
182 185
183 def streamencodemapfromiter(it) -> Iterator[bytes]: 186 def streamencodemapfromiter(it: Iterable) -> Iterator[bytes]:
184 """Given an iterable of (key, value), encode to an indefinite length map.""" 187 """Given an iterable of (key, value), encode to an indefinite length map."""
185 yield BEGIN_INDEFINITE_MAP 188 yield BEGIN_INDEFINITE_MAP
186 189
187 for key, value in it: 190 for key, value in it:
188 yield from streamencode(key) 191 yield from streamencode(key)
189 yield from streamencode(value) 192 yield from streamencode(value)
190 193
191 yield BREAK 194 yield BREAK
192 195
193 196
194 def streamencodebool(b): 197 def streamencodebool(b: bool) -> Iterator[bytes]:
195 # major type 7, simple value 20 and 21. 198 # major type 7, simple value 20 and 21.
196 yield b'\xf5' if b else b'\xf4' 199 yield b'\xf5' if b else b'\xf4'
197 200
198 201
199 def streamencodenone(v): 202 def streamencodenone(v: None) -> Iterator[bytes]:
200 # major type 7, simple value 22. 203 # major type 7, simple value 22.
201 yield b'\xf6' 204 yield b'\xf6'
202 205
203 206
204 STREAM_ENCODERS = { 207 STREAM_ENCODERS = {
211 bool: streamencodebool, 214 bool: streamencodebool,
212 type(None): streamencodenone, 215 type(None): streamencodenone,
213 } 216 }
214 217
215 218
216 def streamencode(v): 219 def streamencode(v) -> Iterator[bytes]:
217 """Encode a value in a streaming manner. 220 """Encode a value in a streaming manner.
218 221
219 Given an input object, encode it to CBOR recursively. 222 Given an input object, encode it to CBOR recursively.
220 223
221 Returns a generator of CBOR encoded bytes. There is no guarantee 224 Returns a generator of CBOR encoded bytes. There is no guarantee
241 244
242 class CBORDecodeError(Exception): 245 class CBORDecodeError(Exception):
243 """Represents an error decoding CBOR.""" 246 """Represents an error decoding CBOR."""
244 247
245 248
246 def _elementtointeger(b, i): 249 def _elementtointeger(b, i: int) -> int:
247 return b[i] 250 return b[i]
248 251
249 252
250 STRUCT_BIG_UBYTE = struct.Struct('>B') 253 STRUCT_BIG_UBYTE = struct.Struct('>B')
251 STRUCT_BIG_USHORT = struct.Struct(b'>H') 254 STRUCT_BIG_USHORT = struct.Struct(b'>H')
258 SPECIAL_START_MAP = 3 261 SPECIAL_START_MAP = 3
259 SPECIAL_START_SET = 4 262 SPECIAL_START_SET = 4
260 SPECIAL_INDEFINITE_BREAK = 5 263 SPECIAL_INDEFINITE_BREAK = 5
261 264
262 265
263 def decodeitem(b, offset=0): 266 def decodeitem(b, offset: int = 0):
264 """Decode a new CBOR value from a buffer at offset. 267 """Decode a new CBOR value from a buffer at offset.
265 268
266 This function attempts to decode up to one complete CBOR value 269 This function attempts to decode up to one complete CBOR value
267 from ``b`` starting at offset ``offset``. 270 from ``b`` starting at offset ``offset``.
268 271
304 elif majortype == MAJOR_TYPE_NEGINT: 307 elif majortype == MAJOR_TYPE_NEGINT:
305 # Negative integers are the same as UINT except inverted minus 1. 308 # Negative integers are the same as UINT except inverted minus 1.
306 complete, value, readcount = decodeuint(subtype, b, offset) 309 complete, value, readcount = decodeuint(subtype, b, offset)
307 310
308 if complete: 311 if complete:
312 assert value is not None # help pytype
309 return True, -value - 1, readcount + 1, SPECIAL_NONE 313 return True, -value - 1, readcount + 1, SPECIAL_NONE
310 else: 314 else:
311 return False, None, readcount, SPECIAL_NONE 315 return False, None, readcount, SPECIAL_NONE
312 316
313 elif majortype == MAJOR_TYPE_BYTESTRING: 317 elif majortype == MAJOR_TYPE_BYTESTRING:
412 raise CBORDecodeError(b'special type %d not allowed' % subtype) 416 raise CBORDecodeError(b'special type %d not allowed' % subtype)
413 else: 417 else:
414 assert False 418 assert False
415 419
416 420
417 def decodeuint(subtype, b, offset=0, allowindefinite=False): 421 def decodeuint(
422 subtype: int, b: bytes, offset: int = 0, allowindefinite: bool = False
423 ):
418 """Decode an unsigned integer. 424 """Decode an unsigned integer.
419 425
420 ``subtype`` is the lower 5 bits from the initial byte CBOR item 426 ``subtype`` is the lower 5 bits from the initial byte CBOR item
421 "header." ``b`` is a buffer containing bytes. ``offset`` points to 427 "header." ``b`` is a buffer containing bytes. ``offset`` points to
422 the index of the first byte after the byte that ``subtype`` was 428 the index of the first byte after the byte that ``subtype`` was
471 This behaves like a ``bytes`` but in addition has the ``isfirst`` 477 This behaves like a ``bytes`` but in addition has the ``isfirst``
472 and ``islast`` attributes indicating whether this chunk is the first 478 and ``islast`` attributes indicating whether this chunk is the first
473 or last in an indefinite length bytestring. 479 or last in an indefinite length bytestring.
474 """ 480 """
475 481
476 def __new__(cls, v, first=False, last=False): 482 isfirst: bool
483 islast: bool
484
485 def __new__(cls, v, first: bool = False, last: bool = False):
477 self = bytes.__new__(cls, v) 486 self = bytes.__new__(cls, v)
478 self.isfirst = first 487 self.isfirst = first
479 self.islast = last 488 self.islast = last
480 489
481 return self 490 return self
544 _STATE_WANT_ARRAY_VALUE = 3 553 _STATE_WANT_ARRAY_VALUE = 3
545 _STATE_WANT_SET_VALUE = 4 554 _STATE_WANT_SET_VALUE = 4
546 _STATE_WANT_BYTESTRING_CHUNK_FIRST = 5 555 _STATE_WANT_BYTESTRING_CHUNK_FIRST = 5
547 _STATE_WANT_BYTESTRING_CHUNK_SUBSEQUENT = 6 556 _STATE_WANT_BYTESTRING_CHUNK_SUBSEQUENT = 6
548 557
549 def __init__(self): 558 def __init__(self) -> None:
550 # TODO add support for limiting size of bytestrings 559 # TODO add support for limiting size of bytestrings
551 # TODO add support for limiting number of keys / values in collections 560 # TODO add support for limiting number of keys / values in collections
552 # TODO add support for limiting size of buffered partial values 561 # TODO add support for limiting size of buffered partial values
553 562
554 self.decodedbytecount = 0 563 self.decodedbytecount = 0
564 573
565 # Fully decoded values available for retrieval. 574 # Fully decoded values available for retrieval.
566 self._decodedvalues = [] 575 self._decodedvalues = []
567 576
568 @property 577 @property
569 def inprogress(self): 578 def inprogress(self) -> bool:
570 """Whether the decoder has partially decoded a value.""" 579 """Whether the decoder has partially decoded a value."""
571 return self._state != self._STATE_NONE 580 return self._state != self._STATE_NONE
572 581
573 def decode(self, b, offset=0): 582 def decode(self, b, offset: int = 0) -> tuple[bool, int, int]:
574 """Attempt to decode bytes from an input buffer. 583 """Attempt to decode bytes from an input buffer.
575 584
576 ``b`` is a collection of bytes and ``offset`` is the byte 585 ``b`` is a collection of bytes and ``offset`` is the byte
577 offset within that buffer from which to begin reading data. 586 offset within that buffer from which to begin reading data.
578 587
983 992
984 TODO consider adding limits as to the maximum amount of data that can 993 TODO consider adding limits as to the maximum amount of data that can
985 be buffered. 994 be buffered.
986 """ 995 """
987 996
988 def __init__(self): 997 _decoder: sansiodecoder
998 _chunks: list
999 _wanted: int
1000
1001 def __init__(self) -> None:
989 self._decoder = sansiodecoder() 1002 self._decoder = sansiodecoder()
990 self._chunks = [] 1003 self._chunks = []
991 self._wanted = 0 1004 self._wanted = 0
992 1005
993 def decode(self, b): 1006 def decode(self, b) -> tuple[bool, int, int]:
994 """Attempt to decode bytes to CBOR values. 1007 """Attempt to decode bytes to CBOR values.
995 1008
996 Returns a tuple with the following fields: 1009 Returns a tuple with the following fields:
997 1010
998 * Bool indicating whether new values are available for retrieval. 1011 * Bool indicating whether new values are available for retrieval.