--- a/mercurial/streamclone.py Mon Jan 13 00:29:11 2025 -0500
+++ b/mercurial/streamclone.py Mon Jan 13 00:36:25 2025 -0500
@@ -12,7 +12,7 @@
import os
import struct
-from typing import Iterator, Optional
+from typing import Iterable, Iterator, Optional, Set
from .i18n import _
from .interfaces import repository
@@ -35,7 +35,10 @@
)
-def new_stream_clone_requirements(default_requirements, streamed_requirements):
+def new_stream_clone_requirements(
+ default_requirements: Iterable[bytes],
+ streamed_requirements: Iterable[bytes],
+) -> Set[bytes]:
"""determine the final set of requirement for a new stream clone
this method combine the "default" requirements that a new repository would
@@ -48,7 +51,7 @@
return requirements
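
# Hypothetical usage sketch (not part of this patch): the new annotations
# accept any iterables of requirement names and promise a set back; the
# requirement values below are illustrative.
reqs = new_stream_clone_requirements(
    [b'store', b'revlogv1'],        # defaults a new repo would get
    {b'generaldelta', b'fncache'},  # requirements advertised by the stream
)
assert isinstance(reqs, set)
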
-def streamed_requirements(repo):
+def streamed_requirements(repo) -> Set[bytes]:
"""the set of requirement the new clone will have to support
This is used for advertising the stream options and generating the actual
@@ -59,7 +62,7 @@
return requiredformats
-def canperformstreamclone(pullop, bundle2=False):
+def canperformstreamclone(pullop, bundle2: bool = False):
"""Whether it is possible to perform a streaming clone as part of pull.
``bundle2`` will cause the function to consider stream clone through
@@ -153,7 +156,7 @@
return True, requirements
-def maybeperformlegacystreamclone(pullop):
+def maybeperformlegacystreamclone(pullop) -> None:
"""Possibly perform a legacy stream clone operation.
Legacy stream clones are performed as part of pull but before all other
@@ -228,7 +231,7 @@
repo.invalidate()
-def allowservergeneration(repo):
+def allowservergeneration(repo) -> bool:
"""Whether streaming clones are allowed from the server."""
if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
return False
@@ -246,11 +249,13 @@
# This is its own function so extensions can override it.
-def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
+def _walkstreamfiles(
+ repo, matcher=None, phase: bool = False, obsolescence: bool = False
+):
return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
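
# Hypothetical extension sketch (not part of this patch): because
# _walkstreamfiles is a free function, an extension can swap it out.
# A minimal monkeypatch that keeps the upstream behaviour:
from mercurial import streamclone

_orig_walk = streamclone._walkstreamfiles

def _logging_walk(repo, matcher=None, phase=False, obsolescence=False):
    repo.ui.debug(b'walking stream files\n')  # illustrative extra step
    return _orig_walk(repo, matcher, phase=phase, obsolescence=obsolescence)

streamclone._walkstreamfiles = _logging_walk
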
-def generatev1(repo):
+def generatev1(repo) -> tuple[int, int, Iterator[bytes]]:
"""Emit content for version 1 of a streaming clone.
This returns a 3-tuple of (file count, byte size, data iterator).
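
# Hypothetical consumer sketch (not part of this patch), unpacking the
# annotated 3-tuple; `repo` and `send` are placeholders:
filecount, bytecount, it = generatev1(repo)
repo.ui.debug(b'%d files, %d bytes\n' % (filecount, bytecount))
for chunk in it:  # each chunk is raw bytes ready for the wire
    send(chunk)
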
@@ -291,7 +296,7 @@
svfs = repo.svfs
debugflag = repo.ui.debugflag
- def emitrevlogdata():
+ def emitrevlogdata() -> Iterator[bytes]:
for name, size in entries:
if debugflag:
repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
@@ -335,7 +340,9 @@
yield from it
-def generatebundlev1(repo, compression=b'UN'):
+def generatebundlev1(
+ repo, compression: bytes = b'UN'
+) -> tuple[Set[bytes], Iterator[bytes]]:
"""Emit content for version 1 of a stream clone bundle.
The first 4 bytes of the output ("HGS1") denote this as stream clone
@@ -363,7 +370,7 @@
requirements = streamed_requirements(repo)
requires = b','.join(sorted(requirements))
- def gen():
+ def gen() -> Iterator[bytes]:
yield b'HGS1'
yield compression
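
# Illustrative head-of-stream layout, as produced by gen() above (derived
# from this function, not from an external spec):
#   bytes 0-3  b'HGS1'       format marker
#   bytes 4-5  compression   two-byte marker, e.g. b'UN' for uncompressed
#   ...        header continues with counts and requirements (elided here)
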
@@ -393,7 +400,7 @@
return requirements, gen()
-def consumev1(repo, fp, filecount, bytecount):
+def consumev1(repo, fp, filecount: int, bytecount: int) -> None:
"""Apply the contents from version 1 of a streaming clone file handle.
This takes the output from "stream_out" and applies it to the specified
@@ -477,7 +484,7 @@
)
-def readbundle1header(fp):
+def readbundle1header(fp) -> tuple[int, int, Set[bytes]]:
compression = fp.read(2)
if compression != b'UN':
raise error.Abort(
@@ -505,7 +512,7 @@
return filecount, bytecount, requirements
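
# Hypothetical reader sketch (not part of this patch): once the caller has
# consumed and validated the 4-byte b'HGS1' magic, readbundle1header()
# checks the compression marker and returns the typed triple; `fp` is a
# placeholder file object positioned just past the magic.
filecount, bytecount, requirements = readbundle1header(fp)
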
-def applybundlev1(repo, fp):
+def applybundlev1(repo, fp) -> None:
"""Apply the content from a stream clone bundle version 1.
We assume the 4-byte header has been read and validated and the file handle
@@ -535,10 +542,10 @@
readers to perform bundle type-specific functionality.
"""
- def __init__(self, fh):
+ def __init__(self, fh) -> None:
self._fh = fh
- def apply(self, repo):
+ def apply(self, repo) -> None:
return applybundlev1(repo, self._fh)
@@ -552,7 +559,7 @@
# This is its own function so extensions can override it.
-def _walkstreamfullstorefiles(repo):
+def _walkstreamfullstorefiles(repo) -> list[bytes]:
"""list snapshot file from the store"""
fnames = []
if not repo.publishing():
@@ -589,7 +596,7 @@
# for flushing to disk in __call__().
MAX_OPEN = 2 if pycompat.iswindows else 100
- def __init__(self):
+ def __init__(self) -> None:
self._counter = 0
self._volatile_fps = None
self._copies = None
@@ -619,7 +626,7 @@
assert self._copies is None
assert self._dst_dir is None
- def _init_tmp_copies(self):
+ def _init_tmp_copies(self) -> None:
"""prepare a temporary directory to save volatile files
This will be used as a backup if we have too many files open"""
@@ -629,7 +636,7 @@
self._copies = {}
self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
- def _flush_some_on_disk(self):
+ def _flush_some_on_disk(self) -> None:
"""move some of the open files to tempory files on disk"""
if self._copies is None:
self._init_tmp_copies()
@@ -648,7 +655,7 @@
del self._volatile_fps[src]
fp.close()
- def _keep_one(self, src):
+ def _keep_one(self, src: bytes) -> int:
"""preserve an open file handle for a given path"""
# store the file handle right away to ensure we close it if any error happens
_, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
@@ -657,14 +664,14 @@
self._volatile_fps[src] = (size, fp)
return size
- def __call__(self, src):
+ def __call__(self, src: bytes) -> None:
"""preserve the volatile file at src"""
assert 0 < self._counter
if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
self._flush_some_on_disk()
self._keep_one(src)
- def try_keep(self, src) -> Optional[int]:
+ def try_keep(self, src: bytes) -> Optional[int]:
"""record a volatile file and returns it size
return None if the file does not exists.
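
# Hypothetical caller sketch (not part of this patch): the Optional[int]
# return type makes the "file vanished" case explicit; `volatiles` and
# `path` are placeholders.
size = volatiles.try_keep(path)
if size is None:
    pass  # the file no longer exists; nothing was preserved
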
@@ -778,7 +785,7 @@
raise error.Abort(msg % (bytecount, name, size))
-def _emit3(repo, entries):
+def _emit3(repo, entries) -> Iterator[bytes | None]:
"""actually emit the stream bundle (v3)"""
vfsmap = _makemap(repo)
# we keep repo.vfs out of the map on purpose, there are too many dangers
@@ -880,14 +887,14 @@
BaseEntry base class, but the store one would be identical)
"""
- def __init__(self, entry_path):
+ def __init__(self, entry_path) -> None:
super().__init__(
entry_path,
# we will directly deal with that in `setup_cache_file`
is_volatile=True,
)
- def preserve_volatiles(self, vfs, volatiles):
+ def preserve_volatiles(self, vfs, volatiles) -> None:
self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
if self._file_size is None:
self._files = []
@@ -901,7 +908,7 @@
)
]
- def files(self):
+ def files(self) -> list[store.StoreFile]:
if self._files is None:
self._files = [
CacheFile(
@@ -915,10 +922,10 @@
class CacheFile(store.StoreFile):
# inform the "copy/hardlink" version that this file might be missing
# without consequences.
- optional = True
+ optional: bool = True
-def _entries_walk(repo, includes, excludes, includeobsmarkers):
+def _entries_walk(repo, includes, excludes, includeobsmarkers: bool):
"""emit a seris of files information useful to clone a repo
return (vfs-key, entry) iterator
@@ -950,7 +957,7 @@
yield (_srccache, CacheEntry(entry_path=name))
-def generatev2(repo, includes, excludes, includeobsmarkers):
+def generatev2(repo, includes, excludes, includeobsmarkers: bool):
"""Emit content for version 2 of a streaming clone.
the data stream consists of the following entries:
@@ -981,7 +988,9 @@
return file_count, total_file_size, chunks
-def generatev3(repo, includes, excludes, includeobsmarkers):
+def generatev3(
+ repo, includes, excludes, includeobsmarkers: bool
+) -> Iterator[bytes | None]:
"""Emit content for version 3 of a streaming clone.
the data stream consists of the following:
@@ -1039,7 +1048,7 @@
yield
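
# Hypothetical consumer sketch (not part of this patch): the
# Iterator[bytes | None] annotation admits bare `yield` values like the
# one above, so a defensive consumer might filter them before writing;
# `repo`, `includes`, `excludes`, and `write` are placeholders.
for chunk in generatev3(repo, includes, excludes, includeobsmarkers=True):
    if chunk is not None:
        write(chunk)  # `write` is a placeholder sink
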
-def consumev2(repo, fp, filecount, filesize):
+def consumev2(repo, fp, filecount: int, filesize: int) -> None:
"""Apply the contents from a version 2 streaming clone.
Data is read from an object that only needs to provide a ``read(size)``
@@ -1108,7 +1117,7 @@
progress.complete()
-def consumev3(repo, fp):
+def consumev3(repo, fp) -> None:
"""Apply the contents from a version 3 streaming clone.
Data is read from an object that only needs to provide a ``read(size)``
@@ -1181,7 +1190,9 @@
progress.complete()
-def applybundlev2(repo, fp, filecount, filesize, requirements):
+def applybundlev2(
+ repo, fp, filecount: int, filesize: int, requirements: Iterable[bytes]
+) -> None:
from . import localrepo
missingreqs = [r for r in requirements if r not in repo.supported]
@@ -1204,7 +1215,7 @@
nodemap.post_stream_cleanup(repo)
-def applybundlev3(repo, fp, requirements):
+def applybundlev3(repo, fp, requirements: Iterable[bytes]) -> None:
from . import localrepo
missingreqs = [r for r in requirements if r not in repo.supported]
@@ -1226,7 +1237,7 @@
nodemap.post_stream_cleanup(repo)
-def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
+def _copy_files(src_vfs_map, dst_vfs_map, entries, progress) -> bool:
hardlink = [True]
def copy_used():
@@ -1260,7 +1271,7 @@
return hardlink[0]
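
# Illustrative aside (not part of this patch): `hardlink = [True]` above
# is the classic single-element-list idiom that lets the nested
# copy_used() closure mutate state in the enclosing scope, a common
# alternative to `nonlocal`:
def outer() -> bool:
    flag = [True]          # mutable cell shared with the closure

    def inner() -> None:
        flag[0] = False    # visible to outer() after the call

    inner()
    return flag[0]         # returns False
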
-def local_copy(src_repo, dest_repo):
+def local_copy(src_repo, dest_repo) -> None:
"""copy all content from one local repository to another
This is useful for local clones"""