diff mercurial/streamclone.py @ 53031:e705fec4a03f stable

branching: merging with 7.0 changes. Since 6.9.3 was made after 7.0rc0, we need to deal with more branching than usual.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 05 Mar 2025 23:02:19 +0100
parents 89ab2459f62a e75ed9ae5fb9
children
--- a/mercurial/streamclone.py	Wed Mar 05 22:33:11 2025 +0100
+++ b/mercurial/streamclone.py	Wed Mar 05 23:02:19 2025 +0100
@@ -7,9 +7,22 @@
 
 from __future__ import annotations
 
+import collections
 import contextlib
+import errno
 import os
 import struct
+import threading
+
+from typing import (
+    Callable,
+    Iterable,
+    Iterator,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
 
 from .i18n import _
 from .interfaces import repository
@@ -32,7 +45,29 @@
 )
 
 
-def new_stream_clone_requirements(default_requirements, streamed_requirements):
+# Numbers arbitrarily picked, feel free to change them (except the LOW one).
+#
+# Update the configuration documentation if you touch this.
+DEFAULT_NUM_WRITER = {
+    scmutil.RESOURCE_LOW: 1,
+    scmutil.RESOURCE_MEDIUM: 4,
+    scmutil.RESOURCE_HIGH: 8,
+}
+
+
+# Numbers arbitrarily picked, feel free to adjust them. Do update the
+# documentation if you do so.
+DEFAULT_MEMORY_TARGET = {
+    scmutil.RESOURCE_LOW: 50 * (2**20),  # 50 MB
+    scmutil.RESOURCE_MEDIUM: 500 * 2**20,  # 500 MB
+    scmutil.RESOURCE_HIGH: 2 * 2**30,  #   2 GB
+}
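
As a rough sketch of how these tables are consumed (this mirrors the lookup done later in consumev2; `repo` is assumed to be available):

    cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu')
    mem_profile = scmutil.get_resource_profile(repo.ui, b'memory')
    # fall back to the tables above when the config does not force a value
    num_writer = DEFAULT_NUM_WRITER[cpu_profile]        # e.g. 4 writers on "medium"
    memory_target = DEFAULT_MEMORY_TARGET[mem_profile]  # e.g. 500 MB buffered on "medium"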
+
+
+def new_stream_clone_requirements(
+    default_requirements: Iterable[bytes],
+    streamed_requirements: Iterable[bytes],
+) -> Set[bytes]:
     """determine the final set of requirement for a new stream clone
 
     this method combines the "default" requirements that a new repository would
@@ -45,7 +80,7 @@
     return requirements
 
 
-def streamed_requirements(repo):
+def streamed_requirements(repo) -> Set[bytes]:
     """the set of requirement the new clone will have to support
 
     This is used for advertising the stream options and to generate the actual
@@ -56,7 +91,7 @@
     return requiredformats
 
 
-def canperformstreamclone(pullop, bundle2=False):
+def canperformstreamclone(pullop, bundle2: bool = False):
     """Whether it is possible to perform a streaming clone as part of pull.
 
     ``bundle2`` will cause the function to consider stream clone through
@@ -150,7 +185,7 @@
     return True, requirements
 
 
-def maybeperformlegacystreamclone(pullop):
+def maybeperformlegacystreamclone(pullop) -> None:
     """Possibly perform a legacy stream clone operation.
 
     Legacy stream clones are performed as part of pull but before all other
@@ -225,7 +260,7 @@
         repo.invalidate()
 
 
-def allowservergeneration(repo):
+def allowservergeneration(repo) -> bool:
     """Whether streaming clones are allowed from the server."""
     if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
         return False
@@ -243,11 +278,30 @@
 
 
 # This is its own function so extensions can override it.
-def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
+def _walkstreamfiles(
+    repo, matcher=None, phase: bool = False, obsolescence: bool = False
+):
     return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
 
 
-def generatev1(repo):
+def _report_transferred(
+    repo, start_time: float, file_count: int, byte_count: int
+):
+    """common utility to report the time it took to apply the stream bundle"""
+    elapsed = util.timer() - start_time
+    if elapsed <= 0:
+        elapsed = 0.001
+    m = _(b'stream-cloned %d files / %s in %.1f seconds (%s/sec)\n')
+    m %= (
+        file_count,
+        util.bytecount(byte_count),
+        elapsed,
+        util.bytecount(byte_count / elapsed),
+    )
+    repo.ui.status(m)
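
With the format string above, the resulting status line looks like the following (illustrative numbers only):

    stream-cloned 3000 files / 120 MB in 2.5 seconds (48.0 MB/sec)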
+
+
+def generatev1(repo) -> tuple[int, int, Iterator[bytes]]:
     """Emit content for version 1 of a streaming clone.
 
     This returns a 3-tuple of (file count, byte size, data iterator).
@@ -271,14 +325,15 @@
     # Get consistent snapshot of repo, lock during scan.
     with repo.lock():
         repo.ui.debug(b'scanning\n')
+        _test_sync_point_walk_1_2(repo)
         for entry in _walkstreamfiles(repo):
             for f in entry.files():
                 file_size = f.file_size(repo.store.vfs)
                 if file_size:
                     entries.append((f.unencoded_path, file_size))
                     total_bytes += file_size
-        _test_sync_point_walk_1(repo)
-    _test_sync_point_walk_2(repo)
+        _test_sync_point_walk_3(repo)
+    _test_sync_point_walk_4(repo)
 
     repo.ui.debug(
         b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
@@ -287,7 +342,7 @@
     svfs = repo.svfs
     debugflag = repo.ui.debugflag
 
-    def emitrevlogdata():
+    def emitrevlogdata() -> Iterator[bytes]:
         for name, size in entries:
             if debugflag:
                 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
@@ -299,13 +354,12 @@
                 if size <= 65536:
                     yield fp.read(size)
                 else:
-                    for chunk in util.filechunkiter(fp, limit=size):
-                        yield chunk
+                    yield from util.filechunkiter(fp, limit=size)
 
     return len(entries), total_bytes, emitrevlogdata()
 
 
-def generatev1wireproto(repo):
+def generatev1wireproto(repo) -> Iterator[bytes]:
     """Emit content for version 1 of streaming clone suitable for the wire.
 
     This is the data output from ``generatev1()`` with 2 header lines. The
@@ -329,11 +383,12 @@
     # Indicates successful response.
     yield b'0\n'
     yield b'%d %d\n' % (filecount, bytecount)
-    for chunk in it:
-        yield chunk
+    yield from it
 
 
-def generatebundlev1(repo, compression=b'UN'):
+def generatebundlev1(
+    repo, compression: bytes = b'UN'
+) -> tuple[Set[bytes], Iterator[bytes]]:
     """Emit content for version 1 of a stream clone bundle.
 
     The first 4 bytes of the output ("HGS1") denote this as stream clone
@@ -356,12 +411,12 @@
     Returns a tuple of (requirements, data generator).
     """
     if compression != b'UN':
-        raise ValueError(b'we do not support the compression argument yet')
+        raise ValueError('we do not support the compression argument yet')
 
     requirements = streamed_requirements(repo)
     requires = b','.join(sorted(requirements))
 
-    def gen():
+    def gen() -> Iterator[bytes]:
         yield b'HGS1'
         yield compression
 
@@ -391,7 +446,7 @@
     return requirements, gen()
 
 
-def consumev1(repo, fp, filecount, bytecount):
+def consumev1(repo, fp, filecount: int, bytecount: int) -> None:
     """Apply the contents from version 1 of a streaming clone file handle.
 
     This takes the output from "stream_out" and applies it to the specified
@@ -425,6 +480,7 @@
         # nesting occurs also in ordinary case (e.g. enabling
         # clonebundles).
 
+        total_file_count = 0
         with repo.transaction(b'clone'):
             with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                 for i in range(filecount):
@@ -453,6 +509,7 @@
                     # for backwards compat, name was partially encoded
                     path = store.decodedir(name)
                     with repo.svfs(path, b'w', backgroundclose=True) as ofp:
+                        total_file_count += 1
                         for chunk in util.filechunkiter(fp, limit=size):
                             progress.increment(step=len(chunk))
                             ofp.write(chunk)
@@ -461,21 +518,11 @@
             # streamclone-ed file at next access
             repo.invalidate(clearfilecache=True)
 
-        elapsed = util.timer() - start
-        if elapsed <= 0:
-            elapsed = 0.001
         progress.complete()
-        repo.ui.status(
-            _(b'transferred %s in %.1f seconds (%s/sec)\n')
-            % (
-                util.bytecount(bytecount),
-                elapsed,
-                util.bytecount(bytecount / elapsed),
-            )
-        )
+        _report_transferred(repo, start, total_file_count, bytecount)
 
 
-def readbundle1header(fp):
+def readbundle1header(fp) -> tuple[int, int, Set[bytes]]:
     compression = fp.read(2)
     if compression != b'UN':
         raise error.Abort(
@@ -503,7 +550,7 @@
     return filecount, bytecount, requirements
 
 
-def applybundlev1(repo, fp):
+def applybundlev1(repo, fp) -> None:
     """Apply the content from a stream clone bundle version 1.
 
     We assume the 4 byte header has been read and validated and the file handle
@@ -533,10 +580,10 @@
     readers to perform bundle type-specific functionality.
     """
 
-    def __init__(self, fh):
+    def __init__(self, fh) -> None:
         self._fh = fh
 
-    def apply(self, repo):
+    def apply(self, repo) -> None:
         return applybundlev1(repo, self._fh)
 
 
@@ -550,7 +597,7 @@
 
 
 # This is its own function so extensions can override it.
-def _walkstreamfullstorefiles(repo):
+def _walkstreamfullstorefiles(repo) -> list[bytes]:
     """list snapshot file from the store"""
     fnames = []
     if not repo.publishing():
@@ -587,7 +634,7 @@
     # for flushing to disk in __call__().
     MAX_OPEN = 2 if pycompat.iswindows else 100
 
-    def __init__(self):
+    def __init__(self) -> None:
         self._counter = 0
         self._volatile_fps = None
         self._copies = None
@@ -617,7 +664,7 @@
             assert self._copies is None
             assert self._dst_dir is None
 
-    def _init_tmp_copies(self):
+    def _init_tmp_copies(self) -> None:
         """prepare a temporary directory to save volatile files
 
         This will be used as backup if we have too many files open"""
@@ -627,7 +674,7 @@
         self._copies = {}
         self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
 
-    def _flush_some_on_disk(self):
+    def _flush_some_on_disk(self) -> None:
         """move some of the open files to tempory files on disk"""
         if self._copies is None:
             self._init_tmp_copies()
@@ -646,21 +693,39 @@
             del self._volatile_fps[src]
             fp.close()
 
-    def _keep_one(self, src):
+    def _keep_one(self, src: bytes) -> int:
         """preserve an open file handle for a given path"""
         # store the file quickly to ensure we close it if any error happens
         _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
         fp.seek(0, os.SEEK_END)
         size = fp.tell()
         self._volatile_fps[src] = (size, fp)
+        return size
 
-    def __call__(self, src):
+    def __call__(self, src: bytes) -> None:
         """preserve the volatile file at src"""
         assert 0 < self._counter
         if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
             self._flush_some_on_disk()
         self._keep_one(src)
 
+    def try_keep(self, src: bytes) -> Optional[int]:
+        """record a volatile file and return its size
+
+        Return None if the file does not exist.
+
+        Used for cache files that are not lock protected.
+        """
+        assert 0 < self._counter
+        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+            self._flush_some_on_disk()
+        try:
+            return self._keep_one(src)
+        except OSError as err:
+            if err.errno not in (errno.ENOENT, errno.EPERM):
+                raise
+            return None
+
     @contextlib.contextmanager
     def open(self, src):
         assert 0 < self._counter
@@ -701,65 +766,64 @@
     # fine, while this is really not fine.
     if repo.vfs in vfsmap.values():
         raise error.ProgrammingError(
-            b'repo.vfs must not be added to vfsmap for security reasons'
+            'repo.vfs must not be added to vfsmap for security reasons'
         )
 
     # translate the vfs one
     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    _test_sync_point_walk_1_2(repo)
 
     max_linkrev = len(repo)
     file_count = totalfilesize = 0
-    with util.nogc():
-        # record the expected size of every file
-        for k, vfs, e in entries:
-            for f in e.files():
-                file_count += 1
-                totalfilesize += f.file_size(vfs)
+    with VolatileManager() as volatiles:
+        # make sure we preserve volatile files
+        with util.nogc():
+            # record the expected size of every file
+            for k, vfs, e in entries:
+                e.preserve_volatiles(vfs, volatiles)
+                for f in e.files():
+                    file_count += 1
+                    totalfilesize += f.file_size(vfs)
 
-    progress = repo.ui.makeprogress(
-        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
-    )
-    progress.update(0)
-    with VolatileManager() as volatiles, progress:
-        # make sure we preserve volatile files
-        for k, vfs, e in entries:
-            for f in e.files():
-                if f.is_volatile:
-                    volatiles(vfs.join(f.unencoded_path))
-        # the first yield release the lock on the repository
-        yield file_count, totalfilesize
-        totalbytecount = 0
+        progress = repo.ui.makeprogress(
+            _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
+        )
+        progress.update(0)
+        with progress:
+            # the first yield releases the lock on the repository
+            yield file_count, totalfilesize
+            totalbytecount = 0
 
-        for src, vfs, e in entries:
-            entry_streams = e.get_streams(
-                repo=repo,
-                vfs=vfs,
-                volatiles=volatiles,
-                max_changeset=max_linkrev,
-                preserve_file_count=True,
-            )
-            for name, stream, size in entry_streams:
-                yield src
-                yield util.uvarintencode(len(name))
-                yield util.uvarintencode(size)
-                yield name
-                bytecount = 0
-                for chunk in stream:
-                    bytecount += len(chunk)
-                    totalbytecount += len(chunk)
-                    progress.update(totalbytecount)
-                    yield chunk
-                if bytecount != size:
-                    # Would most likely be caused by a race due to `hg
-                    # strip` or a revlog split
-                    msg = _(
-                        b'clone could only read %d bytes from %s, but '
-                        b'expected %d bytes'
-                    )
-                    raise error.Abort(msg % (bytecount, name, size))
+            for src, vfs, e in entries:
+                entry_streams = e.get_streams(
+                    repo=repo,
+                    vfs=vfs,
+                    volatiles=volatiles,
+                    max_changeset=max_linkrev,
+                    preserve_file_count=True,
+                )
+                for name, stream, size in entry_streams:
+                    yield src
+                    yield util.uvarintencode(len(name))
+                    yield util.uvarintencode(size)
+                    yield name
+                    bytecount = 0
+                    for chunk in stream:
+                        bytecount += len(chunk)
+                        totalbytecount += len(chunk)
+                        progress.update(totalbytecount)
+                        yield chunk
+                    if bytecount != size:
+                        # Would most likely be caused by a race due to `hg
+                        # strip` or a revlog split
+                        msg = _(
+                            b'clone could only read %d bytes from %s, but '
+                            b'expected %d bytes'
+                        )
+                        raise error.Abort(msg % (bytecount, name, size))
 
 
-def _emit3(repo, entries):
+def _emit3(repo, entries) -> Iterator[bytes | None]:
     """actually emit the stream bundle (v3)"""
     vfsmap = _makemap(repo)
     # we keep repo.vfs out of the map on purpose, there are too many dangers
@@ -769,60 +833,137 @@
     # fine, while this is really not fine.
     if repo.vfs in vfsmap.values():
         raise error.ProgrammingError(
-            b'repo.vfs must not be added to vfsmap for security reasons'
+            'repo.vfs must not be added to vfsmap for security reasons'
         )
 
     # translate the vfs once
-    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
-    total_entry_count = len(entries)
-
-    max_linkrev = len(repo)
-    progress = repo.ui.makeprogress(
-        _(b'bundle'),
-        total=total_entry_count,
-        unit=_(b'entry'),
-    )
-    progress.update(0)
-    with VolatileManager() as volatiles, progress:
+    # we only turn this into a list for the `_test_sync` call; this is not ideal
+    base_entries = list(entries)
+    _test_sync_point_walk_1_2(repo)
+    entries = []
+    with VolatileManager() as volatiles:
         # make sure we preserve volatile files
-        for k, vfs, e in entries:
+        for vfs_key, e in base_entries:
+            vfs = vfsmap[vfs_key]
+            any_files = True
             if e.maybe_volatile:
+                any_files = False
+                e.preserve_volatiles(vfs, volatiles)
                 for f in e.files():
                     if f.is_volatile:
                         # record the expected size under lock
                         f.file_size(vfs)
-                        volatiles(vfs.join(f.unencoded_path))
+                    any_files = True
+            if any_files:
+                entries.append((vfs_key, vfsmap[vfs_key], e))
+
+        total_entry_count = len(entries)
+
+        max_linkrev = len(repo)
+        progress = repo.ui.makeprogress(
+            _(b'bundle'),
+            total=total_entry_count,
+            unit=_(b'entry'),
+        )
+        progress.update(0)
         # the first yield releases the lock on the repository
         yield None
+        with progress:
+            yield util.uvarintencode(total_entry_count)
 
-        yield util.uvarintencode(total_entry_count)
+            for src, vfs, e in entries:
+                entry_streams = e.get_streams(
+                    repo=repo,
+                    vfs=vfs,
+                    volatiles=volatiles,
+                    max_changeset=max_linkrev,
+                )
+                yield util.uvarintencode(len(entry_streams))
+                for name, stream, size in entry_streams:
+                    yield src
+                    yield util.uvarintencode(len(name))
+                    yield util.uvarintencode(size)
+                    yield name
+                    yield from stream
+                progress.increment()
+
 
-        for src, vfs, e in entries:
-            entry_streams = e.get_streams(
-                repo=repo,
-                vfs=vfs,
-                volatiles=volatiles,
-                max_changeset=max_linkrev,
-            )
-            yield util.uvarintencode(len(entry_streams))
-            for name, stream, size in entry_streams:
-                yield src
-                yield util.uvarintencode(len(name))
-                yield util.uvarintencode(size)
-                yield name
-                yield from stream
-            progress.increment()
+def _test_sync_point_walk_1_2(repo):
+    """a function for synchronisation during tests
+
+    Triggered after gathering entries, but before starting to process/preserve
+    them under lock.
+
+    (on v1 this is triggered before the actual walk starts)
+    """
+
+
+def _test_sync_point_walk_3(repo):
+    """a function for synchronisation during tests
+
+    Triggered right before releasing the lock, but after computing what needed
+    to be computed under lock.
+    """
+
+
+def _test_sync_point_walk_4(repo):
+    """a function for synchronisation during tests
+
+    Triggered right after releasing the lock.
+    """
 
 
-def _test_sync_point_walk_1(repo):
-    """a function for synchronisation during tests"""
+# not really a StoreEntry, but close enough
+class CacheEntry(store.SimpleStoreEntry):
+    """Represent an entry for Cache files
+
+    It has special logic to preserve cache files early and accept optional
+    presence.
 
 
-def _test_sync_point_walk_2(repo):
-    """a function for synchronisation during tests"""
+    (Yes... this is not really a StoreEntry, but close enough. We could have a
+    BaseEntry base class, but the store one would be identical)
+    """
+
+    def __init__(self, entry_path) -> None:
+        super().__init__(
+            entry_path,
+            # we will directly deal with that in `setup_cache_file`
+            is_volatile=True,
+        )
+
+    def preserve_volatiles(self, vfs, volatiles) -> None:
+        self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
+        if self._file_size is None:
+            self._files = []
+        else:
+            assert self._is_volatile
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    file_size=self._file_size,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+
+    def files(self) -> list[store.StoreFile]:
+        if self._files is None:
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+        return super().files()
 
 
-def _entries_walk(repo, includes, excludes, includeobsmarkers):
+class CacheFile(store.StoreFile):
+    # inform the "copy/hardlink" version that this file might be missing
+    # without consequences.
+    optional: bool = True
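
The `optional` flag is what the hardlink/copy path honors: a cache file that disappears between listing and copy is skipped rather than aborting the clone. In sketch form (the variables are placeholders; see the _copy_files change further down for the real code):

    try:
        util.copyfile(src_path, dst_path, hardlink=True)
    except FileNotFoundError:
        if not optional:
            raise  # a missing store file is a real error
        # a vanished cache file is silently skipped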
+
+
+def _entries_walk(repo, includes, excludes, includeobsmarkers: bool):
     """emit a seris of files information useful to clone a repo
 
     return (vfs-key, entry) iterator
@@ -851,14 +992,10 @@
         for name in cacheutil.cachetocopy(repo):
             if repo.cachevfs.exists(name):
                 # not really a StoreEntry, but close enough
-                entry = store.SimpleStoreEntry(
-                    entry_path=name,
-                    is_volatile=True,
-                )
-                yield (_srccache, entry)
+                yield (_srccache, CacheEntry(entry_path=name))
 
 
-def generatev2(repo, includes, excludes, includeobsmarkers):
+def generatev2(repo, includes, excludes, includeobsmarkers: bool):
     """Emit content for version 2 of a streaming clone.
 
     the data stream consists of the following entries:
@@ -880,17 +1017,18 @@
             excludes=excludes,
             includeobsmarkers=includeobsmarkers,
         )
-
         chunks = _emit2(repo, entries)
         first = next(chunks)
         file_count, total_file_size = first
-        _test_sync_point_walk_1(repo)
-    _test_sync_point_walk_2(repo)
+        _test_sync_point_walk_3(repo)
+    _test_sync_point_walk_4(repo)
 
     return file_count, total_file_size, chunks
 
 
-def generatev3(repo, includes, excludes, includeobsmarkers):
+def generatev3(
+    repo, includes, excludes, includeobsmarkers: bool
+) -> Iterator[bytes | None]:
     """Emit content for version 3 of a streaming clone.
 
     the data stream consists of the following:
@@ -930,8 +1068,8 @@
         chunks = _emit3(repo, list(entries))
         first = next(chunks)
         assert first is None
-        _test_sync_point_walk_1(repo)
-    _test_sync_point_walk_2(repo)
+        _test_sync_point_walk_3(repo)
+    _test_sync_point_walk_4(repo)
 
     return chunks
 
@@ -948,7 +1086,14 @@
             yield
 
 
-def consumev2(repo, fp, filecount, filesize):
+class V2Report:
+    """a small class to track the data we saw within the stream"""
+
+    def __init__(self):
+        self.byte_count = 0
+
+
+def consumev2(repo, fp, filecount: int, filesize: int) -> None:
     """Apply the contents from a version 2 streaming clone.
 
     Data is read from an object that only needs to provide a ``read(size)``
@@ -959,12 +1104,13 @@
             _(b'%d files to transfer, %s of data\n')
             % (filecount, util.bytecount(filesize))
         )
-
+        progress = repo.ui.makeprogress(
+            _(b'clone'),
+            total=filesize,
+            unit=_(b'bytes'),
+        )
         start = util.timer()
-        progress = repo.ui.makeprogress(
-            _(b'clone'), total=filesize, unit=_(b'bytes')
-        )
-        progress.update(0)
+        report = V2Report()
 
         vfsmap = _makemap(repo)
         # we keep repo.vfs out of the map on purpose, there are too many dangers
@@ -974,50 +1120,453 @@
         # is fine, while this is really not fine.
         if repo.vfs in vfsmap.values():
             raise error.ProgrammingError(
-                b'repo.vfs must not be added to vfsmap for security reasons'
+                'repo.vfs must not be added to vfsmap for security reasons'
             )
 
+        cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu')
+        mem_profile = scmutil.get_resource_profile(repo.ui, b'memory')
+        threaded = repo.ui.configbool(
+            b"worker", b"parallel-stream-bundle-processing"
+        )
+        num_writer = repo.ui.configint(
+            b"worker",
+            b"parallel-stream-bundle-processing.num-writer",
+        )
+        if num_writer <= 0:
+            num_writer = DEFAULT_NUM_WRITER[cpu_profile]
+        memory_target = repo.ui.configbytes(
+            b"worker",
+            b"parallel-stream-bundle-processing.memory-target",
+        )
+        if memory_target < 0:
+            memory_target = None
+        elif memory_target == 0:
+            memory_target = DEFAULT_MEMORY_TARGET[mem_profile]
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
-                for i in range(filecount):
-                    src = util.readexactly(fp, 1)
-                    vfs = vfsmap[src]
-                    namelen = util.uvarintdecodestream(fp)
-                    datalen = util.uvarintdecodestream(fp)
-
-                    name = util.readexactly(fp, namelen)
+                workers = []
+                info_queue = None
+                data_queue = None
+                mark_used = None
+                try:
+                    if not threaded:
+                        fc = _FileChunker
+                        raw_data = fp
+                    else:
+                        fc = _ThreadSafeFileChunker
+                        data_queue = _DataQueue(memory_target=memory_target)
+                        if memory_target is not None:
+                            mark_used = data_queue.mark_used
+                        raw_data = util.chunkbuffer(data_queue)
 
-                    if repo.ui.debugflag:
-                        repo.ui.debug(
-                            b'adding [%s] %s (%s)\n'
-                            % (src, name, util.bytecount(datalen))
+                        w = threading.Thread(
+                            target=data_queue.fill_from,
+                            args=(fp,),
                         )
+                        workers.append(w)
+                        w.start()
+                    files = _v2_parse_files(
+                        repo,
+                        raw_data,
+                        vfsmap,
+                        filecount,
+                        progress,
+                        report,
+                        file_chunker=fc,
+                        mark_used=mark_used,
+                    )
+                    if not threaded:
+                        _write_files(files)
+                    else:
+                        info_queue = _FileInfoQueue(files)
 
-                    with vfs(name, b'w') as ofp:
-                        for chunk in util.filechunkiter(fp, limit=datalen):
-                            progress.increment(step=len(chunk))
-                            ofp.write(chunk)
+                        for __ in range(num_writer):
+                            w = threading.Thread(
+                                target=_write_files,
+                                args=(info_queue,),
+                            )
+                            workers.append(w)
+                            w.start()
+                        info_queue.fill()
+                except:  # re-raises
+                    if data_queue is not None:
+                        data_queue.abort()
+                    raise
+                finally:
+                    # shut down all the workers
+                    if info_queue is not None:
+                        # this is strictly speaking one worker too many for
+                        # this queue, but closing too many is not a problem.
+                        info_queue.close(len(workers))
+                    for w in workers:
+                        w.join()
 
             # force @filecache properties to be reloaded from
             # streamclone-ed file at next access
             repo.invalidate(clearfilecache=True)
 
-        elapsed = util.timer() - start
-        if elapsed <= 0:
-            elapsed = 0.001
-        repo.ui.status(
-            _(b'transferred %s in %.1f seconds (%s/sec)\n')
-            % (
-                util.bytecount(progress.pos),
-                elapsed,
-                util.bytecount(progress.pos / elapsed),
-            )
-        )
         progress.complete()
+        # acknowledge the end of the bundle2 part; this helps align
+        # sequential and parallel behavior.
+        remains = fp.read(1)
+        assert not remains
+        _report_transferred(repo, start, filecount, report.byte_count)
+
+
+# iterator of chunk of bytes that constitute a file content.
+FileChunksT = Iterator[bytes]
+# Contains the information necessary to write a stream file on disk
+FileInfoT = Tuple[
+    bytes,  # real fs path
+    Optional[int],  # permission to give to chmod
+    FileChunksT,  # content
+]
+
+
+class _Queue:
+    """a reimplementation of queue.Queue which doesn't use threading.Condition"""
+
+    def __init__(self):
+        self._queue = collections.deque()
+
+        # the "_lock" protects manipulation of the "_queue" deque
+        # the "_wait" is used to have the "get" thread wait for the
+        # "put" thread when the queue is empty.
+        #
+        # This is similar to the "threading.Condition", but without the absurd
+        # slowness of the stdlib implementation.
+        #
+        # the "_wait" is always released while holding the "_lock".
+        self._lock = threading.Lock()
+        self._wait = threading.Lock()
+
+    def put(self, item):
+        with self._lock:
+            self._queue.append(item)
+            # if anyone is waiting on an item, unblock it.
+            if self._wait.locked():
+                self._wait.release()
+
+    def get(self):
+        with self._lock:
+            while len(self._queue) == 0:
+                # "arm" the waiting lock
+                self._wait.acquire(blocking=False)
+                # release the lock to let others touch the queue
+                # (especially the put call we wait on)
+                self._lock.release()
+                # wait for a `put` call to release the lock
+                self._wait.acquire()
+                # grab the lock to look at a possible available value
+                self._lock.acquire()
+                # disarm the lock if necessary.
+                #
+                # If the queue only contains one item, keep the _wait lock
+                # armed, as there is no need to wake another waiter anyway.
+                if self._wait.locked() and len(self._queue) > 1:
+                    self._wait.release()
+            return self._queue.popleft()
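
A minimal usage sketch for this queue (the `handle` function is a placeholder; the None sentinel convention is the one used by _DataQueue and _FileInfoQueue below):

    q = _Queue()

    def producer(items):
        for item in items:
            q.put(item)
        q.put(None)  # sentinel: tell the consumer to stop

    def consumer():
        while (item := q.get()) is not None:
            handle(item)  # placeholder per-item processing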
+
+
+class _DataQueue:
+    """A queue passing data from the bundle stream to another thread
+
+    It has an optional "memory_target" parameter to avoid buffering too much
+    information. The implementation is not exact and the memory target might be
+    exceeded for a time in some situations.
+    """
+
+    def __init__(self, memory_target=None):
+        self._q = _Queue()
+        self._abort = False
+        self._memory_target = memory_target
+        if self._memory_target is not None and self._memory_target <= 0:
+            raise error.ProgrammingError("memory target should be > 0")
+
+        # the "_lock" protects manipulation of the "_current_used" variable
+        # the "_wait" is used to have the "reading" thread wait for the
+        # "using" thread when the buffer is full.
+        #
+        # This is similar to the "threading.Condition", but without the absurd
+        # slowness of the stdlib implementation.
+        #
+        # the "_wait" is always released while holding the "_lock".
+        self._lock = threading.Lock()
+        self._wait = threading.Lock()
+        # only the stream reader touches this; it is fine to touch it without the lock
+        self._current_read = 0
+        # do not touch this without the lock
+        self._current_used = 0
+
+    def _has_free_space(self):
+        """True if more data can be read without further exceeding the memory target
+
+        Must be called under the lock.
+        """
+        if self._memory_target is None:
+            # Ideally we should not even get into the locking business in that
+            # case, but we keep the implementation simple for now.
+            return True
+        return (self._current_read - self._current_used) < self._memory_target
+
+    def mark_used(self, offset):
+        """Notify we have used the buffer up to "offset"
+
+        This is meant to be used from a different thread than the one filling the queue.
+        """
+        if self._memory_target is not None:
+            with self._lock:
+                if offset > self._current_used:
+                    self._current_used = offset
+                # If the reader is waiting for room, unblock it.
+                if self._wait.locked() and self._has_free_space():
+                    self._wait.release()
+
+    def fill_from(self, data):
+        """fill the data queue from a bundle2 part object
+
+        This is meant to be called by the data reading thread
+        """
+        q = self._q
+        try:
+            for item in data:
+                self._current_read += len(item)
+                q.put(item)
+                if self._abort:
+                    break
+                if self._memory_target is not None:
+                    with self._lock:
+                        while not self._has_free_space():
+                            # make sure the _wait lock is locked
+                            # this is done under lock, so there can be no race with the release logic
+                            self._wait.acquire(blocking=False)
+                            self._lock.release()
+                            # acquiring the lock will block until some other thread releases it.
+                            self._wait.acquire()
+                            # let's dive into the locked section again
+                            self._lock.acquire()
+                            # make sure we release the lock we just grabbed if
+                            # needed.
+                            if self._wait.locked():
+                                self._wait.release()
+        finally:
+            q.put(None)
+
+    def __iter__(self):
+        """Iterate over the bundle chunks
+
+        This is meant to be called by the data parsing thread."""
+        q = self._q
+        while (i := q.get()) is not None:
+            yield i
+            if self._abort:
+                break
+
+    def abort(self):
+        """stop the data-reading thread and interrupt the consuming iteration
+
+        This is meant to be called on errors.
+        """
+        self._abort = True
+        self._q.put(None)
+        if self._memory_target is not None:
+            with self._lock:
+                # make sure we unblock the reader thread.
+                if self._wait.locked():
+                    self._wait.release()
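
A rough sketch of how the two threads cooperate around this queue (it mirrors the threaded branch of consumev2 above; `part` stands for the bundle2 part being read and `process` is a placeholder):

    dq = _DataQueue(memory_target=500 * 2**20)
    filler = threading.Thread(target=dq.fill_from, args=(part,))
    filler.start()
    buffered = util.chunkbuffer(dq)  # the parsing side reads from this
    consumed = 0
    while chunk := buffered.read(2**20):
        consumed += len(chunk)
        process(chunk)  # placeholder downstream processing
        dq.mark_used(consumed)  # lets the filler read further once data is used
    filler.join()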
 
 
-def consumev3(repo, fp):
+class _FileInfoQueue:
+    """A thread-safe queue to pass parsed file information to the writers"""
+
+    def __init__(self, info: Iterable[FileInfoT]):
+        self._info = info
+        self._q = _Queue()
+
+    def fill(self):
+        """iterate over the parsed information to fill the queue
+
+        This is meant to be called from the thread parsing the stream information.
+        """
+        q = self._q
+        for i in self._info:
+            q.put(i)
+
+    def close(self, number_worker):
+        """signal all the workers that we no longer have any file info coming
+
+        Called from the thread parsing the stream information (and/or the main
+        thread if different).
+        """
+        for __ in range(number_worker):
+            self._q.put(None)
+
+    def __iter__(self):
+        """iterate over the available file info
+
+        This is meant to be called from the writer threads.
+        """
+        q = self._q
+        while (i := q.get()) is not None:
+            yield i
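
Sketch of the hand-off this queue implements, in the same shape as the worker setup in consumev2 above (`files` would be the iterator returned by _v2_parse_files and `num_writer` the configured writer count):

    info_queue = _FileInfoQueue(files)
    workers = [
        threading.Thread(target=_write_files, args=(info_queue,))
        for _ in range(num_writer)
    ]
    for w in workers:
        w.start()
    info_queue.fill()               # drive the parser and feed the writers
    info_queue.close(len(workers))  # one None sentinel per writer
    for w in workers:
        w.join()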
+
+
+class _FileChunker:
+    """yield the chunks that constitute a file
+
+    This class exists as the counterpart of the threaded version and
+    would not be very useful on its own.
+    """
+
+    def __init__(
+        self,
+        fp: bundle2mod.unbundlepart,
+        data_len: int,
+        progress: scmutil.progress,
+        report: V2Report,
+        mark_used: Optional[Callable[[int], None]] = None,
+    ):
+        self.report = report
+        self.progress = progress
+        self._chunks = util.filechunkiter(fp, limit=data_len)
+
+    def fill(self) -> None:
+        """Do nothing in non-threading context"""
+
+    def __iter__(self) -> FileChunksT:
+        for chunk in self._chunks:
+            self.report.byte_count += len(chunk)
+            self.progress.increment(step=len(chunk))
+            yield chunk
+
+
+class _ThreadSafeFileChunker(_FileChunker):
+    """yield the chunks that constitute a file
+
+    Make sure you call the "fill" function in the main thread to read the
+    right data at the right time.
+    """
+
+    def __init__(
+        self,
+        fp: bundle2mod.unbundlepart,
+        data_len: int,
+        progress: scmutil.progress,
+        report: V2Report,
+        mark_used: Optional[Callable[[int], None]] = None,
+    ):
+        super().__init__(fp, data_len, progress, report)
+        self._fp = fp
+        self._queue = _Queue()
+        self._mark_used = mark_used
+
+    def fill(self) -> None:
+        """fill the file chunker queue with data read from the stream
+
+        This is meant to be called from the thread parsing information (and
+        consuming the stream data).
+        """
+        try:
+            for chunk in super().__iter__():
+                offset = self._fp.tell()
+                self._queue.put((chunk, offset))
+        finally:
+            self._queue.put(None)
+
+    def __iter__(self) -> FileChunksT:
+        """Iterate over all the file chunks
+
+        This is meant to be called from the writer threads.
+        """
+        while (info := self._queue.get()) is not None:
+            chunk, offset = info
+            if self._mark_used is not None:
+                self._mark_used(offset)
+            yield chunk
+
+
+def _trivial_file(
+    chunk: bytes,
+    mark_used: Optional[Callable[[int], None]],
+    offset: int,
+) -> FileChunksT:
+    """used for a single-chunk file"""
+    if mark_used is not None:
+        mark_used(offset)
+    yield chunk
+
+
+def _v2_parse_files(
+    repo,
+    fp: bundle2mod.unbundlepart,
+    vfs_map,
+    file_count: int,
+    progress: scmutil.progress,
+    report: V2Report,
+    file_chunker: Type[_FileChunker] = _FileChunker,
+    mark_used: Optional[Callable[[int], None]] = None,
+) -> Iterator[FileInfoT]:
+    """do the "stream-parsing" part of stream v2
+
+    The parsed information is yielded for consumption by the "writer" threads.
+    """
+    known_dirs = set()  # set of directories that we know exist
+    progress.update(0)
+    for i in range(file_count):
+        src = util.readexactly(fp, 1)
+        namelen = util.uvarintdecodestream(fp)
+        datalen = util.uvarintdecodestream(fp)
+
+        name = util.readexactly(fp, namelen)
+
+        if repo.ui.debugflag:
+            repo.ui.debug(
+                b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
+            )
+        vfs = vfs_map[src]
+        path, mode = vfs.prepare_streamed_file(name, known_dirs)
+        if datalen <= util.DEFAULT_FILE_CHUNK:
+            c = fp.read(datalen)
+            offset = fp.tell()
+            report.byte_count += len(c)
+            progress.increment(step=len(c))
+            chunks = _trivial_file(c, mark_used, offset)
+            yield (path, mode, iter(chunks))
+        else:
+            chunks = file_chunker(
+                fp,
+                datalen,
+                progress,
+                report,
+                mark_used=mark_used,
+            )
+            yield (path, mode, iter(chunks))
+            # make sure we read all the chunks before moving to the next file
+            chunks.fill()
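
Each file record parsed here is framed as a one-byte vfs key, a uvarint name length, a uvarint data length, the name, then the raw data; this matches what _emit2 above yields. A minimal encoding sketch for one record (an illustrative helper, not part of the patch):

    def encode_record(src, name, data):
        yield src                            # one-byte vfs key
        yield util.uvarintencode(len(name))
        yield util.uvarintencode(len(data))
        yield name                           # store path (bytes)
        yield data                           # raw file content (bytes)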
+
+
+def _write_files(info: Iterable[FileInfoT]):
+    """write files from parsed data"""
+    io_flags = os.O_WRONLY | os.O_CREAT
+    if pycompat.iswindows:
+        io_flags |= os.O_BINARY
+    for path, mode, data in info:
+        if mode is None:
+            fd = os.open(path, io_flags)
+        else:
+            fd = os.open(path, io_flags, mode=mode)
+        try:
+            for chunk in data:
+                written = os.write(fd, chunk)
+                # write missing pieces if the write was interrupted
+                while written < len(chunk):
+                    written += os.write(fd, chunk[written:])
+        finally:
+            os.close(fd)
+
+
+def consumev3(repo, fp) -> None:
     """Apply the contents from a version 3 streaming clone.
 
     Data is read from an object that only needs to provide a ``read(size)``
@@ -1045,9 +1594,9 @@
         # is fine, while this is really not fine.
         if repo.vfs in vfsmap.values():
             raise error.ProgrammingError(
-                b'repo.vfs must not be added to vfsmap for security reasons'
+                'repo.vfs must not be added to vfsmap for security reasons'
             )
-
+        total_file_count = 0
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
@@ -1056,6 +1605,7 @@
                     if filecount == 0:
                         if repo.ui.debugflag:
                             repo.ui.debug(b'entry with no files [%d]\n' % (i))
+                    total_file_count += filecount
                     for i in range(filecount):
                         src = util.readexactly(fp, 1)
                         vfs = vfsmap[src]
@@ -1079,18 +1629,13 @@
             # streamclone-ed file at next access
             repo.invalidate(clearfilecache=True)
 
-        elapsed = util.timer() - start
-        if elapsed <= 0:
-            elapsed = 0.001
-        msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
-        byte_count = util.bytecount(bytes_transferred)
-        bytes_sec = util.bytecount(bytes_transferred / elapsed)
-        msg %= (byte_count, elapsed, bytes_sec)
-        repo.ui.status(msg)
         progress.complete()
+        _report_transferred(repo, start, total_file_count, bytes_transferred)
 
 
-def applybundlev2(repo, fp, filecount, filesize, requirements):
+def applybundlev2(
+    repo, fp, filecount: int, filesize: int, requirements: Iterable[bytes]
+) -> None:
     from . import localrepo
 
     missingreqs = [r for r in requirements if r not in repo.supported]
@@ -1100,7 +1645,8 @@
             % b', '.join(sorted(missingreqs))
         )
 
-    consumev2(repo, fp, filecount, filesize)
+    with util.nogc():
+        consumev2(repo, fp, filecount, filesize)
 
     repo.requirements = new_stream_clone_requirements(
         repo.requirements,
@@ -1113,7 +1659,7 @@
     nodemap.post_stream_cleanup(repo)
 
 
-def applybundlev3(repo, fp, requirements):
+def applybundlev3(repo, fp, requirements: Iterable[bytes]) -> None:
     from . import localrepo
 
     missingreqs = [r for r in requirements if r not in repo.supported]
@@ -1135,14 +1681,14 @@
     nodemap.post_stream_cleanup(repo)
 
 
-def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
+def _copy_files(src_vfs_map, dst_vfs_map, entries, progress) -> bool:
     hardlink = [True]
 
     def copy_used():
         hardlink[0] = False
         progress.topic = _(b'copying')
 
-    for k, path in entries:
+    for k, path, optional in entries:
         src_vfs = src_vfs_map[k]
         dst_vfs = dst_vfs_map[k]
         src_path = src_vfs.join(path)
@@ -1154,18 +1700,22 @@
             util.makedirs(dirname)
         dst_vfs.register_file(path)
         # XXX we could use the #nb_bytes argument.
-        util.copyfile(
-            src_path,
-            dst_path,
-            hardlink=hardlink[0],
-            no_hardlink_cb=copy_used,
-            check_fs_hardlink=False,
-        )
+        try:
+            util.copyfile(
+                src_path,
+                dst_path,
+                hardlink=hardlink[0],
+                no_hardlink_cb=copy_used,
+                check_fs_hardlink=False,
+            )
+        except FileNotFoundError:
+            if not optional:
+                raise
         progress.increment()
     return hardlink[0]
 
 
-def local_copy(src_repo, dest_repo):
+def local_copy(src_repo, dest_repo) -> None:
     """copy all content from one local repository to another
 
     This is useful for local clone"""
@@ -1215,7 +1765,7 @@
             # to the files while we do the clone, so this is not done yet. We
             # could do this blindly when copying files.
             files = [
-                (vfs_key, f.unencoded_path)
+                (vfs_key, f.unencoded_path, f.optional)
                 for vfs_key, e in entries
                 for f in e.files()
             ]