--- a/mercurial/utils/cborutil.py Sun Jan 12 21:35:30 2025 -0500
+++ b/mercurial/utils/cborutil.py Sun Jan 12 22:05:19 2025 -0500
@@ -12,6 +12,7 @@
if typing.TYPE_CHECKING:
from typing import (
+ Iterable,
Iterator,
)
@@ -69,7 +70,7 @@
BREAK_INT = 255
-def encodelength(majortype, length):
+def encodelength(majortype: int, length: int) -> bytes:
"""Obtain a value encoding the major type and its length."""
if length < 24:
return ENCODED_LENGTH_1.pack(majortype << 5 | length)
@@ -83,12 +84,12 @@
return ENCODED_LENGTH_5.pack(majortype << 5 | 27, length)
-def streamencodebytestring(v):
+def streamencodebytestring(v: bytes) -> Iterator[bytes]:
yield encodelength(MAJOR_TYPE_BYTESTRING, len(v))
yield v
-def streamencodebytestringfromiter(it):
+def streamencodebytestringfromiter(it: Iterable[bytes]) -> Iterator[bytes]:
"""Convert an iterator of chunks to an indefinite bytestring.
Given an input that is iterable and each element in the iterator is
@@ -103,7 +104,9 @@
yield BREAK
-def streamencodeindefinitebytestring(source, chunksize=65536):
+def streamencodeindefinitebytestring(
+ source, chunksize: int = 65536
+) -> Iterator[bytes]:
"""Given a large source buffer, emit as an indefinite length bytestring.
This is a generator of chunks constituting the encoded CBOR data.
@@ -126,7 +129,7 @@
yield BREAK
-def streamencodeint(v):
+def streamencodeint(v: int) -> Iterator[bytes]:
if v >= 18446744073709551616 or v < -18446744073709551616:
raise ValueError(b'big integers not supported')
@@ -160,7 +163,7 @@
return type(v).__name__, v
-def streamencodeset(s):
+def streamencodeset(s) -> Iterator[bytes]:
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml defines
# semantic tag 258 for finite sets.
yield encodelength(MAJOR_TYPE_SEMANTIC, SEMANTIC_TAG_FINITE_SET)
@@ -168,7 +171,7 @@
yield from streamencodearray(sorted(s, key=_mixedtypesortkey))
-def streamencodemap(d) -> Iterator[bytes]:
+def streamencodemap(d: dict) -> Iterator[bytes]:
"""Encode dictionary to a generator.
Does not supporting indefinite length dictionaries.
@@ -180,7 +183,7 @@
yield from streamencode(value)
-def streamencodemapfromiter(it) -> Iterator[bytes]:
+def streamencodemapfromiter(it: Iterable) -> Iterator[bytes]:
"""Given an iterable of (key, value), encode to an indefinite length map."""
yield BEGIN_INDEFINITE_MAP
@@ -191,12 +194,12 @@
yield BREAK
-def streamencodebool(b):
+def streamencodebool(b: bool) -> Iterator[bytes]:
# major type 7, simple value 20 and 21.
yield b'\xf5' if b else b'\xf4'
-def streamencodenone(v):
+def streamencodenone(v: None) -> Iterator[bytes]:
# major type 7, simple value 22.
yield b'\xf6'
@@ -213,7 +216,7 @@
}
-def streamencode(v):
+def streamencode(v) -> Iterator[bytes]:
"""Encode a value in a streaming manner.
Given an input object, encode it to CBOR recursively.
@@ -243,7 +246,7 @@
"""Represents an error decoding CBOR."""
-def _elementtointeger(b, i):
+def _elementtointeger(b, i: int) -> int:
return b[i]
@@ -260,7 +263,7 @@
SPECIAL_INDEFINITE_BREAK = 5
-def decodeitem(b, offset=0):
+def decodeitem(b, offset: int = 0):
"""Decode a new CBOR value from a buffer at offset.
This function attempts to decode up to one complete CBOR value
@@ -306,6 +309,7 @@
complete, value, readcount = decodeuint(subtype, b, offset)
if complete:
+ assert value is not None # help pytype
return True, -value - 1, readcount + 1, SPECIAL_NONE
else:
return False, None, readcount, SPECIAL_NONE
@@ -414,7 +418,9 @@
assert False
-def decodeuint(subtype, b, offset=0, allowindefinite=False):
+def decodeuint(
+ subtype: int, b: bytes, offset: int = 0, allowindefinite: bool = False
+):
"""Decode an unsigned integer.
``subtype`` is the lower 5 bits from the initial byte CBOR item
@@ -473,7 +479,10 @@
or last in an indefinite length bytestring.
"""
- def __new__(cls, v, first=False, last=False):
+ isfirst: bool
+ islast: bool
+
+ def __new__(cls, v, first: bool = False, last: bool = False):
self = bytes.__new__(cls, v)
self.isfirst = first
self.islast = last
@@ -546,7 +555,7 @@
_STATE_WANT_BYTESTRING_CHUNK_FIRST = 5
_STATE_WANT_BYTESTRING_CHUNK_SUBSEQUENT = 6
- def __init__(self):
+ def __init__(self) -> None:
# TODO add support for limiting size of bytestrings
# TODO add support for limiting number of keys / values in collections
# TODO add support for limiting size of buffered partial values
@@ -566,11 +575,11 @@
self._decodedvalues = []
@property
- def inprogress(self):
+ def inprogress(self) -> bool:
"""Whether the decoder has partially decoded a value."""
return self._state != self._STATE_NONE
- def decode(self, b, offset=0):
+ def decode(self, b, offset: int = 0) -> tuple[bool, int, int]:
"""Attempt to decode bytes from an input buffer.
``b`` is a collection of bytes and ``offset`` is the byte
@@ -985,12 +994,16 @@
be buffered.
"""
- def __init__(self):
+ _decoder: sansiodecoder
+ _chunks: list
+ _wanted: int
+
+ def __init__(self) -> None:
self._decoder = sansiodecoder()
self._chunks = []
self._wanted = 0
- def decode(self, b):
+ def decode(self, b) -> tuple[bool, int, int]:
"""Attempt to decode bytes to CBOR values.
Returns a tuple with the following fields: