changeset 52934:0af8965b668a

stream-clone-v2: use a Queue implementation without a stdlib Condition object The `threading.Condition` object from the stdlib suffer for excessive creation/destruction of the underlying locks. This destroy its performance to a quite noticeable point. Recreating the same logic with persistent lock yields an absurdly big performance gain. In practice, most of the slowdown introduced by threading is recovered. This benchmark compare the non-threaded baseline, the naive threaded implementation with memory limitation, the previous changeset and this one. ### benchmark.name = hg.perf.exchange.stream.consume # bin-env-vars.hg.flavor = default # bin-env-vars.hg.py-re2-module = default # benchmark.variants.memory-target = default # benchmark.variants.num-writer = default # benchmark.variants.parallel-processing = yes # benchmark.variants.progress = no # benchmark.variants.read-from-memory = yes # benchmark.variants.version = v2 ## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog no-thread: 0.249693 ~~~~~ naive-thread: 0.305973 (+22.54%, +0.06) prev-change: 0.305442 (+22.33%, +0.06) this-change: 0.258450 (+3.51%, +0.01) ## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog no-thread: 13.136674 ~~~~~ naive-thread: 18.467590 (+40.58%, +5.33) prev-change: 17.788205 (+35.41%, +4.65) this-change: 13.715187 (+4.40%, +0.58) ## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog no-thread: 5.317709 ~~~~~ naive-thread: 7.338505 (+38.00%, +2.02) prev-change: 7.122798 (+33.94%, +1.81) this-change: 5.687069 (+6.95%, +0.37) ## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog no-thread: 5.398368 ~~~~~ naive-thread: 7.333354 (+35.84%, +1.93) prev-change: 7.123533 (+31.96%, +1.73) this-change: 5.749082 (+6.50%, +0.35) ## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog no-thread: 3.acbb55 ~~~~~ naive-thread: 4.238172 (+38.11%, +1.17) prev-change: 4.095587 (+33.47%, +1.03) this-change: 3.179653 (+3.62%, +0.11) ## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog no-thread: 7.244015 ~~~~~ naive-thread: 9.901032 (+36.68%, +2.66) prev-change: 9.728038 (+34.29%, +2.48) this-change: 7.638277 (+5.44%, +0.39) ## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog no-thread: 51.934795 ~~~~~ naive-thread: 78.194540 (+50.56%, +26.26) prev-change: 72.574373 (+39.74%, +20.64) this-change: 56.267824 (+8.34%, +4.33) ## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog no-thread: 52.253858 ~~~~~ naive-thread: 77.492938 (+48.30%, +25.24) prev-change: 72.845963 (+39.41%, +20.59) this-change: 55.368316 (+5.96%, +3.11) ## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog # benchmark.variants.read-from-memory = no no-thread: 130.584329 ~~~~~ naive-thread: 164.366925 (+25.87%, +33.78) prev-change: 154.873570 (+18.60%, +24.29) this-change: 132.137611 (+1.19%, +1.55)
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 05 Feb 2025 12:17:00 +0100
parents 363914ba328d
children f1ac5117459b
files mercurial/streamclone.py
diffstat 1 files changed, 46 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/streamclone.py	Mon Feb 03 21:31:35 2025 +0100
+++ b/mercurial/streamclone.py	Wed Feb 05 12:17:00 2025 +0100
@@ -7,10 +7,10 @@
 
 from __future__ import annotations
 
+import collections
 import contextlib
 import errno
 import os
-import queue
 import struct
 import threading
 
@@ -1222,6 +1222,48 @@
 ]
 
 
+class _Queue:
+    """a reimplementation of queue.Queue which doesn't use thread.Condition"""
+
+    def __init__(self):
+        self._queue = collections.deque()
+
+        # the "_lock" protect manipulation of the "_queue" deque
+        # the "_wait" is used to have the "get" thread waits for the
+        # "put" thread when the queue is empty.
+        #
+        # This is similar to the "threading.Condition", but without the absurd
+        # slowness of the stdlib implementation.
+        #
+        # the "_wait" is always released while holding the "_lock".
+        self._lock = threading.Lock()
+        self._wait = threading.Lock()
+
+    def put(self, item):
+        with self._lock:
+            self._queue.append(item)
+            # if anyone is waiting on item, unblock it.
+            if self._wait.locked():
+                self._wait.release()
+
+    def get(self):
+        with self._lock:
+            while len(self._queue) == 0:
+                # "arm"  the waiting lock
+                self._wait.acquire(blocking=False)
+                # release the lock to let other touch the queue
+                # (especially the put call we wait on)
+                self._lock.release()
+                # wait for for a `put` call to release the lock
+                self._wait.acquire()
+                # grab the lock to look at a possible available value
+                self._lock.acquire()
+                # disarm the lock if necessary.
+                if self._wait.locked():
+                    self._wait.release()
+            return self._queue.popleft()
+
+
 class _DataQueue:
     """A queue passing data from the bundle stream to other thread
 
@@ -1231,7 +1273,7 @@
     """
 
     def __init__(self, memory_target=None):
-        self._q = queue.Queue()
+        self._q = _Queue()
         self._abort = False
         self._memory_target = memory_target
         if self._memory_target is not None and self._memory_target <= 0:
@@ -1332,7 +1374,7 @@
 
     def __init__(self, info: Iterable[FileInfoT]):
         self._info = info
-        self._q = queue.Queue()
+        self._q = _Queue()
 
     def fill(self):
         """iterate over the parsed information to file the queue
@@ -1408,7 +1450,7 @@
     ):
         super().__init__(fp, data_len, progress, report)
         self._fp = fp
-        self._queue = queue.Queue()
+        self._queue = _Queue()
         self._mark_used = mark_used
 
     def fill(self) -> None: