diff -r 307c4a0b91a0 -r 7f848cfc4286 mercurial/streamclone.py
--- a/mercurial/streamclone.py	Mon Jan 20 13:00:21 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 20 15:13:30 2025 +0100
@@ -10,7 +10,9 @@
 import contextlib
 import errno
 import os
+import queue
 import struct
+import threading
 
 from typing import (
     Iterable,
@@ -1101,17 +1103,48 @@
                 'repo.vfs must not be added to vfsmap for security reasons'
             )
 
+        threaded = repo.ui.configbool(
+            b"worker", b"parallel-stream-bundle-processing"
+        )
+        num_writer = repo.ui.configint(
+            b"worker",
+            b"parallel-stream-bundle-processing.num-writer",
+        )
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
+                if threaded:
+                    fc = _ThreadSafeFileChunker
+                else:
+                    fc = _FileChunker
                 files = _v2_parse_files(
                     repo,
                     fp,
                     filecount,
                     progress,
                     report,
+                    file_chunker=fc,
                 )
-                _write_files(vfsmap, files)
+                if not threaded:
+                    _write_files(vfsmap, files)
+                else:
+                    info_queue = _FileInfoQueue(files)
+
+                    workers = []
+                    try:
+                        for __ in range(num_writer):
+                            w = threading.Thread(
+                                target=_write_files,
+                                args=(vfsmap, info_queue),
+                            )
+                            workers.append(w)
+                            w.start()
+                        info_queue.fill()
+                    finally:
+                        # shut down all the workers
+                        info_queue.close(len(workers))
+                        for w in workers:
+                            w.join()
 
             # force @filecache properties to be reloaded from
             # streamclone-ed file at next access
@@ -1135,10 +1168,45 @@
 ]
 
 
+class _FileInfoQueue:
+    """A thread-safe queue to pass parsed file information to the writers"""
+
+    def __init__(self, info: Iterable[FileInfoT]):
+        self._info = info
+        self._q = queue.Queue()
+
+    def fill(self):
+        """iterate over the parsed information to fill the queue
+
+        This is meant to be called from the thread parsing the stream information.
+        """
+        q = self._q
+        for i in self._info:
+            q.put(i)
+
+    def close(self, number_worker):
+        """signal all the workers that we no longer have any file info coming
+
+        Called from the thread parsing the stream information (and/or the main
+        thread if different).
+        """
+        for __ in range(number_worker):
+            self._q.put(None)
+
+    def __iter__(self):
+        """iterate over the available file info
+
+        This is meant to be called from the writer threads.
+        """
+        q = self._q
+        while (i := q.get()) is not None:
+            yield i
+
+
 class _FileChunker:
     """yield the chunk that constitute a file
 
-    This class exists as the counterpart of a future threaded version and
+    This class exists as the counterpart of the threaded version and
     would not be very useful on its own.
     """
 
@@ -1149,11 +1217,13 @@
         progress: scmutil.progress,
         report: V2Report,
     ):
-        self.fp = fp
         self.report = report
         self.progress = progress
         self._chunks = util.filechunkiter(fp, limit=data_len)
 
+    def fill(self) -> None:
+        """Do nothing in non-threading context"""
+
     def __iter__(self) -> FileChunksT:
         for chunk in self._chunks:
             self.report.byte_count += len(chunk)
             self.progress.increment(step=len(chunk))
             yield chunk
@@ -1161,6 +1231,44 @@
 
 
+class _ThreadSafeFileChunker(_FileChunker):
+    """yield the chunks that constitute a file
+
+    Make sure you call the "fill" function in the main thread to read the
+    right data at the right time.
+    """
+
+    def __init__(
+        self,
+        fp: bundle2mod.unbundlepart,
+        data_len: int,
+        progress: scmutil.progress,
+        report: V2Report,
+    ):
+        super().__init__(fp, data_len, progress, report)
+        self._queue = queue.Queue()
+
+    def fill(self) -> None:
+        """fill the file chunker queue with data read from the stream
+
+        This is meant to be called from the thread parsing information (and
+        consuming the stream data).
+        """
+        try:
+            for chunk in super().__iter__():
+                self._queue.put(chunk)
+        finally:
+            self._queue.put(None)
+
+    def __iter__(self) -> FileChunksT:
+        """Iterate over all the file chunks
+
+        This is meant to be called from the writer threads.
+        """
+        while (chunk := self._queue.get()) is not None:
+            yield chunk
+
+
 def _v2_parse_files(
     repo,
     fp: bundle2mod.unbundlepart,
@@ -1187,6 +1295,8 @@
             )
         chunks = file_chunker(fp, datalen, progress, report)
         yield (src, name, iter(chunks))
+        # make sure we read all the chunks before moving to the next file
+        chunks.fill()
 
 
 def _write_files(vfsmap, info: Iterable[FileInfoT]):
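Note on the shutdown protocol used above: _FileInfoQueue hands parsed file information to the writer threads through a queue.Queue and signals completion by enqueueing one None sentinel per worker, and _ThreadSafeFileChunker uses the same trick with a single sentinel per file. The standalone sketch below illustrates that producer/consumer pattern under those assumptions; the produce and consume helpers, the two-worker setup, and the sample integer payload are illustrative only and are not part of the patch.

import queue
import threading


def produce(q: queue.Queue, items, num_workers: int) -> None:
    # Fill the queue from the single parsing thread, then enqueue one
    # sentinel (None) per worker so that every consumer eventually stops,
    # mirroring _FileInfoQueue.fill() followed by _FileInfoQueue.close().
    try:
        for item in items:
            q.put(item)
    finally:
        for _ in range(num_workers):
            q.put(None)


def consume(q: queue.Queue, out: list) -> None:
    # Each worker drains the queue until it sees the None sentinel,
    # mirroring _FileInfoQueue.__iter__() feeding a writer.
    while (item := q.get()) is not None:
        out.append(item)


q: queue.Queue = queue.Queue()
results: list = []
workers = [threading.Thread(target=consume, args=(q, results)) for _ in range(2)]
for w in workers:
    w.start()
produce(q, range(10), num_workers=len(workers))
for w in workers:
    w.join()
assert sorted(results) == list(range(10))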