--- a/mercurial/streamclone.py Mon Jan 27 19:15:39 2025 +0100
+++ b/mercurial/streamclone.py Mon Jan 20 17:05:22 2025 +0100
@@ -1126,25 +1126,39 @@
with repo.transaction(b'clone'):
ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
with nested(*ctxs):
- if threaded:
- fc = _ThreadSafeFileChunker
- else:
- fc = _FileChunker
- files = _v2_parse_files(
- repo,
- fp,
- filecount,
- progress,
- report,
- file_chunker=fc,
- )
- if not threaded:
- _write_files(vfsmap, files)
- else:
- info_queue = _FileInfoQueue(files)
+ workers = []
+ info_queue = None
+ data_queue = None
+ try:
+ if not threaded:
+ fc = _FileChunker
+ raw_data = fp
+ else:
+ fc = _ThreadSafeFileChunker
+ data_queue = _DataQueue()
+ raw_data = util.chunkbuffer(data_queue)
- workers = []
- try:
+ # XXX we will drop this extra filechunkiter layer soon
+ part_content = util.filechunkiter(fp)
+ w = threading.Thread(
+ target=data_queue.fill_from,
+ args=(part_content,),
+ )
+ workers.append(w)
+ w.start()
+ files = _v2_parse_files(
+ repo,
+ raw_data,
+ filecount,
+ progress,
+ report,
+ file_chunker=fc,
+ )
+ if not threaded:
+ _write_files(vfsmap, files)
+ else:
+ info_queue = _FileInfoQueue(files)
+
for __ in range(num_writer):
w = threading.Thread(
target=_write_files,
@@ -1153,11 +1167,18 @@
workers.append(w)
w.start()
info_queue.fill()
- finally:
- # shut down all the workers
+ except: # re-raises
+ if data_queue is not None:
+ data_queue.abort()
+ raise
+ finally:
+ # shut down all the workers
+ if info_queue is not None:
+ # this is, strictly speaking, one worker too many for
+ # this queue, but closing too many is not a problem.
info_queue.close(len(workers))
- for w in workers:
- w.join()
+ for w in workers:
+ w.join()
# force @filecache properties to be reloaded from
# streamclone-ed file at next access
@@ -1181,6 +1202,46 @@
]
+class _DataQueue:
+ """A queue passing data from the bundle stream to another thread"""
+
+ def __init__(self):
+ self._q = queue.Queue()
+ self._abort = False  # set by abort(); polled by fill_from() and __iter__()
+
+ def fill_from(self, data):
+ """fill the data queue from a bundle2 part object
+
+ This is meant to be called by the data reading thread
+ """
+ q = self._q
+ try:
+ for item in data:
+ q.put(item)
+ if self._abort:
+ break
+ finally:
+ q.put(None)  # None is the end-of-stream marker __iter__ stops on
+
+ def __iter__(self):
+ """Iterate over the bundle chunks
+
+ This is meant to be called by the data parsing thread."""
+ q = self._q
+ while (i := q.get()) is not None:
+ yield i
+ if self._abort:
+ break
+
+ def abort(self):
+ """stop the data-reading thread and interrupt the iteration of the consumer
+
+ This is meant to be called on error.
+ """
+ self._abort = True
+ self._q.put(None)  # None ends __iter__ even if no more data arrives
+
+
class _FileInfoQueue:
"""A thread-safe queue to passer parsed file information to the writers"""