mercurial/streamclone.py
changeset 52920 aee193b1c784
parent 52917 9abf173a958b
child 52921 363914ba328d
--- a/mercurial/streamclone.py	Wed Jan 29 15:50:01 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 27 18:22:01 2025 +0100
@@ -15,6 +15,7 @@
 import threading
 
 from typing import (
+    Callable,
     Iterable,
     Iterator,
     Optional,
@@ -54,6 +55,15 @@
 }
 
 
+# Numbers arbitrarily picked, feel free to adjust them. Do update the
+# documentation if you do so.
+DEFAULT_MEMORY_TARGET = {
+    scmutil.RESOURCE_LOW: 100 * (2**20),  # 100 MB
+    scmutil.RESOURCE_MEDIUM: 2**30,  # 1 GB
+    scmutil.RESOURCE_HIGH: None,
+}
+
+
 def new_stream_clone_requirements(
     default_requirements: Iterable[bytes],
     streamed_requirements: Iterable[bytes],
@@ -1114,6 +1124,7 @@
             )
 
         cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu')
+        mem_profile = scmutil.get_resource_profile(repo.ui, b'memory')
         threaded = repo.ui.configbool(
             b"worker", b"parallel-stream-bundle-processing"
         )
@@ -1123,19 +1134,29 @@
         )
         if num_writer <= 0:
             num_writer = DEFAULT_NUM_WRITER[cpu_profile]
+        memory_target = repo.ui.configbytes(
+            b"worker",
+            b"parallel-stream-bundle-processing.memory-target",
+        )
+        if memory_target < 0:
+            memory_target = None
+        elif memory_target == 0:
+            memory_target = DEFAULT_MEMORY_TARGET[mem_profile]
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
                 workers = []
                 info_queue = None
                 data_queue = None
+                mark_used = None
                 try:
                     if not threaded:
                         fc = _FileChunker
                         raw_data = fp
                     else:
                         fc = _ThreadSafeFileChunker
-                        data_queue = _DataQueue()
+                        data_queue = _DataQueue(memory_target=memory_target)
+                        mark_used = data_queue.mark_used
                         raw_data = util.chunkbuffer(data_queue)
 
                         w = threading.Thread(
@@ -1151,6 +1172,7 @@
                         progress,
                         report,
                         file_chunker=fc,
+                        mark_used=mark_used,
                     )
                     if not threaded:
                         _write_files(vfsmap, files)
@@ -1201,11 +1223,45 @@
 
 
 class _DataQueue:
-    """A queue passing data from the bundle stream to other thread"""
+    """A queue passing data from the bundle stream to other thread
 
-    def __init__(self):
+    It has an optional "memory_target" parameter to avoid buffering too much
+    data. The implementation is not exact and the memory target might be
+    exceeded for a time in some situations.
+    """
+
+    def __init__(self, memory_target=None):
         self._q = queue.Queue()
         self._abort = False
+        self._memory_target = memory_target
+        if self._memory_target is not None and self._memory_target <= 0:
+            raise error.ProgrammingError("memory target should be > 0")
+        self._memory_condition = threading.Condition()
+        # only the stream reader touches this; it is fine to access it without the lock
+        self._current_read = 0
+        # do not touch this without the lock
+        self._current_used = 0
+
+    def _has_free_space(self):
+        """True if more data can be read without further exceeding memory target
+
+        Must be called under the condition lock.
+        """
+        if self._memory_target is None:
+            # Ideally we should not even get into the locking business in that
+            # case, but we keep the implementation simple for now.
+            return True
+        return (self._current_read - self._current_used) < self._memory_target
+
+    def mark_used(self, offset):
+        """Notify we have used the buffer up to "offset"
+
+        This is meant to be used from another thread than the one filling the queue.
+        """
+        with self._memory_condition:
+            if offset > self._current_used:
+                self._current_used = offset
+            self._memory_condition.notify()
 
     def fill_from(self, data):
         """fill the data queue from a bundle2 part object
@@ -1215,9 +1271,14 @@
         q = self._q
         try:
             for item in data:
+                self._current_read += len(item)
                 q.put(item)
                 if self._abort:
                     break
+                with self._memory_condition:
+                    self._memory_condition.wait_for(
+                        self._has_free_space,
+                    )
         finally:
             q.put(None)
 
@@ -1232,12 +1293,13 @@
                 break
 
     def abort(self):
-        """stop the data-reading thread and interrup the iteration of the consumer
+        """stop the data-reading thread and interrupt the consuming iteration
 
-        This is meant to be called on error.
+        This is meant to be called on errors.
         """
         self._abort = True
         self._q.put(None)
+        self._memory_condition.notify_all()
 
 
 class _FileInfoQueue:
@@ -1288,6 +1350,7 @@
         data_len: int,
         progress: scmutil.progress,
         report: V2Report,
+        mark_used: Optional[Callable[[int], None]] = None,
     ):
         self.report = report
         self.progress = progress
@@ -1316,9 +1379,12 @@
         data_len: int,
         progress: scmutil.progress,
         report: V2Report,
+        mark_used: Optional[Callable[[int], None]] = None,
     ):
         super().__init__(fp, data_len, progress, report)
+        self._fp = fp
         self._queue = queue.Queue()
+        self._mark_used = mark_used
 
     def fill(self) -> None:
         """fill the file chunker queue with data read from the stream
@@ -1328,7 +1394,8 @@
         """
         try:
             for chunk in super().__iter__():
-                self._queue.put(chunk)
+                offset = self._fp.tell()
+                self._queue.put((chunk, offset))
         finally:
             self._queue.put(None)
 
@@ -1337,7 +1404,10 @@
 
         This is meant to be called from the writer threads.
         """
-        while (chunk := self._queue.get()) is not None:
+        while (info := self._queue.get()) is not None:
+            chunk, offset = info
+            if self._mark_used is not None:
+                self._mark_used(offset)
             yield chunk
 
 
@@ -1348,6 +1418,7 @@
     progress: scmutil.progress,
     report: V2Report,
     file_chunker: Type[_FileChunker] = _FileChunker,
+    mark_used: Optional[Callable[[int], None]] = None,
 ) -> Iterator[FileInfoT]:
     """do the "stream-parsing" part of stream v2
 
@@ -1365,7 +1436,13 @@
             repo.ui.debug(
                 b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
             )
-        chunks = file_chunker(fp, datalen, progress, report)
+        chunks = file_chunker(
+            fp,
+            datalen,
+            progress,
+            report,
+            mark_used=mark_used,
+        )
         yield (src, name, iter(chunks))
         # make sure we read all the chunk before moving to the next file
         chunks.fill()