stream-clone-v2: no longer use the stdlib Condition object
The `threading.Condition` object from the stdlib suffers from excessive
creation/destruction of its underlying locks: each blocking `wait()` call
allocates a fresh lock. This degrades its performance to a quite noticeable
extent.
Recreating the same logic with persistent locks yields a dramatic
performance gain (or rather, recovers most of the earlier slowdown).
Note that this problem also affects the stdlib `queue.Queue` object, which
is built on `threading.Condition` internally. This will be dealt with in the
next changesets.
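To make the cost concrete, here is a minimal micro-benchmark sketch (not part
of this change; the helper names and round count are illustrative) that
ping-pongs control between two threads, once through `threading.Condition`
and once through two persistent locks:

    import threading
    import time

    ROUNDS = 100_000  # illustrative value

    def ping_pong_condition(rounds):
        # every blocking Condition.wait() allocates a fresh lock internally
        cond = threading.Condition()
        turn = [0]  # 0: main thread's turn, 1: helper's turn

        def helper():
            for _ in range(rounds):
                with cond:
                    cond.wait_for(lambda: turn[0] == 1)
                    turn[0] = 0
                    cond.notify()

        t = threading.Thread(target=helper)
        t.start()
        for _ in range(rounds):
            with cond:
                cond.wait_for(lambda: turn[0] == 0)
                turn[0] = 1
                cond.notify()
        t.join()

    def ping_pong_locks(rounds):
        # the same hand-off with two persistent locks;
        # nothing is allocated per round
        ping = threading.Lock()
        pong = threading.Lock()
        ping.acquire()
        pong.acquire()

        def helper():
            for _ in range(rounds):
                ping.acquire()  # wait for the main thread
                pong.release()  # hand control back

        t = threading.Thread(target=helper)
        t.start()
        for _ in range(rounds):
            ping.release()  # hand control to the helper
            pong.acquire()  # wait for the answer
        t.join()

    for name, func in (("condition", ping_pong_condition),
                       ("locks", ping_pong_locks)):
        start = time.perf_counter()
        func(ROUNDS)
        print(name, time.perf_counter() - start)

The two-lock variant is the shape used by the change below.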
### benchmark.name = hg.perf.exchange.stream.consume
# bin-env-vars.hg.flavor = default
# bin-env-vars.hg.py-re2-module = default
# benchmark.variants.memory-target = default
# benchmark.variants.num-writer = default
# benchmark.variants.parallel-processing = yes
# benchmark.variants.progress = no
# benchmark.variants.read-from-memory = yes
# benchmark.variants.version = v2
## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog
no-thread: 0.249693 ~~~~~
threaded: 0.292601 (+17.18%, +0.04)
mem-target: 0.305973 (+22.54%, +0.06)
this-change: 0.305442 (+22.33%, +0.06)
## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog
no-thread: 7.244015 ~~~~~
threaded: 9.387256 (+29.59%, +2.14)
mem-target: 9.901032 (+36.68%, +2.66)
this-change: 9.728038 (+34.29%, +2.48)
## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog
no-thread: 13.136674 ~~~~~
threaded: 17.008865 (+29.48%, +3.87)
mem-target: 18.467590 (+40.58%, +5.33)
this-change: 17.788205 (+35.41%, +4.65)
## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog
no-thread: 5.317709 ~~~~~
threaded: 7.107141 (+33.65%, +1.79)
mem-target: 7.338505 (+38.00%, +2.02)
this-change: 7.122798 (+33.94%, +1.81)
## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog
no-thread: 5.398368 ~~~~~
threaded: 7.163505 (+32.70%, +1.77)
mem-target: 7.333354 (+35.84%, +1.93)
this-change: 7.123533 (+31.96%, +1.73)
## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog
no-thread: 3.acbb55 ~~~~~
threaded: 3.907693 (+27.34%, +0.84)
mem-target: 4.238172 (+38.11%, +1.17)
this-change: 4.095587 (+33.47%, +1.03)
## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog
no-thread: 51.934795 ~~~~~
threaded: 66.902619 (+28.82%, +14.97)
mem-target: 78.194540 (+50.56%, +26.26)
this-change: 72.574373 (+39.74%, +20.64)
## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog
no-thread: 52.253858 ~~~~~
threaded: 66.397609 (+27.07%, +14.14)
mem-target: 77.492938 (+48.30%, +25.24)
this-change: 72.845963 (+39.41%, +20.59)
## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog
# benchmark.variants.read-from-memory = no
no-thread: 130.584329 ~~~~~
threaded: 145.137477 (+11.14%, +14.55)
mem-target: 164.366925 (+25.87%, +33.78)
this-change: 154.873570 (+18.60%, +24.29)
--- a/mercurial/streamclone.py Mon Jan 27 18:22:01 2025 +0100
+++ b/mercurial/streamclone.py Mon Feb 03 21:31:35 2025 +0100
@@ -1236,7 +1236,17 @@
         self._memory_target = memory_target
         if self._memory_target is not None and self._memory_target <= 0:
             raise error.ProgrammingError("memory target should be > 0")
-        self._memory_condition = threading.Condition()
+
+        # the "_lock" protects manipulation of the "_current_used" variable
+        # the "_wait" lock is used to make the "reading" thread wait for the
+        # "using" thread when the buffer is full.
+        #
+        # This is similar to "threading.Condition", but without the absurd
+        # slowness of the stdlib implementation.
+        #
+        # the "_wait" lock is always released while holding the "_lock".
+        self._lock = threading.Lock()
+        self._wait = threading.Lock()
         # only the stream reader touches this; it is fine to touch without the lock
         self._current_read = 0
         # do not touch this without the lock
@@ -1245,7 +1255,7 @@
     def _has_free_space(self):
         """True if more data can be read without further exceeding memory target
 
-        Must be called under the condition lock.
+        Must be called under the lock.
         """
         if self._memory_target is None:
             # Ideally we should not even get into the locking business in that
@@ -1258,10 +1268,12 @@
 
         This is meant to be used from another thread than the one filling the queue.
         """
-        with self._memory_condition:
+        with self._lock:
             if offset > self._current_used:
                 self._current_used = offset
-            self._memory_condition.notify()
+            # If the reader is waiting for room, unblock it.
+            if self._wait.locked() and self._has_free_space():
+                self._wait.release()
 
     def fill_from(self, data):
         """fill the data queue from a bundle2 part object
@@ -1275,10 +1287,20 @@
                 q.put(item)
                 if self._abort:
                     break
-                with self._memory_condition:
-                    self._memory_condition.wait_for(
-                        self._has_free_space,
-                    )
+                with self._lock:
+                    while not self._has_free_space():
+                        # make sure the _wait lock is locked; this is done
+                        # under _lock, so there can be no race with the release logic
+                        self._wait.acquire(blocking=False)
+                        self._lock.release()
+                        # acquiring _wait will block until some other thread releases it
+                        self._wait.acquire()
+                        # let's dive into the locked section again
+                        self._lock.acquire()
+                        # make sure we release the lock we just grabbed,
+                        # if needed (the notifier may have released it already)
+                        if self._wait.locked():
+                            self._wait.release()
 
         finally:
             q.put(None)
@@ -1299,7 +1321,10 @@
         """
         self._abort = True
         self._q.put(None)
-        self._memory_condition.notify_all()
+        with self._lock:
+            # make sure we unstick the reader thread
+            if self._wait.locked():
+                self._wait.release()
 
 
 class _FileInfoQueue:
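
For readers who prefer the new scheme outside of patch form, here is a
distilled, runnable sketch. The class name `BoundedReadahead` and the
`mark_read` helper are invented for illustration; the attribute names and the
locking logic mirror the diff above:

    import threading

    class BoundedReadahead:
        """sketch: the reading thread stalls once it is `memory_target`
        bytes ahead of the consuming thread."""

        def __init__(self, memory_target):
            self._memory_target = memory_target
            self._lock = threading.Lock()  # protects _current_used
            self._wait = threading.Lock()  # parked on by the reading thread
            self._current_read = 0   # only the reader touches this
            self._current_used = 0   # only touch this under _lock

        def _has_free_space(self):
            # must be called under _lock
            return self._current_read - self._current_used < self._memory_target

        def set_current_used(self, offset):
            # called by the consuming thread
            with self._lock:
                if offset > self._current_used:
                    self._current_used = offset
                # if the reader is parked and there is room again, unpark it
                if self._wait.locked() and self._has_free_space():
                    self._wait.release()

        def mark_read(self, size):
            # called by the reading thread; blocks while over the target
            self._current_read += size
            with self._lock:
                while not self._has_free_space():
                    self._wait.acquire(blocking=False)  # ensure it is held
                    self._lock.release()
                    self._wait.acquire()  # park until the consumer releases it
                    self._lock.acquire()
                    if self._wait.locked():  # the consumer may have beaten us to it
                        self._wait.release()

Two assumptions make this safe: exactly one reading thread ever parks on
`_wait` (a stdlib Condition supports many waiters), and `_wait` is only ever
released while `_lock` is held, so the `locked()` checks cannot race.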