changeset 52921:363914ba328d

stream-clone-v2: no longer use the stdlib Condition object The `threading.Condition` object from the stdlib suffers from excessive creation/destruction of the underlying locks. This destroys its performance to a quite noticeable point. Recreating the same logic with a persistent lock yields an absurd performance gain (or recovery from the slow down). Note that this problem also affects the stdlib `queue.Queue` object. This will be dealt with in the next changesets. ### benchmark.name = hg.perf.exchange.stream.consume # bin-env-vars.hg.flavor = default # bin-env-vars.hg.py-re2-module = default # benchmark.variants.memory-target = default # benchmark.variants.num-writer = default # benchmark.variants.parallel-processing = yes # benchmark.variants.progress = no # benchmark.variants.read-from-memory = yes # benchmark.variants.version = v2 ## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog no-thread: 0.249693 ~~~~~ threaded: 0.292601 (+17.18%, +0.04) mem-target: 0.305973 (+22.54%, +0.06) this-change: 0.305442 (+22.33%, +0.06) ## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog no-thread: 7.244015 ~~~~~ threaded: 9.387256 (+29.59%, +2.14) mem-target: 9.901032 (+36.68%, +2.66) this-change: 9.728038 (+34.29%, +2.48) ## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog no-thread: 13.136674 ~~~~~ threaded: 17.008865 (+29.48%, +3.87) mem-target: 18.467590 (+40.58%, +5.33) this-change: 17.788205 (+35.41%, +4.65) ## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog no-thread: 5.317709 ~~~~~ threaded: 7.107141 (+33.65%, +1.79) mem-target: 7.338505 (+38.00%, +2.02) this-change: 7.122798 (+33.94%, +1.81) ## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog no-thread: 5.398368 ~~~~~ threaded: 7.163505 (+32.70%, +1.77) mem-target: 7.333354 (+35.84%, +1.93) this-change: 7.123533 (+31.96%, +1.73) ## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog no-thread: 3.acbb55 ~~~~~ threaded: 3.907693 (+27.34%, 
+0.84) mem-target: 4.238172 (+38.11%, +1.17) this-change: 4.095587 (+33.47%, +1.03) ## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog no-thread: 51.934795 ~~~~~ threaded: 66.902619 (+28.82%, +14.97) mem-target: 78.194540 (+50.56%, +26.26) this-change: 72.574373 (+39.74%, +20.64) ## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog no-thread: 52.253858 ~~~~~ threaded: 66.397609 (+27.07%, +14.14) mem-target: 77.492938 (+48.30%, +25.24) this-change: 72.845963 (+39.41%, +20.59) ## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog # benchmark.variants.read-from-memory = no no-thread: 130.584329 ~~~~~ threaded: 145.137477 (+11.14%, +14.55) mem-target: 164.366925 (+25.87%, +33.78) this-change: 154.873570 (+18.60%, +24.29)
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Mon, 03 Feb 2025 21:31:35 +0100
parents aee193b1c784
children 0af8965b668a
files mercurial/streamclone.py
diffstat 1 files changed, 34 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/streamclone.py	Mon Jan 27 18:22:01 2025 +0100
+++ b/mercurial/streamclone.py	Mon Feb 03 21:31:35 2025 +0100
@@ -1236,7 +1236,17 @@
         self._memory_target = memory_target
         if self._memory_target is not None and self._memory_target <= 0:
             raise error.ProgrammingError("memory target should be > 0")
-        self._memory_condition = threading.Condition()
+
+        # the "_lock" protects manipulation of the "_current_used" variable
+        # the "_wait" is used to have the "reading" thread wait for the
+        # "using" thread when the buffer is full.
+        #
+        # This is similar to the "threading.Condition", but without the absurd
+        # slowness of the stdlib implementation.
+        #
+        # the "_wait" is always released while holding the "_lock".
+        self._lock = threading.Lock()
+        self._wait = threading.Lock()
         # only the stream reader touch this, it is find to touch without the lock
         self._current_read = 0
         # do not touch this without the lock
@@ -1245,7 +1255,7 @@
     def _has_free_space(self):
         """True if more data can be read without further exceeding memory target
 
-        Must be called under the condition lock.
+        Must be called under the lock.
         """
         if self._memory_target is None:
             # Ideally we should not even get into the locking business in that
@@ -1258,10 +1268,12 @@
 
         This is meant to be used from another thread than the one filler the queue.
         """
-        with self._memory_condition:
+        with self._lock:
             if offset > self._current_used:
                 self._current_used = offset
-            self._memory_condition.notify()
+            # If the reader is waiting for room, unblock it.
+            if self._wait.locked() and self._has_free_space():
+                self._wait.release()
 
     def fill_from(self, data):
         """fill the data queue from a bundle2 part object
@@ -1275,10 +1287,20 @@
                 q.put(item)
                 if self._abort:
                     break
-                with self._memory_condition:
-                    self._memory_condition.wait_for(
-                        self._has_free_space,
-                    )
+                with self._lock:
+                    while not self._has_free_space():
+                        # make sure the _wait lock is locked
+                        # this is done under lock, so there can be no race with the release logic
+                        self._wait.acquire(blocking=False)
+                        self._lock.release()
+                        # acquiring the lock will block until some other thread releases it.
+                        self._wait.acquire()
+                        # let's dive into the locked section again
+                        self._lock.acquire()
+                        # make sure we release the lock we just grabbed if
+                        # needed.
+                        if self._wait.locked():
+                            self._wait.release()
         finally:
             q.put(None)
 
@@ -1299,7 +1321,10 @@
         """
         self._abort = True
         self._q.put(None)
-        self._memory_condition.notify_all()
+        with self._lock:
+            # make sure we unstick the reader thread.
+            if self._wait.locked():
+                self._wait.release()
 
 
 class _FileInfoQueue: