--- a/mercurial/streamclone.py Sun Feb 16 23:29:09 2025 -0500
+++ b/mercurial/streamclone.py Mon Jan 20 12:16:54 2025 +0100
@@ -12,7 +12,13 @@
import os
import struct
-from typing import Iterable, Iterator, Optional, Set
+from typing import (
+    Iterable,
+    Iterator,
+    Optional,
+    Set,
+    Tuple,
+)
from .i18n import _
from .interfaces import repository
@@ -1057,6 +1063,13 @@
            yield
+class V2Report:
+ """a small class to track the data we saw within the stream"""
+
+    def __init__(self):
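+        # total number of file payload bytes seen in the stream so far;
+        # updated as chunks are read, then used for the final transfer report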
+        self.byte_count = 0
+
+
def consumev2(repo, fp, filecount: int, filesize: int) -> None:
"""Apply the contents from a version 2 streaming clone.
@@ -1068,13 +1081,13 @@
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )
-
+        progress = repo.ui.makeprogress(
+            _(b'clone'),
+            total=filesize,
+            unit=_(b'bytes'),
+        )
        start = util.timer()
-        progress = repo.ui.makeprogress(
-            _(b'clone'), total=filesize, unit=_(b'bytes')
-        )
-        progress.update(0)
-        byte_count = 0
+        report = V2Report()
        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose, there are too many dangers
@@ -1090,25 +1103,18 @@
        with repo.transaction(b'clone'):
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
-            for i in range(filecount):
-                src = util.readexactly(fp, 1)
+            files = _v2_parse_files(
+                repo,
+                fp,
+                filecount,
+                progress,
+                report,
+            )
+            for src, name, chunks in files:
                vfs = vfsmap[src]
-                namelen = util.uvarintdecodestream(fp)
-                datalen = util.uvarintdecodestream(fp)
-
-                name = util.readexactly(fp, namelen)
-
-                if repo.ui.debugflag:
-                    repo.ui.debug(
-                        b'adding [%s] %s (%s)\n'
-                        % (src, name, util.bytecount(datalen))
-                    )
-
                with vfs(name, b'w') as ofp:
-                    for chunk in util.filechunkiter(fp, limit=datalen):
-                        byte_count += len(chunk)
-                        progress.increment(step=len(chunk))
-                        ofp.write(chunk)
+                    for c in chunks:
+                        ofp.write(c)
            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
@@ -1119,7 +1125,61 @@
        # sequential and parallel behavior.
        remains = fp.read(1)
        assert not remains
-        _report_transferred(repo, start, filecount, byte_count)
+        _report_transferred(repo, start, filecount, report.byte_count)
+
+
+# iterator of chunks of bytes that constitute a file's content.
+FileChunksT = Iterator[bytes]
+# Contains the information necessary to write a stream file on disk
+FileInfoT = Tuple[
+    bytes,  # vfs key
+    bytes,  # file name (non-vfs-encoded)
+    FileChunksT,  # content
+]
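+
+# For illustration (a sketch inferred from the parsing code below): a stream
+# entry for a 12-byte file named b'data/foo.i' in the store vfs (key b's' in
+# the _makemap table) would be laid out on the wire as
+#
+#   b's' + uvarint(10) + uvarint(12) + b'data/foo.i' + <12 bytes of data>
+#
+# and parsed into the FileInfoT (b's', b'data/foo.i', <chunk iterator>).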
+
+
+def _v2_file_chunk(
+    fp: bundle2mod.unbundlepart,
+    data_len: int,
+    progress: scmutil.progress,
+    report: V2Report,
+) -> FileChunksT:
+ """yield the chunk that constitute a file
+
+ This function exists as the counterpart of a future threaded version and
+ would not be very useful on its own.
+ """
+    for chunk in util.filechunkiter(fp, limit=data_len):
+        report.byte_count += len(chunk)
+        progress.increment(step=len(chunk))
+        yield chunk
+
+
+def _v2_parse_files(
+    repo,
+    fp: bundle2mod.unbundlepart,
+    file_count: int,
+    progress: scmutil.progress,
+    report: V2Report,
+) -> Iterator[FileInfoT]:
+ """do the "stream-parsing" part of stream v2
+
+ The parsed information are yield result for consumption by the "writer"
+ """
+    progress.update(0)
+    for i in range(file_count):
+        src = util.readexactly(fp, 1)
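+        # the lengths are unsigned varints: the lower seven bits of each
+        # byte carry the value, least significant group first, with the high
+        # bit set while more bytes follow (so e.g. 300 encodes as 0xac 0x02)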
+        namelen = util.uvarintdecodestream(fp)
+        datalen = util.uvarintdecodestream(fp)
+
+        name = util.readexactly(fp, namelen)
+
+        if repo.ui.debugflag:
+            repo.ui.debug(
+                b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
+            )
+
+        yield (src, name, _v2_file_chunk(fp, datalen, progress, report))
+
+
def consumev3(repo, fp) -> None: