diff mercurial/streamclone.py @ 52920:aee193b1c784

stream-clone-v2: introduce a way to limit memory usage of the threaded version

This mechanism limits the read-ahead to avoid uncontrolled memory usage. Note
that the memory target is not a hard limit and that Mercurial will use more
memory than specified in some cases, but "not too much more".

The current implementation focuses on "simplicity" and could be more
efficient. However, it provides a "safe" baseline for the feature. Now that
the overall shape of the feature is here, we can start making it faster.

Here are more benchmarks of how everything slows down. The next series will
focus on optimizing this code path to actually speed things up.

### benchmark.name = hg.perf.exchange.stream.consume
# bin-env-vars.hg.flavor = default
# bin-env-vars.hg.py-re2-module = default
# benchmark.variants.memory-target = default
# benchmark.variants.num-writer = default
# benchmark.variants.parallel-processing = yes
# benchmark.variants.progress = no
# benchmark.variants.read-from-memory = yes
# benchmark.variants.version = v2

## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog
no-thread:      7.244015 ~~~~~
write-thread:   9.128669 (+26.02%, +1.88)
read-thread:    9.387256 (+29.59%, +2.14)
mem-target:     9.901032 (+36.68%, +2.66)

## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog
no-thread:      0.249693 ~~~~~
write-thread:   0.275081 (+10.17%, +0.03)
read-thread:    0.292601 (+17.18%, +0.04)
mem-target:     0.305973 (+22.54%, +0.06)

## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog
no-thread:     13.136674 ~~~~~
write-thread:  16.374306 (+24.65%, +3.24)
read-thread:   17.008865 (+29.48%, +3.87)
mem-target:    18.467590 (+40.58%, +5.33)

## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog
no-thread:      5.317709 ~~~~~
write-thread:   6.783031 (+27.56%, +1.47)
read-thread:    7.107141 (+33.65%, +1.79)
mem-target:     7.338505 (+38.00%, +2.02)

## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog
no-thread:      5.398368 ~~~~~
write-thread:   6.737864 (+24.81%, +1.34)
read-thread:    7.163505 (+32.70%, +1.77)
mem-target:     7.333354 (+35.84%, +1.93)

## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog
no-thread:      3.acbb55 ~~~~~
write-thread:   3.758324 (+22.48%, +0.69)
read-thread:    3.907693 (+27.34%, +0.84)
mem-target:     4.238172 (+38.11%, +1.17)

## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog
no-thread:     51.934795 ~~~~~
write-thread:  66.561340 (+28.16%, +14.63)
read-thread:   66.902619 (+28.82%, +14.97)
mem-target:    78.194540 (+50.56%, +26.26)

## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog
# benchmark.variants.read-from-memory = yes
no-thread:     52.253858 ~~~~~
write-thread:  66.955037 (+28.13%, +14.70)
read-thread:   66.397609 (+27.07%, +14.14)
mem-target:    77.492938 (+48.30%, +25.24)

## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog
# benchmark.variants.read-from-memory = no
no-thread:    130.584329 ~~~~~
write-thread: 138.770454 (+6.27%, +8.19)
read-thread:  145.137477 (+11.14%, +14.55)
mem-target:   164.366925 (+25.87%, +33.78)
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Mon, 27 Jan 2025 18:22:01 +0100
parents 9abf173a958b
children 363914ba328d
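
To make the mechanism described in the message above concrete, here is a minimal standalone sketch of the back-pressure idea: a reader thread stops reading ahead once the amount of unconsumed data reaches the target, and the consumer reports its progress back through a condition variable. This is illustrative only; the class name, attribute names and numbers below are invented for the example and merely mirror the `_DataQueue` changes in the diff that follows.

import queue
import threading


class BoundedReadAhead:
    """Illustrative bounded read-ahead queue (not the Mercurial class).

    The producer blocks once (bytes read - bytes consumed) reaches the
    memory target, which caps how much data can sit in the queue at once.
    """

    def __init__(self, memory_target):
        self._q = queue.Queue()
        self._memory_target = memory_target
        self._cond = threading.Condition()
        self._read = 0  # bytes pushed by the producer (producer-only)
        self._used = 0  # bytes acknowledged by the consumer (lock-protected)

    def _has_free_space(self):
        # must be called with self._cond held
        return (self._read - self._used) < self._memory_target

    def fill_from(self, chunks):
        # producer side: read ahead, but never too far ahead
        try:
            for chunk in chunks:
                self._read += len(chunk)
                self._q.put(chunk)
                with self._cond:
                    self._cond.wait_for(self._has_free_space)
        finally:
            self._q.put(None)  # sentinel: end of stream

    def mark_used(self, offset):
        # consumer side: report how far the stream has been consumed
        with self._cond:
            if offset > self._used:
                self._used = offset
            self._cond.notify()

    def __iter__(self):
        consumed = 0
        while (chunk := self._q.get()) is not None:
            consumed += len(chunk)
            yield chunk
            self.mark_used(consumed)


# toy usage: 64 byte budget, eight 32 byte chunks, producer in a thread
dq = BoundedReadAhead(memory_target=64)
producer = threading.Thread(target=dq.fill_from, args=([b'x' * 32] * 8,))
producer.start()
assert sum(len(c) for c in dq) == 256
producer.join()

As in the patch, the target is soft: a chunk is put on the queue before the producer checks for free space, so the budget can be overshot by up to one chunk.
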
--- a/mercurial/streamclone.py	Wed Jan 29 15:50:01 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 27 18:22:01 2025 +0100
@@ -15,6 +15,7 @@
 import threading
 
 from typing import (
+    Callable,
     Iterable,
     Iterator,
     Optional,
@@ -54,6 +55,15 @@
 }
 
 
+# Numbers arbitrarily picked, feel free to adjust them. Do update the
+# documentation if you do so.
+DEFAULT_MEMORY_TARGET = {
+    scmutil.RESOURCE_LOW: 100 * (2**20),  # 100 MB
+    scmutil.RESOURCE_MEDIUM: 2**30,  # 1 GB
+    scmutil.RESOURCE_HIGH: None,
+}
+
+
 def new_stream_clone_requirements(
     default_requirements: Iterable[bytes],
     streamed_requirements: Iterable[bytes],
@@ -1114,6 +1124,7 @@
             )
 
         cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu')
+        mem_profile = scmutil.get_resource_profile(repo.ui, b'memory')
         threaded = repo.ui.configbool(
             b"worker", b"parallel-stream-bundle-processing"
         )
@@ -1123,19 +1134,29 @@
         )
         if num_writer <= 0:
             num_writer = DEFAULT_NUM_WRITER[cpu_profile]
+        memory_target = repo.ui.configbytes(
+            b"worker",
+            b"parallel-stream-bundle-processing.memory-target",
+        )
+        if memory_target < 0:
+            memory_target = None
+        elif memory_target == 0:
+            memory_target = DEFAULT_MEMORY_TARGET[mem_profile]
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
                 workers = []
                 info_queue = None
                 data_queue = None
+                mark_used = None
                 try:
                     if not threaded:
                         fc = _FileChunker
                         raw_data = fp
                     else:
                         fc = _ThreadSafeFileChunker
-                        data_queue = _DataQueue()
+                        data_queue = _DataQueue(memory_target=memory_target)
+                        mark_used = data_queue.mark_used
                         raw_data = util.chunkbuffer(data_queue)
 
                         w = threading.Thread(
@@ -1151,6 +1172,7 @@
                         progress,
                         report,
                         file_chunker=fc,
+                        mark_used=mark_used,
                     )
                     if not threaded:
                         _write_files(vfsmap, files)
@@ -1201,11 +1223,45 @@
 
 
 class _DataQueue:
-    """A queue passing data from the bundle stream to other thread"""
+    """A queue passing data from the bundle stream to other thread
 
-    def __init__(self):
+    It has an optional "memory_target" parameter to avoid buffering too much
+    data. The implementation is not exact and the memory target might be
+    exceeded for a time in some situations.
+    """
+
+    def __init__(self, memory_target=None):
         self._q = queue.Queue()
         self._abort = False
+        self._memory_target = memory_target
+        if self._memory_target is not None and self._memory_target <= 0:
+            raise error.ProgrammingError("memory target should be > 0")
+        self._memory_condition = threading.Condition()
+        # only the stream reader touches this, it is fine to access it without the lock
+        self._current_read = 0
+        # do not touch this without the lock
+        self._current_used = 0
+
+    def _has_free_space(self):
+        """True if more data can be read without further exceeding memory target
+
+        Must be called under the condition lock.
+        """
+        if self._memory_target is None:
+            # Ideally we should not even get into the locking business in that
+            # case, but we keep the implementation simple for now.
+            return True
+        return (self._current_read - self._current_used) < self._memory_target
+
+    def mark_used(self, offset):
+        """Notify we have used the buffer up to "offset"
+
+        This is meant to be used from another thread than the one filler the queue.
+        """
+        with self._memory_condition:
+            if offset > self._current_used:
+                self._current_used = offset
+            self._memory_condition.notify()
 
     def fill_from(self, data):
         """fill the data queue from a bundle2 part object
@@ -1215,9 +1271,14 @@
         q = self._q
         try:
             for item in data:
+                self._current_read += len(item)
                 q.put(item)
                 if self._abort:
                     break
+                with self._memory_condition:
+                    self._memory_condition.wait_for(
+                        self._has_free_space,
+                    )
         finally:
             q.put(None)
 
@@ -1232,12 +1293,13 @@
                 break
 
     def abort(self):
-        """stop the data-reading thread and interrup the iteration of the consumer
+        """stop the data-reading thread and interrupt the comsuming iteration
 
-        This is meant to be called on error.
+        This is meant to be called on errors.
         """
         self._abort = True
         self._q.put(None)
+        with self._memory_condition:
+            self._memory_condition.notify_all()
 
 
 class _FileInfoQueue:
@@ -1288,6 +1350,7 @@
         data_len: int,
         progress: scmutil.progress,
         report: V2Report,
+        mark_used: Optional[Callable[[int], None]] = None,
     ):
         self.report = report
         self.progress = progress
@@ -1316,9 +1379,12 @@
         data_len: int,
         progress: scmutil.progress,
         report: V2Report,
+        mark_used: Optional[Callable[[int], None]] = None,
     ):
         super().__init__(fp, data_len, progress, report)
+        self._fp = fp
         self._queue = queue.Queue()
+        self._mark_used = mark_used
 
     def fill(self) -> None:
         """fill the file chunker queue with data read from the stream
@@ -1328,7 +1394,8 @@
         """
         try:
             for chunk in super().__iter__():
-                self._queue.put(chunk)
+                offset = self._fp.tell()
+                self._queue.put((chunk, offset))
         finally:
             self._queue.put(None)
 
@@ -1337,7 +1404,10 @@
 
         This is meant to be called from the writer threads.
         """
-        while (chunk := self._queue.get()) is not None:
+        while (info := self._queue.get()) is not None:
+            chunk, offset = info
+            if self._mark_used is not None:
+                self._mark_used(offset)
             yield chunk
 
 
@@ -1348,6 +1418,7 @@
     progress: scmutil.progress,
     report: V2Report,
     file_chunker: Type[_FileChunker] = _FileChunker,
+    mark_used: Optional[Callable[[int], None]] = None,
 ) -> Iterator[FileInfoT]:
     """do the "stream-parsing" part of stream v2
 
@@ -1365,7 +1436,13 @@
             repo.ui.debug(
                 b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
             )
-        chunks = file_chunker(fp, datalen, progress, report)
+        chunks = file_chunker(
+            fp,
+            datalen,
+            progress,
+            report,
+            mark_used=mark_used,
+        )
         yield (src, name, iter(chunks))
         # make sure we read all the chunk before moving to the next file
         chunks.fill()
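
For completeness, here is a small sketch of how the new configuration knob is resolved, simplified from the hunk above. The `resolve_memory_target` helper and the bytes-valued RESOURCE_* stand-ins are invented for this example; the real code uses `ui.configbytes` and `scmutil.RESOURCE_LOW/MEDIUM/HIGH`. A negative value disables the limit, zero picks a default from the memory resource profile, and any positive value is used as-is.

# Stand-in constants; the real code uses scmutil.RESOURCE_{LOW,MEDIUM,HIGH}.
RESOURCE_LOW, RESOURCE_MEDIUM, RESOURCE_HIGH = b'low', b'medium', b'high'

# Same defaults as DEFAULT_MEMORY_TARGET in the diff above.
DEFAULT_MEMORY_TARGET = {
    RESOURCE_LOW: 100 * (2**20),  # 100 MB
    RESOURCE_MEDIUM: 2**30,  # 1 GB
    RESOURCE_HIGH: None,  # no read-ahead limit
}


def resolve_memory_target(configured, mem_profile):
    """Return the read-ahead budget in bytes, or None for "unlimited".

    `configured` is the byte value of
    worker.parallel-stream-bundle-processing.memory-target:
    negative disables the limit, zero selects a profile-based default.
    """
    if configured < 0:
        return None
    if configured == 0:
        return DEFAULT_MEMORY_TARGET[mem_profile]
    return configured


assert resolve_memory_target(-1, RESOURCE_LOW) is None
assert resolve_memory_target(0, RESOURCE_MEDIUM) == 2**30
assert resolve_memory_target(512 * (2**20), RESOURCE_HIGH) == 512 * (2**20)

The resulting value is handed to `_DataQueue(memory_target=...)`; the writer threads then feed consumption offsets (taken from `fp.tell()` at chunking time) back through `mark_used`, which is what unblocks the reader waiting in `fill_from`.
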