changeset 52926:d5ae681834e8

stream-clone-v2: also use a thread to read the streamed data

We could do useful things while waiting for data, so we introduce another
thread dedicated to reading the bundle data and passing it to the thread
parsing the bundle.

This has a small impact on the test output because the bundle part is
consumed at a different time. However, this will be smoothed out very soon,
so let's ignore that for now.

As for the previous patch, the speedup is not there yet. We need to adjust
various parts of the implementation before we see some benefit. However, this
puts the generic architecture in place so that we can focus on performance
later.

Below are some benchmark results to highlight the slowdown.

### benchmark.name = hg.perf.exchange.stream.consume
# bin-env-vars.hg.flavor = default
# bin-env-vars.hg.py-re2-module = default
# benchmark.variants.memory-target = default
# benchmark.variants.num-writer = default
# benchmark.variants.parallel-processing = yes
# benchmark.variants.progress = no
# benchmark.variants.read-from-memory = yes
# benchmark.variants.version = v2

## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog
no-thread:     7.244015 ~~~~~
write-thread:  9.128669 (+26.02%, +1.88)
read-thread:   9.387256 (+29.59%, +2.14)

## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog
no-thread:     0.249693 ~~~~~
write-thread:  0.275081 (+10.17%, +0.03)
read-thread:   0.292601 (+17.18%, +0.04)

## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog
no-thread:    13.136674 ~~~~~
write-thread: 16.374306 (+24.65%, +3.24)
read-thread:  17.008865 (+29.48%, +3.87)

## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog
no-thread:     5.317709 ~~~~~
write-thread:  6.783031 (+27.56%, +1.47)
read-thread:   7.107141 (+33.65%, +1.79)

## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog
no-thread:     5.398368 ~~~~~
write-thread:  6.737864 (+24.81%, +1.34)
read-thread:   7.163505 (+32.70%, +1.77)

## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog
no-thread:     3.acbb55 ~~~~~
write-thread:  3.758324 (+22.48%, +0.69)
read-thread:   3.907693 (+27.34%, +0.84)

## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog
no-thread:    51.934795 ~~~~~
write-thread: 66.561340 (+28.16%, +14.63)
read-thread:  66.902619 (+28.82%, +14.97)

## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog
no-thread:    52.253858 ~~~~~
write-thread: 66.955037 (+28.13%, +14.70)
read-thread:  66.397609 (+27.07%, +14.14)

## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog
# benchmark.variants.read-from-memory = no
no-thread:    130.584329 ~~~~~
write-thread: 138.770454 (+6.27%, +8.19)
read-thread:  145.137477 (+11.14%, +14.55)
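For readers unfamiliar with the queue-based hand-off described above, the
following is a minimal, self-contained sketch of the pattern: one thread
reads chunks from the stream and feeds a queue, while the parsing side
iterates over that queue; a None sentinel marks the end of the data and an
abort() call unblocks both sides on error. The ChunkQueue and demo names
below are illustrative only, not part of the Mercurial API; the actual
implementation is the _DataQueue class added in the diff further down.

    import queue
    import threading


    class ChunkQueue:
        """Hand chunks from a reader thread to a consumer (mirrors _DataQueue)."""

        def __init__(self):
            self._q = queue.Queue()
            self._abort = False

        def fill_from(self, chunks):
            """Run in the reader thread: push every chunk, then a None sentinel."""
            try:
                for chunk in chunks:
                    self._q.put(chunk)
                    if self._abort:
                        break
            finally:
                self._q.put(None)

        def __iter__(self):
            """Run in the consumer thread: yield chunks until the sentinel."""
            while (chunk := self._q.get()) is not None:
                yield chunk
                if self._abort:
                    break

        def abort(self):
            """Unblock both sides on error."""
            self._abort = True
            self._q.put(None)


    def demo():
        # stand-in for the bundle part being streamed from the server
        incoming = (b'chunk-%d' % i for i in range(5))

        dq = ChunkQueue()
        reader = threading.Thread(target=dq.fill_from, args=(incoming,))
        reader.start()
        try:
            # stand-in for the parsing thread consuming the raw data
            for chunk in dq:
                print(chunk)
        except BaseException:
            dq.abort()
            raise
        finally:
            reader.join()


    if __name__ == '__main__':
        demo()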
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Mon, 20 Jan 2025 17:05:22 +0100
parents 58baa86c7a02
children fdae7c26d038
files mercurial/streamclone.py tests/test-stream-bundle-v2.t
diffstat 2 files changed, 91 insertions(+), 26 deletions(-)
--- a/mercurial/streamclone.py	Mon Jan 27 19:15:39 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 20 17:05:22 2025 +0100
@@ -1126,25 +1126,39 @@
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
-                if threaded:
-                    fc = _ThreadSafeFileChunker
-                else:
-                    fc = _FileChunker
-                files = _v2_parse_files(
-                    repo,
-                    fp,
-                    filecount,
-                    progress,
-                    report,
-                    file_chunker=fc,
-                )
-                if not threaded:
-                    _write_files(vfsmap, files)
-                else:
-                    info_queue = _FileInfoQueue(files)
+                workers = []
+                info_queue = None
+                data_queue = None
+                try:
+                    if not threaded:
+                        fc = _FileChunker
+                        raw_data = fp
+                    else:
+                        fc = _ThreadSafeFileChunker
+                        data_queue = _DataQueue()
+                        raw_data = util.chunkbuffer(data_queue)
 
-                    workers = []
-                    try:
+                        # XXX we will drop this extra filechunkiter layer soon
+                        part_content = util.filechunkiter(fp)
+                        w = threading.Thread(
+                            target=data_queue.fill_from,
+                            args=(part_content,),
+                        )
+                        workers.append(w)
+                        w.start()
+                    files = _v2_parse_files(
+                        repo,
+                        raw_data,
+                        filecount,
+                        progress,
+                        report,
+                        file_chunker=fc,
+                    )
+                    if not threaded:
+                        _write_files(vfsmap, files)
+                    else:
+                        info_queue = _FileInfoQueue(files)
+
                         for __ in range(num_writer):
                             w = threading.Thread(
                                 target=_write_files,
@@ -1153,11 +1167,18 @@
                             workers.append(w)
                             w.start()
                         info_queue.fill()
-                    finally:
-                        # shut down all the workers
+                except:  # re-raises
+                    if data_queue is not None:
+                        data_queue.abort()
+                    raise
+                finally:
+                    # shut down all the workers
+                    if info_queue is not None:
+                        # this is strictly speaking one worker too many for
+                        # this queue, but closing too many is not a problem.
                         info_queue.close(len(workers))
-                        for w in workers:
-                            w.join()
+                    for w in workers:
+                        w.join()
 
             # force @filecache properties to be reloaded from
             # streamclone-ed file at next access
@@ -1181,6 +1202,46 @@
 ]
 
 
+class _DataQueue:
+    """A queue passing data from the bundle stream to other thread"""
+
+    def __init__(self):
+        self._q = queue.Queue()
+        self._abort = False
+
+    def fill_from(self, data):
+        """fill the data queue from a bundle2 part object
+
+        This is meant to be called by the data reading thread
+        """
+        q = self._q
+        try:
+            for item in data:
+                q.put(item)
+                if self._abort:
+                    break
+        finally:
+            q.put(None)
+
+    def __iter__(self):
+        """Iterate over the bundle chunkgs
+
+        This is meant to be called by the data parsing thread."""
+        q = self._q
+        while (i := q.get()) is not None:
+            yield i
+            if self._abort:
+                break
+
+    def abort(self):
+        """stop the data-reading thread and interrup the iteration of the consumer
+
+        This is meant to be called on error.
+        """
+        self._abort = True
+        self._q.put(None)
+
+
 class _FileInfoQueue:
     """A thread-safe queue to passer parsed file information to the writers"""
 
--- a/tests/test-stream-bundle-v2.t	Mon Jan 27 19:15:39 2025 +0100
+++ b/tests/test-stream-bundle-v2.t	Mon Jan 20 17:05:22 2025 +0100
@@ -132,6 +132,8 @@
   14 files to transfer, 1.78 KB of data (rust !)
   starting 4 threads for background file closing (?)
   starting 4 threads for background file closing (?)
+  bundle2-input-part: total payload size 1857 (no-rust threaded !)
+  bundle2-input-part: total payload size 2025 (rust threaded !)
   adding [s] data/A.i (66 bytes)
   adding [s] data/B.i (66 bytes)
   adding [s] data/C.i (66 bytes)
@@ -146,8 +148,8 @@
   adding [c] branch2-served (94 bytes)
   adding [c] rbc-names-v2 (7 bytes)
   adding [c] rbc-revs-v2 (40 bytes)
-  bundle2-input-part: total payload size 1857 (no-rust !)
-  bundle2-input-part: total payload size 2025 (rust !)
+  bundle2-input-part: total payload size 1857 (no-rust no-threaded !)
+  bundle2-input-part: total payload size 2025 (rust no-threaded !)
   stream-cloned 12 files / 1.65 KB in * seconds (* */sec) (glob) (no-rust !)
   stream-cloned 14 files / 1.78 KB in * seconds (* */sec) (glob) (rust !)
   bundle2-input-bundle: 1 parts total
@@ -195,6 +197,8 @@
   14 files to transfer, 1.78 KB of data (rust !)
   starting 4 threads for background file closing (?)
   starting 4 threads for background file closing (?)
+  bundle2-input-part: total payload size 1857 (no-rust threaded !)
+  bundle2-input-part: total payload size 2025 (rust threaded !)
   adding [s] data/A.i (66 bytes)
   adding [s] data/B.i (66 bytes)
   adding [s] data/C.i (66 bytes)
@@ -209,8 +213,8 @@
   adding [c] branch2-served (94 bytes)
   adding [c] rbc-names-v2 (7 bytes)
   adding [c] rbc-revs-v2 (40 bytes)
-  bundle2-input-part: total payload size 1857 (no-rust !)
-  bundle2-input-part: total payload size 2025 (rust !)
+  bundle2-input-part: total payload size 1857 (no-rust no-threaded !)
+  bundle2-input-part: total payload size 2025 (rust no-threaded !)
   stream-cloned 12 files / 1.65 KB in * seconds (* */sec) (glob) (no-rust !)
   stream-cloned 14 files / 1.78 KB in * seconds (* */sec) (glob) (rust !)
   bundle2-input-bundle: 1 parts total