changeset 52926:d5ae681834e8
stream-clone-v2: also use a thread to read the streamed data
We could do useful things while waiting for data, so we introduce another
thread dedicated to reading the bundle data and passing it to the thread that
parses the bundle.
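
The change amounts to a producer/consumer hand-off: a reader thread pulls
chunks off the bundle stream and pushes them onto a queue, while the parsing
thread iterates over that queue. Below is a minimal, self-contained sketch of
that arrangement; `ChunkQueue` and `read_bundle_chunks` are hypothetical names
used only for illustration, the actual implementation being the `_DataQueue`
class added in the diff further down.

    import io
    import queue
    import threading

    class ChunkQueue:
        """Hypothetical stand-in for the patch's _DataQueue: hands chunks
        read by one thread over to the thread that parses them."""

        def __init__(self):
            self._q = queue.Queue()

        def fill_from(self, chunks):
            # runs in the reader thread; None is the end-of-stream marker
            try:
                for chunk in chunks:
                    self._q.put(chunk)
            finally:
                self._q.put(None)

        def __iter__(self):
            # runs in the parsing thread
            while (chunk := self._q.get()) is not None:
                yield chunk

    def read_bundle_chunks(fp, size=32768):
        """Yield fixed-size chunks from a file-like object."""
        while chunk := fp.read(size):
            yield chunk

    # usage: the reader thread fills the queue while the main thread consumes it
    fp = io.BytesIO(b"example bundle payload" * 1000)
    q = ChunkQueue()
    reader = threading.Thread(target=q.fill_from, args=(read_bundle_chunks(fp),))
    reader.start()
    parsed_bytes = sum(len(c) for c in q)  # stand-in for the parsing work
    reader.join()
    assert parsed_bytes == 22000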
This has a small impact on the test output because the bundle part is consumed
at a different time. However, this will be smoothed out very soon, so let's
ignore it for now.
As with the previous patch, the speed-up is not there yet. We need to adjust
various parts of the implementation to see some benefit. However, this puts the
generic architecture in place so that we can focus on performance later.
Below are some benchmark results to highlight the slowdown.
### benchmark.name = hg.perf.exchange.stream.consume
# bin-env-vars.hg.flavor = default
# bin-env-vars.hg.py-re2-module = default
# benchmark.variants.memory-target = default
# benchmark.variants.num-writer = default
# benchmark.variants.parallel-processing = yes
# benchmark.variants.progress = no
# benchmark.variants.read-from-memory = yes
# benchmark.variants.version = v2
## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog
no-thread: 7.244015 ~~~~~
write-thread: 9.128669 (+26.02%, +1.88)
read-thread: 9.387256 (+29.59%, +2.14)
## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog
no-thread: 0.249693 ~~~~~
write-thread: 0.275081 (+10.17%, +0.03)
read-thread: 0.292601 (+17.18%, +0.04)
## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog
no-thread: 13.136674 ~~~~~
write-thread: 16.374306 (+24.65%, +3.24)
read-thread: 17.008865 (+29.48%, +3.87)
## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog
no-thread: 5.317709 ~~~~~
write-thread: 6.783031 (+27.56%, +1.47)
read-thread: 7.107141 (+33.65%, +1.79)
## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog
no-thread: 5.398368 ~~~~~
write-thread: 6.737864 (+24.81%, +1.34)
read-thread: 7.163505 (+32.70%, +1.77)
## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog
no-thread: 3.acbb55 ~~~~~
write-thread: 3.758324 (+22.48%, +0.69)
read-thread: 3.907693 (+27.34%, +0.84)
## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog
no-thread: 51.934795 ~~~~~
write-thread: 66.561340 (+28.16%, +14.63)
read-thread: 66.902619 (+28.82%, +14.97)
## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog
no-thread: 52.253858 ~~~~~
write-thread: 66.955037 (+28.13%, +14.70)
read-thread: 66.397609 (+27.07%, +14.14)
## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog
# benchmark.variants.read-from-memory = no
no-thread: 130.584329 ~~~~~
write-thread: 138.770454 (+6.27%, +8.19)
read-thread: 145.137477 (+11.14%, +14.55)
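
Each row reports wall-clock seconds for the variant, followed by the slowdown
against the no-thread baseline in relative and absolute terms. As a quick
check, reproducing the read-thread row of the first block above:

    no_thread, read_thread = 7.244015, 9.387256
    delta = read_thread - no_thread
    print(f"+{delta / no_thread:.2%}, +{delta:.2f}")  # -> +29.59%, +2.14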
author   | Pierre-Yves David <pierre-yves.david@octobus.net>
date     | Mon, 20 Jan 2025 17:05:22 +0100
parents  | 58baa86c7a02
children | fdae7c26d038
files    | mercurial/streamclone.py tests/test-stream-bundle-v2.t
diffstat | 2 files changed, 91 insertions(+), 26 deletions(-)
--- a/mercurial/streamclone.py	Mon Jan 27 19:15:39 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 20 17:05:22 2025 +0100
@@ -1126,25 +1126,39 @@
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
-                if threaded:
-                    fc = _ThreadSafeFileChunker
-                else:
-                    fc = _FileChunker
-                files = _v2_parse_files(
-                    repo,
-                    fp,
-                    filecount,
-                    progress,
-                    report,
-                    file_chunker=fc,
-                )
-                if not threaded:
-                    _write_files(vfsmap, files)
-                else:
-                    info_queue = _FileInfoQueue(files)
+                workers = []
+                info_queue = None
+                data_queue = None
+                try:
+                    if not threaded:
+                        fc = _FileChunker
+                        raw_data = fp
+                    else:
+                        fc = _ThreadSafeFileChunker
+                        data_queue = _DataQueue()
+                        raw_data = util.chunkbuffer(data_queue)
 
-                    workers = []
-                    try:
+                        # XXX we will drop this extra filechunkiter layer soon
+                        part_content = util.filechunkiter(fp)
+                        w = threading.Thread(
+                            target=data_queue.fill_from,
+                            args=(part_content,),
+                        )
+                        workers.append(w)
+                        w.start()
+                    files = _v2_parse_files(
+                        repo,
+                        raw_data,
+                        filecount,
+                        progress,
+                        report,
+                        file_chunker=fc,
+                    )
+                    if not threaded:
+                        _write_files(vfsmap, files)
+                    else:
+                        info_queue = _FileInfoQueue(files)
+
                         for __ in range(num_writer):
                             w = threading.Thread(
                                 target=_write_files,
@@ -1153,11 +1167,18 @@
                             workers.append(w)
                             w.start()
                         info_queue.fill()
-                    finally:
-                        # shut down all the workers
+                except:  # re-raises
+                    if data_queue is not None:
+                        data_queue.abort()
+                    raise
+                finally:
+                    # shut down all the workers
+                    if info_queue is not None:
+                        # this is strictly speaking one too many worker for
+                        # this queu, but closing too many is not a problem.
                         info_queue.close(len(workers))
-                        for w in workers:
-                            w.join()
+                    for w in workers:
+                        w.join()
 
         # force @filecache properties to be reloaded from
         # streamclone-ed file at next access
@@ -1181,6 +1202,46 @@
     ]
 
 
+class _DataQueue:
+    """A queue passing data from the bundle stream to other thread"""
+
+    def __init__(self):
+        self._q = queue.Queue()
+        self._abort = False
+
+    def fill_from(self, data):
+        """fill the data queue from a bundle2 part object
+
+        This is meant to be called by the data reading thread
+        """
+        q = self._q
+        try:
+            for item in data:
+                q.put(item)
+                if self._abort:
+                    break
+        finally:
+            q.put(None)
+
+    def __iter__(self):
+        """Iterate over the bundle chunkgs
+
+        This is meant to be called by the data parsing thread."""
+        q = self._q
+        while (i := q.get()) is not None:
+            yield i
+            if self._abort:
+                break
+
+    def abort(self):
+        """stop the data-reading thread and interrup the iteration of the consumer
+
+        This is meant to be called on error.
+        """
+        self._abort = True
+        self._q.put(None)
+
+
 class _FileInfoQueue:
     """A thread-safe queue to passer parsed file information to the writers"""
 
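
In the hunk above, the data flow is: the bundle part `fp` is wrapped in
`util.filechunkiter`, its chunks are pushed through `_DataQueue` by the reader
thread, and `util.chunkbuffer` presents that iterator of chunks back to
`_v2_parse_files` as a file-like object. A rough sketch of what that last
adapter step amounts to is shown below; `IterReader` is a hypothetical,
simplified stand-in, and the real `util.chunkbuffer` handles more than this:

    class IterReader:
        """Expose an iterator of byte chunks as a file-like object with read().

        Simplified illustration of the role util.chunkbuffer plays in the
        hunk above; not the actual Mercurial implementation.
        """

        def __init__(self, chunks):
            self._chunks = iter(chunks)
            self._buf = b''

        def read(self, size=-1):
            out = []
            remaining = size
            while remaining != 0:
                if not self._buf:
                    self._buf = next(self._chunks, b'')
                    if not self._buf:
                        break  # chunk iterator exhausted
                take = self._buf if remaining < 0 else self._buf[:remaining]
                self._buf = self._buf[len(take):]
                out.append(take)
                if remaining > 0:
                    remaining -= len(take)
            return b''.join(out)

    # a parser can then call read() without caring about chunk boundaries:
    assert IterReader(iter([b'ab', b'cde'])).read(4) == b'abcd'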
--- a/tests/test-stream-bundle-v2.t	Mon Jan 27 19:15:39 2025 +0100
+++ b/tests/test-stream-bundle-v2.t	Mon Jan 20 17:05:22 2025 +0100
@@ -132,6 +132,8 @@
   14 files to transfer, 1.78 KB of data (rust !)
   starting 4 threads for background file closing (?)
   starting 4 threads for background file closing (?)
+  bundle2-input-part: total payload size 1857 (no-rust threaded !)
+  bundle2-input-part: total payload size 2025 (rust threaded !)
   adding [s] data/A.i (66 bytes)
   adding [s] data/B.i (66 bytes)
   adding [s] data/C.i (66 bytes)
@@ -146,8 +148,8 @@
   adding [c] branch2-served (94 bytes)
   adding [c] rbc-names-v2 (7 bytes)
   adding [c] rbc-revs-v2 (40 bytes)
-  bundle2-input-part: total payload size 1857 (no-rust !)
-  bundle2-input-part: total payload size 2025 (rust !)
+  bundle2-input-part: total payload size 1857 (no-rust no-threaded !)
+  bundle2-input-part: total payload size 2025 (rust no-threaded !)
   stream-cloned 12 files / 1.65 KB in * seconds (* */sec) (glob) (no-rust !)
   stream-cloned 14 files / 1.78 KB in * seconds (* */sec) (glob) (rust !)
   bundle2-input-bundle: 1 parts total
@@ -195,6 +197,8 @@
   14 files to transfer, 1.78 KB of data (rust !)
   starting 4 threads for background file closing (?)
   starting 4 threads for background file closing (?)
+  bundle2-input-part: total payload size 1857 (no-rust threaded !)
+  bundle2-input-part: total payload size 2025 (rust threaded !)
   adding [s] data/A.i (66 bytes)
   adding [s] data/B.i (66 bytes)
   adding [s] data/C.i (66 bytes)
@@ -209,8 +213,8 @@
   adding [c] branch2-served (94 bytes)
   adding [c] rbc-names-v2 (7 bytes)
   adding [c] rbc-revs-v2 (40 bytes)
-  bundle2-input-part: total payload size 1857 (no-rust !)
-  bundle2-input-part: total payload size 2025 (rust !)
+  bundle2-input-part: total payload size 1857 (no-rust no-threaded !)
+  bundle2-input-part: total payload size 2025 (rust no-threaded !)
   stream-cloned 12 files / 1.65 KB in * seconds (* */sec) (glob) (no-rust !)
   stream-cloned 14 files / 1.78 KB in * seconds (* */sec) (glob) (rust !)
   bundle2-input-bundle: 1 parts total