--- a/mercurial/streamclone.py Mon Jan 27 18:22:01 2025 +0100
+++ b/mercurial/streamclone.py Mon Feb 03 21:31:35 2025 +0100
@@ -1236,7 +1236,17 @@
self._memory_target = memory_target
if self._memory_target is not None and self._memory_target <= 0:
raise error.ProgrammingError("memory target should be > 0")
- self._memory_condition = threading.Condition()
+
+ # the "_lock" protect manipulation of the _current_used" variable
+ # the "_wait" is used to have the "reading" thread waits for the
+ # "using" thread when the buffer is full.
+ #
+ # This is similar to the "threading.Condition", but without the absurd
+ # slowness of the stdlib implementation.
+ #
+ # the "_wait" is always released while holding the "_lock".
+ self._lock = threading.Lock()
+ self._wait = threading.Lock()
# only the stream reader touch this, it is find to touch without the lock
self._current_read = 0
# do not touch this without the lock
@@ -1245,7 +1255,7 @@
def _has_free_space(self):
"""True if more data can be read without further exceeding memory target
- Must be called under the condition lock.
+ Must be called under the lock.
"""
if self._memory_target is None:
# Ideally we should not even get into the locking business in that
@@ -1258,10 +1268,12 @@
This is meant to be used from another thread than the one filler the queue.
"""
- with self._memory_condition:
+ with self._lock:
if offset > self._current_used:
self._current_used = offset
- self._memory_condition.notify()
+ # If the reader is waiting for room, unblock it.
+ if self._wait.locked() and self._has_free_space():
+ self._wait.release()
def fill_from(self, data):
"""fill the data queue from a bundle2 part object
@@ -1275,10 +1287,20 @@
q.put(item)
if self._abort:
break
- with self._memory_condition:
- self._memory_condition.wait_for(
- self._has_free_space,
- )
+ with self._lock:
+ while not self._has_free_space():
+ # make sure the _wait lock is locked
+ # this is done under lock, so there case be no race with the release logic
+ self._wait.acquire(blocking=False)
+ self._lock.release()
+ # acquiring the lock will block until some other thread release it.
+ self._wait.acquire()
+ # lets dive into the locked section again
+ self._lock.acquire()
+ # make sure we release the lock we just grabed if
+ # needed.
+ if self._wait.locked():
+ self._wait.release()
finally:
q.put(None)
@@ -1299,7 +1321,10 @@
"""
self._abort = True
self._q.put(None)
- self._memory_condition.notify_all()
+ with self._lock:
+ # make sure we unstuck the reader thread.
+ if self._wait.locked():
+ self._wait.release()
class _FileInfoQueue: