--- a/mercurial/streamclone.py Wed Jan 29 15:50:01 2025 +0100
+++ b/mercurial/streamclone.py Mon Jan 27 18:22:01 2025 +0100
@@ -15,6 +15,7 @@
import threading
from typing import (
+ Callable,
Iterable,
Iterator,
Optional,
@@ -54,6 +55,15 @@
}
+# Numbers arbitrarily picked; feel free to adjust them. Do update the
+# documentation if you do so
+DEFAULT_MEMORY_TARGET = {
+ scmutil.RESOURCE_LOW: 100 * (2**20), # 100 MB
+ scmutil.RESOURCE_MEDIUM: 2**30, # 1 GB
+ scmutil.RESOURCE_HIGH: None,
+}
+
+
def new_stream_clone_requirements(
default_requirements: Iterable[bytes],
streamed_requirements: Iterable[bytes],
@@ -1114,6 +1124,7 @@
)
cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu')
+ mem_profile = scmutil.get_resource_profile(repo.ui, b'memory')
threaded = repo.ui.configbool(
b"worker", b"parallel-stream-bundle-processing"
)
@@ -1123,19 +1134,29 @@
)
if num_writer <= 0:
num_writer = DEFAULT_NUM_WRITER[cpu_profile]
+ memory_target = repo.ui.configbytes(
+ b"worker",
+ b"parallel-stream-bundle-processing.memory-target",
+ )
+ if memory_target < 0:
+ memory_target = None
+ elif memory_target == 0:
+ memory_target = DEFAULT_MEMORY_TARGET[mem_profile]
with repo.transaction(b'clone'):
ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
with nested(*ctxs):
workers = []
info_queue = None
data_queue = None
+ mark_used = None
try:
if not threaded:
fc = _FileChunker
raw_data = fp
else:
fc = _ThreadSafeFileChunker
- data_queue = _DataQueue()
+ data_queue = _DataQueue(memory_target=memory_target)
+ mark_used = data_queue.mark_used
raw_data = util.chunkbuffer(data_queue)
w = threading.Thread(
@@ -1151,6 +1172,7 @@
progress,
report,
file_chunker=fc,
+ mark_used=mark_used,
)
if not threaded:
_write_files(vfsmap, files)
@@ -1201,11 +1223,45 @@
class _DataQueue:
- """A queue passing data from the bundle stream to other thread"""
+ """A queue passing data from the bundle stream to other thread
- def __init__(self):
+    It has a "memory_target" optional parameter to avoid buffering too much
+    information. The implementation is not exact and the memory target might be
+    exceeded for a time in some situations.
+    """
+
+ def __init__(self, memory_target=None):
self._q = queue.Queue()
self._abort = False
+ self._memory_target = memory_target
+ if self._memory_target is not None and self._memory_target <= 0:
+ raise error.ProgrammingError("memory target should be > 0")
+ self._memory_condition = threading.Condition()
+        # only the stream reader touches this; it is fine to touch it without the lock
+ self._current_read = 0
+ # do not touch this without the lock
+ self._current_used = 0
+
+ def _has_free_space(self):
+ """True if more data can be read without further exceeding memory target
+
+ Must be called under the condition lock.
+ """
+ if self._memory_target is None:
+ # Ideally we should not even get into the locking business in that
+ # case, but we keep the implementation simple for now.
+ return True
+ return (self._current_read - self._current_used) < self._memory_target
+
+ def mark_used(self, offset):
+ """Notify we have used the buffer up to "offset"
+
+        This is meant to be used from a different thread than the one filling the queue.
+ """
+ with self._memory_condition:
+ if offset > self._current_used:
+ self._current_used = offset
+ self._memory_condition.notify()
def fill_from(self, data):
"""fill the data queue from a bundle2 part object
@@ -1215,9 +1271,14 @@
q = self._q
try:
for item in data:
+ self._current_read += len(item)
q.put(item)
if self._abort:
break
+ with self._memory_condition:
+ self._memory_condition.wait_for(
+ self._has_free_space,
+ )
finally:
q.put(None)
@@ -1232,12 +1293,13 @@
break
def abort(self):
- """stop the data-reading thread and interrup the iteration of the consumer
+        """stop the data-reading thread and interrupt the consuming iteration
- This is meant to be called on error.
+ This is meant to be called on errors.
"""
self._abort = True
self._q.put(None)
+ self._memory_condition.notify_all()
class _FileInfoQueue:
@@ -1288,6 +1350,7 @@
data_len: int,
progress: scmutil.progress,
report: V2Report,
+ mark_used: Optional[Callable[[int], None]] = None,
):
self.report = report
self.progress = progress
@@ -1316,9 +1379,12 @@
data_len: int,
progress: scmutil.progress,
report: V2Report,
+ mark_used: Optional[Callable[[int], None]] = None,
):
super().__init__(fp, data_len, progress, report)
+ self._fp = fp
self._queue = queue.Queue()
+ self._mark_used = mark_used
def fill(self) -> None:
"""fill the file chunker queue with data read from the stream
@@ -1328,7 +1394,8 @@
"""
try:
for chunk in super().__iter__():
- self._queue.put(chunk)
+ offset = self._fp.tell()
+ self._queue.put((chunk, offset))
finally:
self._queue.put(None)
@@ -1337,7 +1404,10 @@
This is meant to be called from the writer threads.
"""
- while (chunk := self._queue.get()) is not None:
+ while (info := self._queue.get()) is not None:
+ chunk, offset = info
+ if self._mark_used is not None:
+ self._mark_used(offset)
yield chunk
@@ -1348,6 +1418,7 @@
progress: scmutil.progress,
report: V2Report,
file_chunker: Type[_FileChunker] = _FileChunker,
+ mark_used: Optional[Callable[[int], None]] = None,
) -> Iterator[FileInfoT]:
"""do the "stream-parsing" part of stream v2
@@ -1365,7 +1436,13 @@
repo.ui.debug(
b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
)
- chunks = file_chunker(fp, datalen, progress, report)
+ chunks = file_chunker(
+ fp,
+ datalen,
+ progress,
+ report,
+ mark_used=mark_used,
+ )
yield (src, name, iter(chunks))
# make sure we read all the chunk before moving to the next file
chunks.fill()