changeset 52924:7f848cfc4286
stream-clone-v2: use dedicated threads to write the data on disk
This could provide a significant performance boost, but various implementation
details mean it is currently slower. Further updates will help make this boost
real; this patch lays out the basic idea. The implementation in this patch uses
an unbounded amount of memory, which could be a problem in some situations. We
will deal with that soon.
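For readers skimming the patch, the basic idea can be summarized with a minimal,
self-contained sketch: the parsing thread reads the stream and feeds file data
into an unbounded queue, while dedicated writer threads drain the queue and write
to disk. All names below (writer, apply_stream, NUM_WRITERS, parsed_files) are
illustrative only; the real implementation lives in mercurial/streamclone.py in
the diff further down.

  import os
  import queue
  import threading

  NUM_WRITERS = 2  # mirrors worker.parallel-stream-bundle-processing.num-writer


  def writer(q, destdir):
      # Drain the queue and write each file; a None entry means "no more work".
      while (item := q.get()) is not None:
          name, data = item
          with open(os.path.join(destdir, name), "wb") as f:
              f.write(data)


  def apply_stream(parsed_files, destdir):
      # parsed_files is any iterable of (name, bytes) pairs, standing in for
      # the real bundle parser.
      q = queue.Queue()  # unbounded, like the implementation in this patch
      workers = [
          threading.Thread(target=writer, args=(q, destdir))
          for _ in range(NUM_WRITERS)
      ]
      for w in workers:
          w.start()
      try:
          for item in parsed_files:
              q.put(item)
      finally:
          # one end-of-work sentinel per worker, then wait for them to finish
          for _ in workers:
              q.put(None)
          for w in workers:
              w.join()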
Here are the benchmark results showing the slower runs:
### benchmark.name = hg.perf.exchange.stream.consume
# bin-env-vars.hg.flavor = default
# bin-env-vars.hg.py-re2-module = default
# benchmark.variants.parallel-processing = yes
# benchmark.variants.progress = no
# benchmark.variants.read-from-memory = yes
# benchmark.variants.version = v2
## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog
before: 0.249693 ~~~~~
after: 0.275081 (+10.17%, +0.03)
## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog
before: 5.317709 ~~~~~
after: 6.783031 (+27.56%, +1.47)
## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog
before: 5.398368 ~~~~~
after: 6.737864 (+24.81%, +1.34)
## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog
before: 3.acbb55 ~~~~~
after: 3.758324 (+22.48%, +0.69)
## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog
before: 7.244015 ~~~~~
after: 9.128669 (+26.02%, +1.88)
## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog
before: 13.136674 ~~~~~
after: 16.374306 (+24.65%, +3.24)
## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog
before: 52.253858 ~~~~~
after: 66.955037 (+28.13%, +14.70)
## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog
before: 51.934795 ~~~~~
after: 66.561340 (+28.16%, +14.63)
## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog
# benchmark.variants.read-from-memory = no
before: 130.584329 ~~~~~
after: 138.770454 (+6.27%, +8.19)
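The harness above (hg.perf.exchange.stream.consume) is part of an external
benchmark suite; as a rough stand-in, the threaded path can be tried on an
ordinary streaming clone by enabling the new experimental options on the command
line (the URL is a placeholder):

  $ hg clone --stream \
  >     --config worker.parallel-stream-bundle-processing=yes \
  >     --config worker.parallel-stream-bundle-processing.num-writer=2 \
  >     https://example.org/some-repo some-repo-copy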
author    Pierre-Yves David <pierre-yves.david@octobus.net>
date      Mon, 20 Jan 2025 15:13:30 +0100
parents   307c4a0b91a0
children  58baa86c7a02
files     mercurial/configitems.toml mercurial/streamclone.py tests/test-stream-bundle-v2.t
diffstat  3 files changed, 153 insertions(+), 3 deletions(-)
--- a/mercurial/configitems.toml	Mon Jan 20 13:00:21 2025 +0100
+++ b/mercurial/configitems.toml	Mon Jan 20 15:13:30 2025 +0100
@@ -2946,6 +2946,32 @@
 section = "worker"
 name = "numcpus"
 
+
+# experimental until we are happy with the implementation and some sanity
+# checking has been done. Controlled memory usage when request in one of the
+# requirements.
+[[items]]
+section = "worker"
+name = "parallel-stream-bundle-processing"
+default = false
+documentation="""
+Read, parse and write stream bundle in parallel speeding up the process.
+
+Also see `worker.parallel-stream-bundle-processing.num-writer` to control the
+amount of concurrent writers.
+"""
+experimental = true
+
+[[items]]
+section = "worker"
+name = "parallel-stream-bundle-processing.num-writer"
+default = 2
+experimental = true
+documentation="""
+Control the number of file being written concurrently when applying a stream
+bundle.
+"""
+
 # Templates and template applications
 
 [[template-applications]]
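In practice these items translate into an hgrc snippet like the one below (the
same settings the updated test file uses); this is only an illustration of the
new knobs, both of which remain experimental:

  [worker]
  # opt in to the threaded read/parse/write pipeline
  parallel-stream-bundle-processing = yes
  # number of concurrent writer threads
  parallel-stream-bundle-processing.num-writer = 2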
--- a/mercurial/streamclone.py	Mon Jan 20 13:00:21 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 20 15:13:30 2025 +0100
@@ -10,7 +10,9 @@
 import contextlib
 import errno
 import os
+import queue
 import struct
+import threading
 
 from typing import (
     Iterable,
@@ -1101,17 +1103,48 @@
             'repo.vfs must not be added to vfsmap for security reasons'
         )
 
+    threaded = repo.ui.configbool(
+        b"worker", b"parallel-stream-bundle-processing"
+    )
+    num_writer = repo.ui.configint(
+        b"worker",
+        b"parallel-stream-bundle-processing.num-writer",
+    )
     with repo.transaction(b'clone'):
         ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
         with nested(*ctxs):
+            if threaded:
+                fc = _ThreadSafeFileChunker
+            else:
+                fc = _FileChunker
             files = _v2_parse_files(
                 repo,
                 fp,
                 filecount,
                 progress,
                 report,
+                file_chunker=fc,
             )
-            _write_files(vfsmap, files)
+            if not threaded:
+                _write_files(vfsmap, files)
+            else:
+                info_queue = _FileInfoQueue(files)
+
+                workers = []
+                try:
+                    for __ in range(num_writer):
+                        w = threading.Thread(
+                            target=_write_files,
+                            args=(vfsmap, info_queue),
+                        )
+                        workers.append(w)
+                        w.start()
+                    info_queue.fill()
+                finally:
+                    # shut down all the workers
+                    info_queue.close(len(workers))
+                    for w in workers:
+                        w.join()
 
         # force @filecache properties to be reloaded from
         # streamclone-ed file at next access
@@ -1135,10 +1168,45 @@
 ]
 
 
+class _FileInfoQueue:
+    """A thread-safe queue to passer parsed file information to the writers"""
+
+    def __init__(self, info: Iterable[FileInfoT]):
+        self._info = info
+        self._q = queue.Queue()
+
+    def fill(self):
+        """iterate over the parsed information to file the queue
+
+        This is meant to be call from the thread parsing the stream information.
+        """
+        q = self._q
+        for i in self._info:
+            q.put(i)
+
+    def close(self, number_worker):
+        """signal all the workers that we no longer have any file info coming
+
+        Called from the thread parsing the stream information (and/or the main
+        thread if different).
+        """
+        for __ in range(number_worker):
+            self._q.put(None)
+
+    def __iter__(self):
+        """iterate over the available file info
+
+        This is meant to be called from the writer threads.
+        """
+        q = self._q
+        while (i := q.get()) is not None:
+            yield i
+
+
 class _FileChunker:
     """yield the chunk that constitute a file
 
-    This class exists as the counterpart of a future threaded version and
+    This class exists as the counterpart of the threaded version and
     would not be very useful on its own.
     """
 
@@ -1149,11 +1217,13 @@
         progress: scmutil.progress,
         report: V2Report,
     ):
-        self.fp = fp
         self.report = report
         self.progress = progress
         self._chunks = util.filechunkiter(fp, limit=data_len)
 
+    def fill(self) -> None:
+        """Do nothing in non-threading context"""
+
     def __iter__(self) -> FileChunksT:
         for chunk in self._chunks:
             self.report.byte_count += len(chunk)
@@ -1161,6 +1231,44 @@
             yield chunk
 
 
+class _ThreadSafeFileChunker(_FileChunker):
+    """yield the chunk that constitute a file
+
+    Make sure you call the "fill" function in the main thread to read the
+    right data at the right time.
+    """
+
+    def __init__(
+        self,
+        fp: bundle2mod.unbundlepart,
+        data_len: int,
+        progress: scmutil.progress,
+        report: V2Report,
+    ):
+        super().__init__(fp, data_len, progress, report)
+        self._queue = queue.Queue()
+
+    def fill(self) -> None:
+        """fill the file chunker queue with data read from the stream
+
+        This is meant to be called from the thread parsing information (and
+        consuming the stream data).
+        """
+        try:
+            for chunk in super().__iter__():
+                self._queue.put(chunk)
+        finally:
+            self._queue.put(None)
+
+    def __iter__(self) -> FileChunksT:
+        """Iterate over all the file chunk
+
+        This is meant to be called from the writer threads.
+        """
+        while (chunk := self._queue.get()) is not None:
+            yield chunk
+
+
 def _v2_parse_files(
     repo,
     fp: bundle2mod.unbundlepart,
@@ -1187,6 +1295,8 @@
         )
         chunks = file_chunker(fp, datalen, progress, report)
         yield (src, name, iter(chunks))
+        # make sure we read all the chunk before moving to the next file
+        chunks.fill()
 
 
 def _write_files(vfsmap, info: Iterable[FileInfoT]):
--- a/tests/test-stream-bundle-v2.t	Mon Jan 20 13:00:21 2025 +0100
+++ b/tests/test-stream-bundle-v2.t	Mon Jan 20 15:13:30 2025 +0100
@@ -1,6 +1,20 @@
 #require no-reposimplestore
 
 #testcases stream-v2 stream-v3
+#testcases threaded sequential
+
+#if threaded
+  $ cat << EOF >> $HGRCPATH
+  > [worker]
+  > parallel-stream-bundle-processing = yes
+  > parallel-stream-bundle-processing.num-writer = 2
+  > EOF
+#else
+  $ cat << EOF >> $HGRCPATH
+  > [worker]
+  > parallel-stream-bundle-processing = no
+  > EOF
+#endif
 
 #if stream-v2
   $ bundle_format="streamv2"
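The new threaded/sequential test cases should be exercised by the standard test
runner; the invocation below reflects the usual Mercurial workflow rather than
anything stated in the patch, and runs all testcase combinations, including the
new threaded and sequential variants:

  $ cd tests
  $ python run-tests.py test-stream-bundle-v2.t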