diff -r 2787db338b15 -r aee193b1c784 mercurial/streamclone.py
--- a/mercurial/streamclone.py	Wed Jan 29 15:50:01 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 27 18:22:01 2025 +0100
@@ -15,6 +15,7 @@
 import threading
 
 from typing import (
+    Callable,
     Iterable,
     Iterator,
     Optional,
@@ -54,6 +55,15 @@
 }
 
 
+# Numbers arbitrarily picked, feel free to adjust them. Do update the
+# documentation if you do so.
+DEFAULT_MEMORY_TARGET = {
+    scmutil.RESOURCE_LOW: 100 * (2**20),  # 100 MB
+    scmutil.RESOURCE_MEDIUM: 2**30,  # 1 GB
+    scmutil.RESOURCE_HIGH: None,
+}
+
+
 def new_stream_clone_requirements(
     default_requirements: Iterable[bytes],
     streamed_requirements: Iterable[bytes],
@@ -1114,6 +1124,7 @@
     )
 
     cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu')
+    mem_profile = scmutil.get_resource_profile(repo.ui, b'memory')
     threaded = repo.ui.configbool(
         b"worker", b"parallel-stream-bundle-processing"
     )
@@ -1123,19 +1134,29 @@
     )
     if num_writer <= 0:
         num_writer = DEFAULT_NUM_WRITER[cpu_profile]
+    memory_target = repo.ui.configbytes(
+        b"worker",
+        b"parallel-stream-bundle-processing.memory-target",
+    )
+    if memory_target < 0:
+        memory_target = None
+    elif memory_target == 0:
+        memory_target = DEFAULT_MEMORY_TARGET[mem_profile]
     with repo.transaction(b'clone'):
         ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
         with nested(*ctxs):
             workers = []
             info_queue = None
             data_queue = None
+            mark_used = None
             try:
                 if not threaded:
                     fc = _FileChunker
                     raw_data = fp
                 else:
                     fc = _ThreadSafeFileChunker
-                    data_queue = _DataQueue()
+                    data_queue = _DataQueue(memory_target=memory_target)
+                    mark_used = data_queue.mark_used
                     raw_data = util.chunkbuffer(data_queue)
 
                     w = threading.Thread(
@@ -1151,6 +1172,7 @@
                     progress,
                     report,
                     file_chunker=fc,
+                    mark_used=mark_used,
                 )
                 if not threaded:
                     _write_files(vfsmap, files)
@@ -1201,11 +1223,45 @@
 
 
 class _DataQueue:
-    """A queue passing data from the bundle stream to other thread"""
+    """A queue passing data from the bundle stream to other threads
 
-    def __init__(self):
+    It has a "memory_target" optional parameter to avoid buffering too much
+    information. The implementation is not exact and the memory target might be
+    exceeded for a time in some situations.
+    """
+
+    def __init__(self, memory_target=None):
         self._q = queue.Queue()
         self._abort = False
+        self._memory_target = memory_target
+        if self._memory_target is not None and self._memory_target <= 0:
+            raise error.ProgrammingError("memory target should be > 0")
+        self._memory_condition = threading.Condition()
+        # only the stream reader touches this, it is fine to touch without the lock
+        self._current_read = 0
+        # do not touch this without the lock
+        self._current_used = 0
+
+    def _has_free_space(self):
+        """True if more data can be read without further exceeding memory target
+
+        Must be called under the condition lock.
+        """
+        if self._memory_target is None:
+            # Ideally we should not even get into the locking business in that
+            # case, but we keep the implementation simple for now.
+            return True
+        return (self._current_read - self._current_used) < self._memory_target
+
+    def mark_used(self, offset):
+        """Notify that we have used the buffer up to "offset"
+
+        This is meant to be used from another thread than the one filling the queue.
+        """
+        with self._memory_condition:
+            if offset > self._current_used:
+                self._current_used = offset
+            self._memory_condition.notify()
 
     def fill_from(self, data):
         """fill the data queue from a bundle2 part object
@@ -1215,9 +1271,14 @@
         q = self._q
         try:
             for item in data:
+                self._current_read += len(item)
                 q.put(item)
                 if self._abort:
                     break
+                with self._memory_condition:
+                    self._memory_condition.wait_for(
+                        self._has_free_space,
+                    )
         finally:
             q.put(None)
 
@@ -1232,12 +1293,13 @@
                 break
 
     def abort(self):
-        """stop the data-reading thread and interrup the iteration of the consumer
+        """stop the data-reading thread and interrupt the consuming iteration
 
-        This is meant to be called on error.
+        This is meant to be called on errors.
         """
         self._abort = True
         self._q.put(None)
+        self._memory_condition.notify_all()
 
 
 class _FileInfoQueue:
@@ -1288,6 +1350,7 @@
         data_len: int,
         progress: scmutil.progress,
         report: V2Report,
+        mark_used: Optional[Callable[[int], None]] = None,
     ):
         self.report = report
         self.progress = progress
@@ -1316,9 +1379,12 @@
         data_len: int,
         progress: scmutil.progress,
         report: V2Report,
+        mark_used: Optional[Callable[[int], None]] = None,
     ):
         super().__init__(fp, data_len, progress, report)
+        self._fp = fp
         self._queue = queue.Queue()
+        self._mark_used = mark_used
 
     def fill(self) -> None:
         """fill the file chunker queue with data read from the stream
@@ -1328,7 +1394,8 @@
         """
         try:
             for chunk in super().__iter__():
-                self._queue.put(chunk)
+                offset = self._fp.tell()
+                self._queue.put((chunk, offset))
         finally:
             self._queue.put(None)
 
@@ -1337,7 +1404,10 @@
 
         This is meant to be called from the writer threads.
         """
-        while (chunk := self._queue.get()) is not None:
+        while (info := self._queue.get()) is not None:
+            chunk, offset = info
+            if self._mark_used is not None:
+                self._mark_used(offset)
             yield chunk
 
 
@@ -1348,6 +1418,7 @@
     progress: scmutil.progress,
     report: V2Report,
    file_chunker: Type[_FileChunker] = _FileChunker,
+    mark_used: Optional[Callable[[int], None]] = None,
 ) -> Iterator[FileInfoT]:
     """do the "stream-parsing" part of stream v2
 
@@ -1365,7 +1436,13 @@
         repo.ui.debug(
             b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
         )
-        chunks = file_chunker(fp, datalen, progress, report)
+        chunks = file_chunker(
+            fp,
+            datalen,
+            progress,
+            report,
+            mark_used=mark_used,
+        )
         yield (src, name, iter(chunks))
         # make sure we read all the chunk before moving to the next file
        chunks.fill()
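For illustration only, here is a minimal, self-contained sketch of the back-pressure scheme the patch adds to _DataQueue: the reading thread counts the bytes it has queued, the consuming thread acknowledges the offset it has processed, and the reader blocks on a Condition whenever the unacknowledged amount exceeds the memory target. It is not part of the patch; the class and variable names (BoundedChunkQueue, memory_target) are hypothetical and chosen just for the example.

import queue
import threading


class BoundedChunkQueue:
    """Toy stand-in for the memory-target behaviour of _DataQueue."""

    def __init__(self, memory_target):
        assert memory_target > 0
        self._q = queue.Queue()
        self._memory_target = memory_target
        self._cond = threading.Condition()
        self._read = 0  # bytes queued, touched by the producer only
        self._used = 0  # bytes acknowledged, touched under the lock

    def _has_free_space(self):
        # must be called with the condition lock held
        return (self._read - self._used) < self._memory_target

    def fill_from(self, chunks):
        """Producer side: queue chunks, pausing while the target is exceeded."""
        try:
            for chunk in chunks:
                self._read += len(chunk)
                self._q.put(chunk)
                with self._cond:
                    self._cond.wait_for(self._has_free_space)
        finally:
            self._q.put(None)  # sentinel marking the end of the stream

    def mark_used(self, offset):
        """Consumer side: acknowledge data up to "offset" and wake the producer."""
        with self._cond:
            if offset > self._used:
                self._used = offset
            self._cond.notify()

    def __iter__(self):
        """Consumer side: yield chunks, acknowledging each one once processed."""
        offset = 0
        while (chunk := self._q.get()) is not None:
            offset += len(chunk)
            yield chunk
            self.mark_used(offset)


if __name__ == '__main__':
    q = BoundedChunkQueue(memory_target=4)
    producer = threading.Thread(
        target=q.fill_from, args=([b'ab', b'cd', b'ef', b'gh'],)
    )
    producer.start()
    assert b''.join(q) == b'abcdefgh'
    producer.join()

The sketch omits error handling: the real _DataQueue also provides abort(), which pushes the sentinel and calls notify_all() so a blocked reader is released when the consumer fails.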