mercurial/streamclone.py
changeset 52912 7f848cfc4286
parent 52911 307c4a0b91a0
child 52913 58baa86c7a02
--- a/mercurial/streamclone.py	Mon Jan 20 13:00:21 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 20 15:13:30 2025 +0100
@@ -10,7 +10,9 @@
 import contextlib
 import errno
 import os
+import queue
 import struct
+import threading
 
 from typing import (
     Iterable,
@@ -1101,17 +1103,48 @@
                 'repo.vfs must not be added to vfsmap for security reasons'
             )
 
+        threaded = repo.ui.configbool(
+            b"worker", b"parallel-stream-bundle-processing"
+        )
+        num_writer = repo.ui.configint(
+            b"worker",
+            b"parallel-stream-bundle-processing.num-writer",
+        )
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
+                if threaded:
+                    fc = _ThreadSafeFileChunker
+                else:
+                    fc = _FileChunker
                 files = _v2_parse_files(
                     repo,
                     fp,
                     filecount,
                     progress,
                     report,
+                    file_chunker=fc,
                 )
-                _write_files(vfsmap, files)
+                if not threaded:
+                    _write_files(vfsmap, files)
+                else:
+                    info_queue = _FileInfoQueue(files)
+
+                    workers = []
+                    try:
+                        for __ in range(num_writer):
+                            w = threading.Thread(
+                                target=_write_files,
+                                args=(vfsmap, info_queue),
+                            )
+                            workers.append(w)
+                            w.start()
+                        info_queue.fill()
+                    finally:
+                        # shut down all the workers
+                        info_queue.close(len(workers))
+                        for w in workers:
+                            w.join()
 
             # force @filecache properties to be reloaded from
             # streamclone-ed file at next access
@@ -1135,10 +1168,45 @@
 ]
 
 
+class _FileInfoQueue:
+    """A thread-safe queue to passer parsed file information to the writers"""
+
+    def __init__(self, info: Iterable[FileInfoT]):
+        self._info = info
+        self._q = queue.Queue()
+
+    def fill(self):
+        """iterate over the parsed information to file the queue
+
+        This is meant to be call from the thread parsing the stream information.
+        """
+        q = self._q
+        for i in self._info:
+            q.put(i)
+
+    def close(self, number_worker):
+        """signal all the workers that we no longer have any file info coming
+
+        Called from the thread parsing the stream information (and/or the main
+        thread if different).
+        """
+        for __ in range(number_worker):
+            self._q.put(None)
+
+    def __iter__(self):
+        """iterate over the available file info
+
+        This is meant to be called from the writer threads.
+        """
+        q = self._q
+        while (i := q.get()) is not None:
+            yield i
+
+
 class _FileChunker:
     """yield the chunk that constitute a file
 
-    This class exists as the counterpart of a future threaded version and
+    This class exists as the counterpart of the threaded version and
     would not be very useful on its own.
     """
 
@@ -1149,11 +1217,13 @@
         progress: scmutil.progress,
         report: V2Report,
     ):
-        self.fp = fp
         self.report = report
         self.progress = progress
         self._chunks = util.filechunkiter(fp, limit=data_len)
 
+    def fill(self) -> None:
+        """Do nothing in non-threading context"""
+
     def __iter__(self) -> FileChunksT:
         for chunk in self._chunks:
             self.report.byte_count += len(chunk)
@@ -1161,6 +1231,44 @@
             yield chunk
 
 
+class _ThreadSafeFileChunker(_FileChunker):
+    """yield the chunk that constitute a file
+
+    Make sure you  call the "fill" function in the main thread to read the
+    right data at the right time.
+    """
+
+    def __init__(
+        self,
+        fp: bundle2mod.unbundlepart,
+        data_len: int,
+        progress: scmutil.progress,
+        report: V2Report,
+    ):
+        super().__init__(fp, data_len, progress, report)
+        self._queue = queue.Queue()
+
+    def fill(self) -> None:
+        """fill the file chunker queue with data read from the stream
+
+        This is meant to be called from the thread parsing information (and
+        consuming the stream data).
+        """
+        try:
+            for chunk in super().__iter__():
+                self._queue.put(chunk)
+        finally:
+            self._queue.put(None)
+
+    def __iter__(self) -> FileChunksT:
+        """Iterate over all the file chunk
+
+        This is meant to be called from the writer threads.
+        """
+        while (chunk := self._queue.get()) is not None:
+            yield chunk
+
+
 def _v2_parse_files(
     repo,
     fp: bundle2mod.unbundlepart,
@@ -1187,6 +1295,8 @@
             )
         chunks = file_chunker(fp, datalen, progress, report)
         yield (src, name, iter(chunks))
+        # make sure we read all the chunk before moving to the next file
+        chunks.fill()
 
 
 def _write_files(vfsmap, info: Iterable[FileInfoT]):