Mercurial > public > mercurial-scm > hg-stable
changeset 52934:0af8965b668a
stream-clone-v2: use a Queue implementation without a stdlib Condition object
The `threading.Condition` object from the stdlib suffer for excessive
creation/destruction of the underlying locks. This destroy its performance to a
quite noticeable point.
Recreating the same logic with persistent lock yields an absurdly big
performance gain. In practice, most of the slowdown introduced by threading is
recovered.
This benchmark compare the non-threaded baseline, the naive threaded
implementation with memory limitation, the previous changeset and this one.
### benchmark.name = hg.perf.exchange.stream.consume
# bin-env-vars.hg.flavor = default
# bin-env-vars.hg.py-re2-module = default
# benchmark.variants.memory-target = default
# benchmark.variants.num-writer = default
# benchmark.variants.parallel-processing = yes
# benchmark.variants.progress = no
# benchmark.variants.read-from-memory = yes
# benchmark.variants.version = v2
## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog
no-thread: 0.249693 ~~~~~
naive-thread: 0.305973 (+22.54%, +0.06)
prev-change: 0.305442 (+22.33%, +0.06)
this-change: 0.258450 (+3.51%, +0.01)
## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog
no-thread: 13.136674 ~~~~~
naive-thread: 18.467590 (+40.58%, +5.33)
prev-change: 17.788205 (+35.41%, +4.65)
this-change: 13.715187 (+4.40%, +0.58)
## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog
no-thread: 5.317709 ~~~~~
naive-thread: 7.338505 (+38.00%, +2.02)
prev-change: 7.122798 (+33.94%, +1.81)
this-change: 5.687069 (+6.95%, +0.37)
## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog
no-thread: 5.398368 ~~~~~
naive-thread: 7.333354 (+35.84%, +1.93)
prev-change: 7.123533 (+31.96%, +1.73)
this-change: 5.749082 (+6.50%, +0.35)
## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog
no-thread: 3.acbb55 ~~~~~
naive-thread: 4.238172 (+38.11%, +1.17)
prev-change: 4.095587 (+33.47%, +1.03)
this-change: 3.179653 (+3.62%, +0.11)
## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog
no-thread: 7.244015 ~~~~~
naive-thread: 9.901032 (+36.68%, +2.66)
prev-change: 9.728038 (+34.29%, +2.48)
this-change: 7.638277 (+5.44%, +0.39)
## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog
no-thread: 51.934795 ~~~~~
naive-thread: 78.194540 (+50.56%, +26.26)
prev-change: 72.574373 (+39.74%, +20.64)
this-change: 56.267824 (+8.34%, +4.33)
## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog
no-thread: 52.253858 ~~~~~
naive-thread: 77.492938 (+48.30%, +25.24)
prev-change: 72.845963 (+39.41%, +20.59)
this-change: 55.368316 (+5.96%, +3.11)
## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog
# benchmark.variants.read-from-memory = no
no-thread: 130.584329 ~~~~~
naive-thread: 164.366925 (+25.87%, +33.78)
prev-change: 154.873570 (+18.60%, +24.29)
this-change: 132.137611 (+1.19%, +1.55)
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Wed, 05 Feb 2025 12:17:00 +0100 |
parents | 363914ba328d |
children | f1ac5117459b |
files | mercurial/streamclone.py |
diffstat | 1 files changed, 46 insertions(+), 4 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/streamclone.py Mon Feb 03 21:31:35 2025 +0100 +++ b/mercurial/streamclone.py Wed Feb 05 12:17:00 2025 +0100 @@ -7,10 +7,10 @@ from __future__ import annotations +import collections import contextlib import errno import os -import queue import struct import threading @@ -1222,6 +1222,48 @@ ] +class _Queue: + """a reimplementation of queue.Queue which doesn't use thread.Condition""" + + def __init__(self): + self._queue = collections.deque() + + # the "_lock" protect manipulation of the "_queue" deque + # the "_wait" is used to have the "get" thread waits for the + # "put" thread when the queue is empty. + # + # This is similar to the "threading.Condition", but without the absurd + # slowness of the stdlib implementation. + # + # the "_wait" is always released while holding the "_lock". + self._lock = threading.Lock() + self._wait = threading.Lock() + + def put(self, item): + with self._lock: + self._queue.append(item) + # if anyone is waiting on item, unblock it. + if self._wait.locked(): + self._wait.release() + + def get(self): + with self._lock: + while len(self._queue) == 0: + # "arm" the waiting lock + self._wait.acquire(blocking=False) + # release the lock to let other touch the queue + # (especially the put call we wait on) + self._lock.release() + # wait for for a `put` call to release the lock + self._wait.acquire() + # grab the lock to look at a possible available value + self._lock.acquire() + # disarm the lock if necessary. + if self._wait.locked(): + self._wait.release() + return self._queue.popleft() + + class _DataQueue: """A queue passing data from the bundle stream to other thread @@ -1231,7 +1273,7 @@ """ def __init__(self, memory_target=None): - self._q = queue.Queue() + self._q = _Queue() self._abort = False self._memory_target = memory_target if self._memory_target is not None and self._memory_target <= 0: @@ -1332,7 +1374,7 @@ def __init__(self, info: Iterable[FileInfoT]): self._info = info - self._q = queue.Queue() + self._q = _Queue() def fill(self): """iterate over the parsed information to file the queue @@ -1408,7 +1450,7 @@ ): super().__init__(fp, data_len, progress, report) self._fp = fp - self._queue = queue.Queue() + self._queue = _Queue() self._mark_used = mark_used def fill(self) -> None: