mercurial/streamclone.py
changeset 52921 363914ba328d
parent 52920 aee193b1c784
child 52922 0af8965b668a
--- a/mercurial/streamclone.py	Mon Jan 27 18:22:01 2025 +0100
+++ b/mercurial/streamclone.py	Mon Feb 03 21:31:35 2025 +0100
@@ -1236,7 +1236,17 @@
         self._memory_target = memory_target
         if self._memory_target is not None and self._memory_target <= 0:
             raise error.ProgrammingError("memory target should be > 0")
-        self._memory_condition = threading.Condition()
+
+        # the "_lock" protect manipulation of the _current_used" variable
+        # the "_wait" is used to have the "reading" thread waits for the
+        # "using" thread when the buffer is full.
+        #
+        # This is similar to the "threading.Condition", but without the absurd
+        # slowness of the stdlib implementation.
+        #
+        # the "_wait" is always released while holding the "_lock".
+        self._lock = threading.Lock()
+        self._wait = threading.Lock()
         # only the stream reader touch this, it is find to touch without the lock
         self._current_read = 0
         # do not touch this without the lock
@@ -1245,7 +1255,7 @@
     def _has_free_space(self):
         """True if more data can be read without further exceeding memory target
 
-        Must be called under the condition lock.
+        Must be called under the lock.
         """
         if self._memory_target is None:
             # Ideally we should not even get into the locking business in that
@@ -1258,10 +1268,12 @@
 
         This is meant to be used from another thread than the one filler the queue.
         """
-        with self._memory_condition:
+        with self._lock:
             if offset > self._current_used:
                 self._current_used = offset
-            self._memory_condition.notify()
+            # If the reader is waiting for room, unblock it.
+            if self._wait.locked() and self._has_free_space():
+                self._wait.release()
 
     def fill_from(self, data):
         """fill the data queue from a bundle2 part object
@@ -1275,10 +1287,20 @@
                 q.put(item)
                 if self._abort:
                     break
-                with self._memory_condition:
-                    self._memory_condition.wait_for(
-                        self._has_free_space,
-                    )
+                with self._lock:
+                    while not self._has_free_space():
+                        # make sure the _wait lock is locked
+                        # this is done under lock, so there case be no race with the release logic
+                        self._wait.acquire(blocking=False)
+                        self._lock.release()
+                        # acquiring the lock will block until some other thread release it.
+                        self._wait.acquire()
+                        # lets dive into the locked section again
+                        self._lock.acquire()
+                        # make sure we release the lock we just grabed if
+                        # needed.
+                        if self._wait.locked():
+                            self._wait.release()
         finally:
             q.put(None)
 
@@ -1299,7 +1321,10 @@
         """
         self._abort = True
         self._q.put(None)
-        self._memory_condition.notify_all()
+        with self._lock:
+            # make sure we unstuck the reader thread.
+            if self._wait.locked():
+                self._wait.release()
 
 
 class _FileInfoQueue: