changeset 52921:3ee343dd3abf

stream-clone-v2: extract the stream parsing code into a function. This will be useful to spread the work across different threads. However, before being able to introduce threading, we need to finish refactoring the code.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Mon, 20 Jan 2025 12:16:54 +0100
parents 0005eb86deac
children 70306aefa52b
files mercurial/streamclone.py
diffstat 1 files changed, 85 insertions(+), 25 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/streamclone.py	Sun Feb 16 23:29:09 2025 -0500
+++ b/mercurial/streamclone.py	Mon Jan 20 12:16:54 2025 +0100
@@ -12,7 +12,13 @@
 import os
 import struct
 
-from typing import Iterable, Iterator, Optional, Set
+from typing import (
+    Iterable,
+    Iterator,
+    Optional,
+    Set,
+    Tuple,
+)
 
 from .i18n import _
 from .interfaces import repository
@@ -1057,6 +1063,13 @@
             yield
 
 
+class V2Report:
+    """a small class to track the data we saw within the stream"""
+
+    def __init__(self):
+        self.byte_count = 0
+
+
 def consumev2(repo, fp, filecount: int, filesize: int) -> None:
     """Apply the contents from a version 2 streaming clone.
 
@@ -1068,13 +1081,13 @@
             _(b'%d files to transfer, %s of data\n')
             % (filecount, util.bytecount(filesize))
         )
-
+        progress = repo.ui.makeprogress(
+            _(b'clone'),
+            total=filesize,
+            unit=_(b'bytes'),
+        )
         start = util.timer()
-        progress = repo.ui.makeprogress(
-            _(b'clone'), total=filesize, unit=_(b'bytes')
-        )
-        progress.update(0)
-        byte_count = 0
+        report = V2Report()
 
         vfsmap = _makemap(repo)
         # we keep repo.vfs out of the on purpose, ther are too many danger
@@ -1090,25 +1103,18 @@
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
-                for i in range(filecount):
-                    src = util.readexactly(fp, 1)
+                files = _v2_parse_files(
+                    repo,
+                    fp,
+                    filecount,
+                    progress,
+                    report,
+                )
+                for src, name, chunks in files:
                     vfs = vfsmap[src]
-                    namelen = util.uvarintdecodestream(fp)
-                    datalen = util.uvarintdecodestream(fp)
-
-                    name = util.readexactly(fp, namelen)
-
-                    if repo.ui.debugflag:
-                        repo.ui.debug(
-                            b'adding [%s] %s (%s)\n'
-                            % (src, name, util.bytecount(datalen))
-                        )
-
                     with vfs(name, b'w') as ofp:
-                        for chunk in util.filechunkiter(fp, limit=datalen):
-                            byte_count += len(chunk)
-                            progress.increment(step=len(chunk))
-                            ofp.write(chunk)
+                        for c in chunks:
+                            ofp.write(c)
 
             # force @filecache properties to be reloaded from
             # streamclone-ed file at next access
@@ -1119,7 +1125,61 @@
         # sequential and parallel behavior.
         remains = fp.read(1)
         assert not remains
-        _report_transferred(repo, start, filecount, byte_count)
+        _report_transferred(repo, start, filecount, report.byte_count)
+
+
+# iterator of chunks of bytes that constitute a file's content.
+FileChunksT = Iterator[bytes]
+# Contains the information necessary to write stream file on disk
+FileInfoT = Tuple[
+    bytes,  # vfs key
+    bytes,  # file name (non-vfs-encoded)
+    FileChunksT,  # content
+]
+
+
+def _v2_file_chunk(
+    fp: bundle2mod.unbundlepart,
+    data_len: int,
+    progress: scmutil.progress,
+    report: V2Report,
+) -> FileChunksT:
+    """yield the chunks that constitute a file
+
+    This function exists as the counterpart of a future threaded version and
+    would not be very useful on its own.
+    """
+    for chunk in util.filechunkiter(fp, limit=data_len):
+        report.byte_count += len(chunk)
+        progress.increment(step=len(chunk))
+        yield chunk
+
+
+def _v2_parse_files(
+    repo,
+    fp: bundle2mod.unbundlepart,
+    file_count: int,
+    progress: scmutil.progress,
+    report: V2Report,
+) -> Iterator[FileInfoT]:
+    """do the "stream-parsing" part of stream v2
+
+    The parsed information is yielded for consumption by the "writer"
+    """
+    progress.update(0)
+    for i in range(file_count):
+        src = util.readexactly(fp, 1)
+        namelen = util.uvarintdecodestream(fp)
+        datalen = util.uvarintdecodestream(fp)
+
+        name = util.readexactly(fp, namelen)
+
+        if repo.ui.debugflag:
+            repo.ui.debug(
+                b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
+            )
+
+        yield (src, name, _v2_file_chunk(fp, datalen, progress, report))
 
 
 def consumev3(repo, fp) -> None: