--- a/mercurial/streamclone.py Mon Feb 03 21:31:35 2025 +0100
+++ b/mercurial/streamclone.py Wed Feb 05 12:17:00 2025 +0100
@@ -7,10 +7,10 @@
from __future__ import annotations
+import collections
import contextlib
import errno
import os
-import queue
import struct
import threading
@@ -1222,6 +1222,48 @@
]
+class _Queue:
+ """a reimplementation of queue.Queue which doesn't use thread.Condition"""
+
+ def __init__(self):
+ self._queue = collections.deque()
+
+ # the "_lock" protect manipulation of the "_queue" deque
+ # the "_wait" is used to have the "get" thread waits for the
+ # "put" thread when the queue is empty.
+ #
+ # This is similar to the "threading.Condition", but without the absurd
+ # slowness of the stdlib implementation.
+ #
+ # the "_wait" is always released while holding the "_lock".
+ self._lock = threading.Lock()
+ self._wait = threading.Lock()
+
+ def put(self, item):
+ with self._lock:
+ self._queue.append(item)
+ # if anyone is waiting on item, unblock it.
+ if self._wait.locked():
+ self._wait.release()
+
+ def get(self):
+ with self._lock:
+ while len(self._queue) == 0:
+ # "arm" the waiting lock
+ self._wait.acquire(blocking=False)
+ # release the lock to let other touch the queue
+ # (especially the put call we wait on)
+ self._lock.release()
+ # wait for for a `put` call to release the lock
+ self._wait.acquire()
+ # grab the lock to look at a possible available value
+ self._lock.acquire()
+ # disarm the lock if necessary.
+ if self._wait.locked():
+ self._wait.release()
+ return self._queue.popleft()
+
+
class _DataQueue:
"""A queue passing data from the bundle stream to other thread
@@ -1231,7 +1273,7 @@
"""
def __init__(self, memory_target=None):
- self._q = queue.Queue()
+ self._q = _Queue()
self._abort = False
self._memory_target = memory_target
if self._memory_target is not None and self._memory_target <= 0:
@@ -1332,7 +1374,7 @@
def __init__(self, info: Iterable[FileInfoT]):
self._info = info
- self._q = queue.Queue()
+ self._q = _Queue()
def fill(self):
"""iterate over the parsed information to file the queue
@@ -1408,7 +1450,7 @@
):
super().__init__(fp, data_len, progress, report)
self._fp = fp
- self._queue = queue.Queue()
+ self._queue = _Queue()
self._mark_used = mark_used
def fill(self) -> None: