stream-clone-v2: bypass the vfs to write the file on disk
author Pierre-Yves David <pierre-yves.david@octobus.net>
Wed, 29 Jan 2025 02:23:02 +0100
changeset 52930 22e264ac7f60
parent 52929 5b8f6e198a6e
child 52931 c124308e3cd4
stream-clone-v2: bypass the vfs to write the file on disk

Now that all preparation is done in the main thread, we no longer need the vfs to be involved at the writing end. This very significantly speeds things up.

### benchmark.name = hg.perf.exchange.stream.consume
  # bin-env-vars.hg.flavor = default
  # bin-env-vars.hg.py-re2-module = default
  # benchmark.variants.memory-target = default
  # benchmark.variants.num-writer = default
  # benchmark.variants.parallel-processing = yes
  # benchmark.variants.progress = no
  # benchmark.variants.read-from-memory = yes
  # benchmark.variants.version = v2

## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog
prev-change: 0.217293 ~~~~~
this-change: 0.154647 (-28.83%, -0.06)

## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog
prev-change: 11.222771 ~~~~~
this-change: 7.843554 (-30.11%, -3.38)

## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog
prev-change: 4.465113 ~~~~~
this-change: 3.040664 (-31.90%, -1.42)

## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog
prev-change: 4.667360 ~~~~~
this-change: 3.070976 (-34.20%, -1.60)

## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog
prev-change: 2.559670 ~~~~~
this-change: 1.832118 (-28.42%, -0.73)

## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog
prev-change: 6.123469 ~~~~~
this-change: 4.478754 (-26.86%, -1.64)

## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog
prev-change: 44.781498 ~~~~~
this-change: 30.349379 (-32.23%, -14.43)

## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog
prev-change: 44.396959 ~~~~~
this-change: 31.179906 (-29.77%, -13.22)

## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog
  # benchmark.variants.read-from-memory = no
prev-change: 108.552706 ~~~~~
this-change: 91.454508 (-15.75%, -17.10)
mercurial/streamclone.py
mercurial/vfs.py
--- a/mercurial/streamclone.py	Wed Jan 29 02:17:33 2025 +0100
+++ b/mercurial/streamclone.py	Wed Jan 29 02:23:02 2025 +0100
@@ -1176,14 +1176,14 @@
                         mark_used=mark_used,
                     )
                     if not threaded:
-                        _write_files(vfsmap, files)
+                        _write_files(files)
                     else:
                         info_queue = _FileInfoQueue(files)
 
                         for __ in range(num_writer):
                             w = threading.Thread(
                                 target=_write_files,
-                                args=(vfsmap, info_queue),
+                                args=(info_queue,),
                             )
                             workers.append(w)
                             w.start()
@@ -1217,8 +1217,8 @@
 FileChunksT = Iterator[bytes]
 # Contains the information necessary to write stream file on disk
 FileInfoT = Tuple[
-    bytes,  # vfs key
-    bytes,  # file name (non-vfs-encoded)
+    bytes,  # real fs path
+    Optional[int],  # permission to give to chmod
     FileChunksT,  # content
 ]
 
@@ -1521,14 +1521,14 @@
                 b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
             )
         vfs = vfs_map[src]
-        vfs.prepare_streamed_file(name, known_dirs)
+        path, mode = vfs.prepare_streamed_file(name, known_dirs)
         if datalen <= util.DEFAULT_FILE_CHUNK:
             c = fp.read(datalen)
             offset = fp.tell()
             report.byte_count += len(c)
             progress.increment(step=len(c))
             chunks = _trivial_file(c, mark_used, offset)
-            yield (src, name, iter(chunks))
+            yield (path, mode, iter(chunks))
         else:
             chunks = file_chunker(
                 fp,
@@ -1537,25 +1537,26 @@
                 report,
                 mark_used=mark_used,
             )
-            yield (src, name, iter(chunks))
+            yield (path, mode, iter(chunks))
             # make sure we read all the chunk before moving to the next file
             chunks.fill()
 
 
-def _write_files(vfsmap, info: Iterable[FileInfoT]):
+def _write_files(info: Iterable[FileInfoT]):
     """write files from parsed data"""
-    for src, name, data in info:
-        vfs = vfsmap[src]
+    for path, mode, data in info:
         # we disable the internal Python buffering because the streamed data
         # are assume to have been written with large enough block for it to not
         # matters. So we only have more memory copy and GIL holding time to
         # gain with the Python buffering.
-        with vfs(name, b'w', buffering=0) as ofp:
+        with open(path, 'wb', buffering=0) as ofp:
             for chunk in data:
                 written = ofp.write(chunk)
                 # write missing pieces if the write was interrupted
                 while written < len(chunk):
                     written += ofp.write(chunk[written:])
+        if mode is not None:
+            os.chmod(path, mode & 0o666)
 
 
 def consumev3(repo, fp) -> None:
--- a/mercurial/vfs.py	Wed Jan 29 02:17:33 2025 +0100
+++ b/mercurial/vfs.py	Wed Jan 29 02:23:02 2025 +0100
@@ -90,6 +90,8 @@
     # createmode is always available on subclasses
     createmode: int
 
+    _chmod: bool
+
     # TODO: type return, which is util.posixfile wrapped by a proxy
     @abc.abstractmethod
     def __call__(self, path: bytes, mode: bytes = b'rb', **kwargs) -> Any:
@@ -462,12 +464,17 @@
 
     def prepare_streamed_file(
         self, path: bytes, known_directories: Set[bytes]
-    ) -> None:
+    ) -> Tuple[bytes, Optional[int]]:
         """make sure we are ready to write a file from a stream clone
 
         The "known_directories" variable is here to avoid trying to create the
         same directories over and over during a stream clone. It will be
         updated by this function.
+
+        return (path, mode)::
+
+            <path> is the real file system path content should be written to,
+            <mode> is the file mode that need to be set if any.
         """
         self._auditpath(path, b'wb')
         self.register_file(path)
@@ -476,6 +483,10 @@
         if dirname not in known_directories:
             util.makedirs(dirname, self.createmode, True)
             known_directories.add(dirname)
+        mode = None
+        if self.createmode is not None:
+            mode = self.createmode & 0o666
+        return real_path, mode
 
 
 class vfs(abstractvfs):