changeset 52924:7f848cfc4286

stream-clone-v2: use dedicated threads to write the data on disk

This could provide a significant performance boost, but various implementation
details mean it is currently slower. More updates will help make this boost
real, but here is the basic idea (a minimal sketch of the producer/consumer
pattern follows the changeset metadata below). The implementation in this
patch is unbounded in memory, which could be a problem in some situations. We
will deal with that soon.

Here are the benchmark results showing a slower run:

### benchmark.name = hg.perf.exchange.stream.consume
# bin-env-vars.hg.flavor = default
# bin-env-vars.hg.py-re2-module = default
# benchmark.variants.parallel-processing = yes
# benchmark.variants.progress = no
# benchmark.variants.read-from-memory = yes
# benchmark.variants.version = v2

## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog
before: 0.249693 ~~~~~ after: 0.275081 (+10.17%, +0.03)

## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog
before: 5.317709 ~~~~~ after: 6.783031 (+27.56%, +1.47)

## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog
before: 5.398368 ~~~~~ after: 6.737864 (+24.81%, +1.34)

## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog
before: 3.acbb55 ~~~~~ after: 3.758324 (+22.48%, +0.69)

## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog
before: 7.244015 ~~~~~ after: 9.128669 (+26.02%, +1.88)

## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog
before: 13.136674 ~~~~~ after: 16.374306 (+24.65%, +3.24)

## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog
before: 52.253858 ~~~~~ after: 66.955037 (+28.13%, +14.70)

## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog
before: 51.934795 ~~~~~ after: 66.561340 (+28.16%, +14.63)

## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog
# benchmark.variants.read-from-memory = no
before: 130.584329 ~~~~~ after: 138.770454 (+6.27%, +8.19)
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Mon, 20 Jan 2025 15:13:30 +0100
parents 307c4a0b91a0
children 58baa86c7a02
files mercurial/configitems.toml mercurial/streamclone.py tests/test-stream-bundle-v2.t
diffstat 3 files changed, 153 insertions(+), 3 deletions(-)
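The change follows a classic producer/consumer layout: the thread parsing the
stream fills a thread-safe queue with per-file information, and a fixed pool
of writer threads drains it, with one None sentinel per worker to signal
shutdown. A minimal, self-contained sketch of that pattern (hypothetical
names, not the code from this changeset):

    import queue
    import threading

    def produce(q, items, num_workers):
        # parse/produce items and feed the queue (runs in the parsing thread)
        try:
            for item in items:
                q.put(item)
        finally:
            # one sentinel per worker so every writer sees the end of stream
            for _ in range(num_workers):
                q.put(None)

    def write(q, sink):
        # drain the queue until the None sentinel shows up
        while (item := q.get()) is not None:
            sink(item)

    def run(items, sink, num_workers=2):
        q = queue.Queue()  # unbounded, like the queue in this patch
        workers = [
            threading.Thread(target=write, args=(q, sink))
            for _ in range(num_workers)
        ]
        for w in workers:
            w.start()
        try:
            produce(q, items, num_workers)
        finally:
            for w in workers:
                w.join()

    run(range(10), print)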
--- a/mercurial/configitems.toml	Mon Jan 20 13:00:21 2025 +0100
+++ b/mercurial/configitems.toml	Mon Jan 20 15:13:30 2025 +0100
@@ -2946,6 +2946,32 @@
 section = "worker"
 name = "numcpus"
 
+
+# experimental until we are happy with the implementation and some sanity
+# checking has been done. Controlled memory usage, when requested, is one of
+# the requirements.
+[[items]]
+section = "worker"
+name = "parallel-stream-bundle-processing"
+default = false
+documentation="""
+Read, parse and write stream bundles in parallel, speeding up the process.
+
+Also see `worker.parallel-stream-bundle-processing.num-writer` to control the
+number of concurrent writers.
+"""
+experimental = true
+
+[[items]]
+section = "worker"
+name = "parallel-stream-bundle-processing.num-writer"
+default = 2
+experimental = true
+documentation="""
+Control the number of files being written concurrently when applying a stream
+bundle.
+"""
+
 # Templates and template applications
 
 [[template-applications]]
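For reference, the new knobs live in the [worker] section of an hgrc; a
minimal configuration enabling the threaded path (mirroring the test setup
below, with the default writer count) would be:

    [worker]
    # opt in to the experimental parallel apply path
    parallel-stream-bundle-processing = yes
    # number of concurrent writer threads (the default is 2)
    parallel-stream-bundle-processing.num-writer = 2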
--- a/mercurial/streamclone.py	Mon Jan 20 13:00:21 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 20 15:13:30 2025 +0100
@@ -10,7 +10,9 @@
 import contextlib
 import errno
 import os
+import queue
 import struct
+import threading
 
 from typing import (
     Iterable,
@@ -1101,17 +1103,48 @@
                 'repo.vfs must not be added to vfsmap for security reasons'
             )
 
+        threaded = repo.ui.configbool(
+            b"worker", b"parallel-stream-bundle-processing"
+        )
+        num_writer = repo.ui.configint(
+            b"worker",
+            b"parallel-stream-bundle-processing.num-writer",
+        )
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
+                if threaded:
+                    fc = _ThreadSafeFileChunker
+                else:
+                    fc = _FileChunker
                 files = _v2_parse_files(
                     repo,
                     fp,
                     filecount,
                     progress,
                     report,
+                    file_chunker=fc,
                 )
-                _write_files(vfsmap, files)
+                if not threaded:
+                    _write_files(vfsmap, files)
+                else:
+                    info_queue = _FileInfoQueue(files)
+
+                    workers = []
+                    try:
+                        for __ in range(num_writer):
+                            w = threading.Thread(
+                                target=_write_files,
+                                args=(vfsmap, info_queue),
+                            )
+                            workers.append(w)
+                            w.start()
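+                        # the main thread keeps parsing the stream, feeding
+                        # the parsed file information to the writer threads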
+                        info_queue.fill()
+                    finally:
+                        # shut down all the workers
+                        info_queue.close(len(workers))
+                        for w in workers:
+                            w.join()
 
             # force @filecache properties to be reloaded from
             # streamclone-ed file at next access
@@ -1135,10 +1168,45 @@
 ]
 
 
+class _FileInfoQueue:
+    """A thread-safe queue to passer parsed file information to the writers"""
+
+    def __init__(self, info: Iterable[FileInfoT]):
+        self._info = info
+        self._q = queue.Queue()
+
+    def fill(self):
+        """iterate over the parsed information to file the queue
+
+        This is meant to be call from the thread parsing the stream information.
+        """
+        q = self._q
+        for i in self._info:
+            q.put(i)
+
+    def close(self, number_worker):
+        """signal all the workers that we no longer have any file info coming
+
+        Called from the thread parsing the stream information (and/or the main
+        thread if different).
+        """
+        for __ in range(number_worker):
+            self._q.put(None)
+
+    def __iter__(self):
+        """iterate over the available file info
+
+        This is meant to be called from the writer threads.
+        """
+        q = self._q
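+        # close() enqueues one None sentinel per worker; the first sentinel
+        # this thread sees ends its iteration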
+        while (i := q.get()) is not None:
+            yield i
+
+
 class _FileChunker:
     """yield the chunk that constitute a file
 
-    This class exists as the counterpart of a future threaded version and
+    This class exists as the counterpart of the threaded version and
     would not be very useful on its own.
     """
 
@@ -1149,11 +1217,13 @@
         progress: scmutil.progress,
         report: V2Report,
     ):
-        self.fp = fp
         self.report = report
         self.progress = progress
         self._chunks = util.filechunkiter(fp, limit=data_len)
 
+    def fill(self) -> None:
+        """Do nothing in non-threading context"""
+
     def __iter__(self) -> FileChunksT:
         for chunk in self._chunks:
             self.report.byte_count += len(chunk)
@@ -1161,6 +1231,44 @@
             yield chunk
 
 
+class _ThreadSafeFileChunker(_FileChunker):
+    """yield the chunk that constitute a file
+
+    Make sure you call the "fill" function in the main thread to read the
+    right data at the right time.
+    """
+
+    def __init__(
+        self,
+        fp: bundle2mod.unbundlepart,
+        data_len: int,
+        progress: scmutil.progress,
+        report: V2Report,
+    ):
+        super().__init__(fp, data_len, progress, report)
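+        # note: this per-file chunk queue is unbounded (see the commit
+        # message about memory usage)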
+        self._queue = queue.Queue()
+
+    def fill(self) -> None:
+        """fill the file chunker queue with data read from the stream
+
+        This is meant to be called from the thread parsing information (and
+        consuming the stream data).
+        """
+        try:
+            for chunk in super().__iter__():
+                self._queue.put(chunk)
+        finally:
+            self._queue.put(None)
+
+    def __iter__(self) -> FileChunksT:
+        """Iterate over all the file chunk
+
+        This is meant to be called from the writer threads.
+        """
+        while (chunk := self._queue.get()) is not None:
+            yield chunk
+
+
 def _v2_parse_files(
     repo,
     fp: bundle2mod.unbundlepart,
@@ -1187,6 +1295,8 @@
             )
         chunks = file_chunker(fp, datalen, progress, report)
         yield (src, name, iter(chunks))
+        # make sure we read all the chunks before moving to the next file
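+        # (the input is a single sequential stream, so this file's data must
+        # be fully consumed before the next file's header can be parsed)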
+        chunks.fill()
 
 
 def _write_files(vfsmap, info: Iterable[FileInfoT]):
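The commit message notes that memory usage is currently unbounded. One
standard remedy in Python, shown here purely as an illustration and not as
the fix the author has planned, is to give the queue a maxsize so that put()
blocks and the parsing thread gets backpressure; the per-file chunk queues
would need the same treatment for the bound to hold overall:

    import queue

    class _BoundedFileInfoQueue:
        """Illustrative variant of _FileInfoQueue with bounded buffering.

        put() blocks once `maxsize` parsed entries are waiting, pausing
        the parsing thread until the writers catch up.
        """

        def __init__(self, info, maxsize=16):
            self._info = info
            self._q = queue.Queue(maxsize=maxsize)

        def fill(self):
            for i in self._info:
                self._q.put(i)  # blocks while the queue is full

        def close(self, number_worker):
            for _ in range(number_worker):
                self._q.put(None)

        def __iter__(self):
            while (i := self._q.get()) is not None:
                yield i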
--- a/tests/test-stream-bundle-v2.t	Mon Jan 20 13:00:21 2025 +0100
+++ b/tests/test-stream-bundle-v2.t	Mon Jan 20 15:13:30 2025 +0100
@@ -1,6 +1,20 @@
 #require no-reposimplestore
 
 #testcases stream-v2 stream-v3
+#testcases threaded sequential
+
+#if threaded
+  $ cat << EOF >> $HGRCPATH
+  > [worker]
+  > parallel-stream-bundle-processing = yes
+  > parallel-stream-bundle-processing.num-writer = 2
+  > EOF
+#else
+  $ cat << EOF >> $HGRCPATH
+  > [worker]
+  > parallel-stream-bundle-processing = no
+  > EOF
+#endif
 
 #if stream-v2
   $ bundle_format="streamv2"