stream-clone-v2: bypass the vfs to write the file on disk
Now that all preparation is done in the main thread, we no longer need the vfs
to be involved at the writing end.
This speeds things up very significantly, as the benchmarks below show.
### benchmark.name = hg.perf.exchange.stream.consume
# bin-env-vars.hg.flavor = default
# bin-env-vars.hg.py-re2-module = default
# benchmark.variants.memory-target = default
# benchmark.variants.num-writer = default
# benchmark.variants.parallel-processing = yes
# benchmark.variants.progress = no
# benchmark.variants.read-from-memory = yes
# benchmark.variants.version = v2
## data-env-vars.name = mercurial-public-2024-03-22-zstd-sparse-revlog
prev-change: 0.217293 ~~~~~
this-change: 0.154647 (-28.83%, -0.06)
## data-env-vars.name = netbeans-2019-11-07-zstd-sparse-revlog
prev-change: 11.222771 ~~~~~
this-change: 7.843554 (-30.11%, -3.38)
## data-env-vars.name = netbsd-xsrc-all-2024-09-19-zstd-sparse-revlog
prev-change: 4.465113 ~~~~~
this-change: 3.040664 (-31.90%, -1.42)
## data-env-vars.name = netbsd-xsrc-draft-2024-09-19-zstd-sparse-revlog
prev-change: 4.667360 ~~~~~
this-change: 3.070976 (-34.20%, -1.60)
## data-env-vars.name = pypy-2024-03-22-zstd-sparse-revlog
prev-change: 2.559670 ~~~~~
this-change: 1.832118 (-28.42%, -0.73)
## data-env-vars.name = heptapod-public-2024-03-25-zstd-sparse-revlog
prev-change: 6.123469 ~~~~~
this-change: 4.478754 (-26.86%, -1.64)
## data-env-vars.name = mozilla-central-2024-03-22-zstd-sparse-revlog
prev-change: 44.781498 ~~~~~
this-change: 30.349379 (-32.23%, -14.43)
## data-env-vars.name = mozilla-unified-2024-03-22-zstd-sparse-revlog
prev-change: 44.396959 ~~~~~
this-change: 31.179906 (-29.77%, -13.22)
## data-env-vars.name = mozilla-try-2024-03-26-zstd-sparse-revlog
# benchmark.variants.read-from-memory = no
prev-change: 108.552706 ~~~~~
this-change: 91.454508 (-15.75%, -17.10)
--- a/mercurial/streamclone.py Wed Jan 29 02:17:33 2025 +0100
+++ b/mercurial/streamclone.py Wed Jan 29 02:23:02 2025 +0100
@@ -1176,14 +1176,14 @@
mark_used=mark_used,
)
if not threaded:
- _write_files(vfsmap, files)
+ _write_files(files)
else:
info_queue = _FileInfoQueue(files)
for __ in range(num_writer):
w = threading.Thread(
target=_write_files,
- args=(vfsmap, info_queue),
+ args=(info_queue,),
)
workers.append(w)
w.start()
@@ -1217,8 +1217,8 @@
FileChunksT = Iterator[bytes]
# Contains the information necessary to write stream file on disk
FileInfoT = Tuple[
- bytes, # vfs key
- bytes, # file name (non-vfs-encoded)
+ bytes, # real fs path
+ Optional[int], # permission to give to chmod
FileChunksT, # content
]
@@ -1521,14 +1521,14 @@
b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
)
vfs = vfs_map[src]
- vfs.prepare_streamed_file(name, known_dirs)
+ path, mode = vfs.prepare_streamed_file(name, known_dirs)
if datalen <= util.DEFAULT_FILE_CHUNK:
c = fp.read(datalen)
offset = fp.tell()
report.byte_count += len(c)
progress.increment(step=len(c))
chunks = _trivial_file(c, mark_used, offset)
- yield (src, name, iter(chunks))
+ yield (path, mode, iter(chunks))
else:
chunks = file_chunker(
fp,
@@ -1537,25 +1537,26 @@
report,
mark_used=mark_used,
)
- yield (src, name, iter(chunks))
+ yield (path, mode, iter(chunks))
# make sure we read all the chunk before moving to the next file
chunks.fill()
-def _write_files(vfsmap, info: Iterable[FileInfoT]):
+def _write_files(info: Iterable[FileInfoT]):
"""write files from parsed data"""
- for src, name, data in info:
- vfs = vfsmap[src]
+ for path, mode, data in info:
# we disable the internal Python buffering because the streamed data
# are assume to have been written with large enough block for it to not
# matters. So we only have more memory copy and GIL holding time to
# gain with the Python buffering.
- with vfs(name, b'w', buffering=0) as ofp:
+ with open(path, 'wb', buffering=0) as ofp:
for chunk in data:
written = ofp.write(chunk)
# write missing pieces if the write was interrupted
while written < len(chunk):
written += ofp.write(chunk[written:])
+ if mode is not None:
+ os.chmod(path, mode & 0o666)
def consumev3(repo, fp) -> None:
--- a/mercurial/vfs.py Wed Jan 29 02:17:33 2025 +0100
+++ b/mercurial/vfs.py Wed Jan 29 02:23:02 2025 +0100
@@ -90,6 +90,8 @@
# createmode is always available on subclasses
createmode: int
+ _chmod: bool
+
# TODO: type return, which is util.posixfile wrapped by a proxy
@abc.abstractmethod
def __call__(self, path: bytes, mode: bytes = b'rb', **kwargs) -> Any:
@@ -462,12 +464,17 @@
def prepare_streamed_file(
self, path: bytes, known_directories: Set[bytes]
- ) -> None:
+ ) -> Tuple[bytes, Optional[int]]:
"""make sure we are ready to write a file from a stream clone
The "known_directories" variable is here to avoid trying to create the
same directories over and over during a stream clone. It will be
updated by this function.
+
+ return (path, mode)::
+
+ <path> is the real file system path content should be written to,
+ <mode> is the file mode that needs to be set, if any.
"""
self._auditpath(path, b'wb')
self.register_file(path)
@@ -476,6 +483,10 @@
if dirname not in known_directories:
util.makedirs(dirname, self.createmode, True)
known_directories.add(dirname)
+ mode = None
+ if self.createmode is not None:
+ mode = self.createmode & 0o666
+ return real_path, mode
class vfs(abstractvfs):