--- a/mercurial/streamclone.py Mon Jan 20 13:00:21 2025 +0100
+++ b/mercurial/streamclone.py Mon Jan 20 15:13:30 2025 +0100
@@ -10,7 +10,9 @@
import contextlib
import errno
import os
+import queue
import struct
+import threading
from typing import (
Iterable,
@@ -1101,17 +1103,48 @@
'repo.vfs must not be added to vfsmap for security reasons'
)
+ threaded = repo.ui.configbool(
+ b"worker", b"parallel-stream-bundle-processing"
+ )
+ num_writer = repo.ui.configint(
+ b"worker",
+ b"parallel-stream-bundle-processing.num-writer",
+ )
with repo.transaction(b'clone'):
ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
with nested(*ctxs):
+ if threaded:
+ fc = _ThreadSafeFileChunker
+ else:
+ fc = _FileChunker
files = _v2_parse_files(
repo,
fp,
filecount,
progress,
report,
+ file_chunker=fc,
)
- _write_files(vfsmap, files)
+ if not threaded:
+ _write_files(vfsmap, files)
+ else:
+ info_queue = _FileInfoQueue(files)
+
+ workers = []
+ try:
+ for __ in range(num_writer):
+ w = threading.Thread(
+ target=_write_files,
+ args=(vfsmap, info_queue),
+ )
+ workers.append(w)
+ w.start()
+ info_queue.fill()
+ finally:
+ # shut down all the workers
+ info_queue.close(len(workers))
+ for w in workers:
+ w.join()
# force @filecache properties to be reloaded from
# streamclone-ed file at next access
@@ -1135,10 +1168,45 @@
]
+class _FileInfoQueue:
+ """A thread-safe queue to passer parsed file information to the writers"""
+
+ def __init__(self, info: Iterable[FileInfoT]):
+ self._info = info
+ self._q = queue.Queue()
+
+ def fill(self):
+ """iterate over the parsed information to file the queue
+
+ This is meant to be call from the thread parsing the stream information.
+ """
+ q = self._q
+ for i in self._info:
+ q.put(i)
+
+ def close(self, number_worker):
+ """signal all the workers that we no longer have any file info coming
+
+ Called from the thread parsing the stream information (and/or the main
+ thread if different).
+ """
+ for __ in range(number_worker):
+ self._q.put(None)
+
+ def __iter__(self):
+ """iterate over the available file info
+
+ This is meant to be called from the writer threads.
+ """
+ q = self._q
+ while (i := q.get()) is not None:
+ yield i
+
+
class _FileChunker:
"""yield the chunk that constitute a file
- This class exists as the counterpart of a future threaded version and
+ This class exists as the counterpart of the threaded version and
would not be very useful on its own.
"""
@@ -1149,11 +1217,13 @@
progress: scmutil.progress,
report: V2Report,
):
- self.fp = fp
self.report = report
self.progress = progress
self._chunks = util.filechunkiter(fp, limit=data_len)
+ def fill(self) -> None:
+ """Do nothing in non-threading context"""
+
def __iter__(self) -> FileChunksT:
for chunk in self._chunks:
self.report.byte_count += len(chunk)
@@ -1161,6 +1231,44 @@
yield chunk
+class _ThreadSafeFileChunker(_FileChunker):
+ """yield the chunk that constitute a file
+
+ Make sure you call the "fill" function in the main thread to read the
+ right data at the right time.
+ """
+
+ def __init__(
+ self,
+ fp: bundle2mod.unbundlepart,
+ data_len: int,
+ progress: scmutil.progress,
+ report: V2Report,
+ ):
+ super().__init__(fp, data_len, progress, report)
+ self._queue = queue.Queue()
+
+ def fill(self) -> None:
+ """fill the file chunker queue with data read from the stream
+
+ This is meant to be called from the thread parsing information (and
+ consuming the stream data).
+ """
+ try:
+ for chunk in super().__iter__():
+ self._queue.put(chunk)
+ finally:
+ self._queue.put(None)
+
+ def __iter__(self) -> FileChunksT:
+ """Iterate over all the file chunk
+
+ This is meant to be called from the writer threads.
+ """
+ while (chunk := self._queue.get()) is not None:
+ yield chunk
+
+
def _v2_parse_files(
repo,
fp: bundle2mod.unbundlepart,
@@ -1187,6 +1295,8 @@
)
chunks = file_chunker(fp, datalen, progress, report)
yield (src, name, iter(chunks))
+ # make sure we read all the chunk before moving to the next file
+ chunks.fill()
def _write_files(vfsmap, info: Iterable[FileInfoT]):