mercurial/streamclone.py
changeset 52922 0af8965b668a
parent 52921 363914ba328d
child 52923 f1ac5117459b
--- a/mercurial/streamclone.py	Mon Feb 03 21:31:35 2025 +0100
+++ b/mercurial/streamclone.py	Wed Feb 05 12:17:00 2025 +0100
@@ -7,10 +7,10 @@
 
 from __future__ import annotations
 
+import collections
 import contextlib
 import errno
 import os
-import queue
 import struct
 import threading
 
@@ -1222,6 +1222,48 @@
 ]
 
 
+class _Queue:
+    """a reimplementation of queue.Queue which doesn't use thread.Condition"""
+
+    def __init__(self):
+        self._queue = collections.deque()
+
+        # the "_lock" protect manipulation of the "_queue" deque
+        # the "_wait" is used to have the "get" thread waits for the
+        # "put" thread when the queue is empty.
+        #
+        # This is similar to the "threading.Condition", but without the absurd
+        # slowness of the stdlib implementation.
+        #
+        # the "_wait" is always released while holding the "_lock".
+        self._lock = threading.Lock()
+        self._wait = threading.Lock()
+
+    def put(self, item):
+        with self._lock:
+            self._queue.append(item)
+            # if anyone is waiting on item, unblock it.
+            if self._wait.locked():
+                self._wait.release()
+
+    def get(self):
+        with self._lock:
+            while len(self._queue) == 0:
+                # "arm"  the waiting lock
+                self._wait.acquire(blocking=False)
+                # release the lock to let other touch the queue
+                # (especially the put call we wait on)
+                self._lock.release()
+                # wait for for a `put` call to release the lock
+                self._wait.acquire()
+                # grab the lock to look at a possible available value
+                self._lock.acquire()
+                # disarm the lock if necessary.
+                if self._wait.locked():
+                    self._wait.release()
+            return self._queue.popleft()
+
+
 class _DataQueue:
     """A queue passing data from the bundle stream to other thread
 
@@ -1231,7 +1273,7 @@
     """
 
     def __init__(self, memory_target=None):
-        self._q = queue.Queue()
+        self._q = _Queue()
         self._abort = False
         self._memory_target = memory_target
         if self._memory_target is not None and self._memory_target <= 0:
@@ -1332,7 +1374,7 @@
 
     def __init__(self, info: Iterable[FileInfoT]):
         self._info = info
-        self._q = queue.Queue()
+        self._q = _Queue()
 
     def fill(self):
         """iterate over the parsed information to file the queue
@@ -1408,7 +1450,7 @@
     ):
         super().__init__(fp, data_len, progress, report)
         self._fp = fp
-        self._queue = queue.Queue()
+        self._queue = _Queue()
         self._mark_used = mark_used
 
     def fill(self) -> None: