mercurial/streamclone.py
changeset 52914 d5ae681834e8
parent 52913 58baa86c7a02
child 52917 9abf173a958b
--- a/mercurial/streamclone.py	Mon Jan 27 19:15:39 2025 +0100
+++ b/mercurial/streamclone.py	Mon Jan 20 17:05:22 2025 +0100
@@ -1126,25 +1126,39 @@
         with repo.transaction(b'clone'):
             ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
             with nested(*ctxs):
-                if threaded:
-                    fc = _ThreadSafeFileChunker
-                else:
-                    fc = _FileChunker
-                files = _v2_parse_files(
-                    repo,
-                    fp,
-                    filecount,
-                    progress,
-                    report,
-                    file_chunker=fc,
-                )
-                if not threaded:
-                    _write_files(vfsmap, files)
-                else:
-                    info_queue = _FileInfoQueue(files)
+                workers = []
+                info_queue = None
+                data_queue = None
+                try:
+                    if not threaded:
+                        fc = _FileChunker
+                        raw_data = fp
+                    else:
+                        fc = _ThreadSafeFileChunker
+                        data_queue = _DataQueue()
+                        raw_data = util.chunkbuffer(data_queue)
 
-                    workers = []
-                    try:
+                        # XXX we will drop this extra filechunkiter layer soon
+                        part_content = util.filechunkiter(fp)
+                        w = threading.Thread(
+                            target=data_queue.fill_from,
+                            args=(part_content,),
+                        )
+                        workers.append(w)
+                        w.start()
+                    files = _v2_parse_files(
+                        repo,
+                        raw_data,
+                        filecount,
+                        progress,
+                        report,
+                        file_chunker=fc,
+                    )
+                    if not threaded:
+                        _write_files(vfsmap, files)
+                    else:
+                        info_queue = _FileInfoQueue(files)
+
                         for __ in range(num_writer):
                             w = threading.Thread(
                                 target=_write_files,
@@ -1153,11 +1167,18 @@
                             workers.append(w)
                             w.start()
                         info_queue.fill()
-                    finally:
-                        # shut down all the workers
+                except:  # re-raises
+                    if data_queue is not None:
+                        data_queue.abort()
+                    raise
+                finally:
+                    # shut down all the workers
+                    if info_queue is not None:
+                        # this is strictly speaking one too many worker for
+                        # this queu, but closing too many is not a problem.
                         info_queue.close(len(workers))
-                        for w in workers:
-                            w.join()
+                    for w in workers:
+                        w.join()
 
             # force @filecache properties to be reloaded from
             # streamclone-ed file at next access
@@ -1181,6 +1202,46 @@
 ]
 
 
+class _DataQueue:
+    """A queue passing data from the bundle stream to other thread"""
+
+    def __init__(self):
+        self._q = queue.Queue()
+        self._abort = False
+
+    def fill_from(self, data):
+        """fill the data queue from a bundle2 part object
+
+        This is meant to be called by the data reading thread
+        """
+        q = self._q
+        try:
+            for item in data:
+                q.put(item)
+                if self._abort:
+                    break
+        finally:
+            q.put(None)
+
+    def __iter__(self):
+        """Iterate over the bundle chunkgs
+
+        This is meant to be called by the data parsing thread."""
+        q = self._q
+        while (i := q.get()) is not None:
+            yield i
+            if self._abort:
+                break
+
+    def abort(self):
+        """stop the data-reading thread and interrup the iteration of the consumer
+
+        This is meant to be called on error.
+        """
+        self._abort = True
+        self._q.put(None)
+
+
 class _FileInfoQueue:
     """A thread-safe queue to passer parsed file information to the writers"""