diff mercurial/utils/cborutil.py @ 52693:5e09c6b5b795

typing: add type annotations to most of `mercurial/utils/cborutil.py` These are the easy/obvious/documented ones. We'll leave the harder ones for later.
author Matt Harbison <matt_harbison@yahoo.com>
date Sun, 12 Jan 2025 22:05:19 -0500
parents 279e217d6041
children 8a2091a2f974
line wrap: on
line diff
--- a/mercurial/utils/cborutil.py	Sun Jan 12 21:35:30 2025 -0500
+++ b/mercurial/utils/cborutil.py	Sun Jan 12 22:05:19 2025 -0500
@@ -12,6 +12,7 @@
 
 if typing.TYPE_CHECKING:
     from typing import (
+        Iterable,
         Iterator,
     )
 
@@ -69,7 +70,7 @@
 BREAK_INT = 255
 
 
-def encodelength(majortype, length):
+def encodelength(majortype: int, length: int) -> bytes:
     """Obtain a value encoding the major type and its length."""
     if length < 24:
         return ENCODED_LENGTH_1.pack(majortype << 5 | length)
@@ -83,12 +84,12 @@
         return ENCODED_LENGTH_5.pack(majortype << 5 | 27, length)
 
 
-def streamencodebytestring(v):
+def streamencodebytestring(v: bytes) -> Iterator[bytes]:
     yield encodelength(MAJOR_TYPE_BYTESTRING, len(v))
     yield v
 
 
-def streamencodebytestringfromiter(it):
+def streamencodebytestringfromiter(it: Iterable[bytes]) -> Iterator[bytes]:
     """Convert an iterator of chunks to an indefinite bytestring.
 
     Given an input that is iterable and each element in the iterator is
@@ -103,7 +104,9 @@
     yield BREAK
 
 
-def streamencodeindefinitebytestring(source, chunksize=65536):
+def streamencodeindefinitebytestring(
+    source, chunksize: int = 65536
+) -> Iterator[bytes]:
     """Given a large source buffer, emit as an indefinite length bytestring.
 
     This is a generator of chunks constituting the encoded CBOR data.
@@ -126,7 +129,7 @@
     yield BREAK
 
 
-def streamencodeint(v):
+def streamencodeint(v: int) -> Iterator[bytes]:
     if v >= 18446744073709551616 or v < -18446744073709551616:
         raise ValueError(b'big integers not supported')
 
@@ -160,7 +163,7 @@
     return type(v).__name__, v
 
 
-def streamencodeset(s):
+def streamencodeset(s) -> Iterator[bytes]:
     # https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml defines
     # semantic tag 258 for finite sets.
     yield encodelength(MAJOR_TYPE_SEMANTIC, SEMANTIC_TAG_FINITE_SET)
@@ -168,7 +171,7 @@
     yield from streamencodearray(sorted(s, key=_mixedtypesortkey))
 
 
-def streamencodemap(d) -> Iterator[bytes]:
+def streamencodemap(d: dict) -> Iterator[bytes]:
     """Encode dictionary to a generator.
 
     Does not supporting indefinite length dictionaries.
@@ -180,7 +183,7 @@
         yield from streamencode(value)
 
 
-def streamencodemapfromiter(it) -> Iterator[bytes]:
+def streamencodemapfromiter(it: Iterable) -> Iterator[bytes]:
     """Given an iterable of (key, value), encode to an indefinite length map."""
     yield BEGIN_INDEFINITE_MAP
 
@@ -191,12 +194,12 @@
     yield BREAK
 
 
-def streamencodebool(b):
+def streamencodebool(b: bool) -> Iterator[bytes]:
     # major type 7, simple value 20 and 21.
     yield b'\xf5' if b else b'\xf4'
 
 
-def streamencodenone(v):
+def streamencodenone(v: None) -> Iterator[bytes]:
     # major type 7, simple value 22.
     yield b'\xf6'
 
@@ -213,7 +216,7 @@
 }
 
 
-def streamencode(v):
+def streamencode(v) -> Iterator[bytes]:
     """Encode a value in a streaming manner.
 
     Given an input object, encode it to CBOR recursively.
@@ -243,7 +246,7 @@
     """Represents an error decoding CBOR."""
 
 
-def _elementtointeger(b, i):
+def _elementtointeger(b, i: int) -> int:
     return b[i]
 
 
@@ -260,7 +263,7 @@
 SPECIAL_INDEFINITE_BREAK = 5
 
 
-def decodeitem(b, offset=0):
+def decodeitem(b, offset: int = 0):
     """Decode a new CBOR value from a buffer at offset.
 
     This function attempts to decode up to one complete CBOR value
@@ -306,6 +309,7 @@
         complete, value, readcount = decodeuint(subtype, b, offset)
 
         if complete:
+            assert value is not None  # help pytype
             return True, -value - 1, readcount + 1, SPECIAL_NONE
         else:
             return False, None, readcount, SPECIAL_NONE
@@ -414,7 +418,9 @@
         assert False
 
 
-def decodeuint(subtype, b, offset=0, allowindefinite=False):
+def decodeuint(
+    subtype: int, b: bytes, offset: int = 0, allowindefinite: bool = False
+):
     """Decode an unsigned integer.
 
     ``subtype`` is the lower 5 bits from the initial byte CBOR item
@@ -473,7 +479,10 @@
     or last in an indefinite length bytestring.
     """
 
-    def __new__(cls, v, first=False, last=False):
+    isfirst: bool
+    islast: bool
+
+    def __new__(cls, v, first: bool = False, last: bool = False):
         self = bytes.__new__(cls, v)
         self.isfirst = first
         self.islast = last
@@ -546,7 +555,7 @@
     _STATE_WANT_BYTESTRING_CHUNK_FIRST = 5
     _STATE_WANT_BYTESTRING_CHUNK_SUBSEQUENT = 6
 
-    def __init__(self):
+    def __init__(self) -> None:
         # TODO add support for limiting size of bytestrings
         # TODO add support for limiting number of keys / values in collections
         # TODO add support for limiting size of buffered partial values
@@ -566,11 +575,11 @@
         self._decodedvalues = []
 
     @property
-    def inprogress(self):
+    def inprogress(self) -> bool:
         """Whether the decoder has partially decoded a value."""
         return self._state != self._STATE_NONE
 
-    def decode(self, b, offset=0):
+    def decode(self, b, offset: int = 0) -> tuple[bool, int, int]:
         """Attempt to decode bytes from an input buffer.
 
         ``b`` is a collection of bytes and ``offset`` is the byte
@@ -985,12 +994,16 @@
     be buffered.
     """
 
-    def __init__(self):
+    _decoder: sansiodecoder
+    _chunks: list
+    _wanted: int
+
+    def __init__(self) -> None:
         self._decoder = sansiodecoder()
         self._chunks = []
         self._wanted = 0
 
-    def decode(self, b):
+    def decode(self, b) -> tuple[bool, int, int]:
         """Attempt to decode bytes to CBOR values.
 
         Returns a tuple with the following fields: