diff -r 11484a19cd77 -r 3f0cf7bb3086 mercurial/streamclone.py
--- a/mercurial/streamclone.py	Wed Dec 04 17:13:39 2024 +0100
+++ b/mercurial/streamclone.py	Wed Dec 04 23:31:46 2024 +0100
@@ -8,9 +8,12 @@
 from __future__ import annotations
 
 import contextlib
+import errno
 import os
 import struct
 
+from typing import Optional
+
 from .i18n import _
 from .interfaces import repository
 from . import (
@@ -271,6 +274,7 @@
     # Get consistent snapshot of repo, lock during scan.
     with repo.lock():
         repo.ui.debug(b'scanning\n')
+        _test_sync_point_walk_1_2(repo)
         for entry in _walkstreamfiles(repo):
             for f in entry.files():
                 file_size = f.file_size(repo.store.vfs)
@@ -653,6 +657,7 @@
             fp.seek(0, os.SEEK_END)
             size = fp.tell()
             self._volatile_fps[src] = (size, fp)
+            return size
 
     def __call__(self, src):
         """preserve the volatile file at src"""
@@ -661,6 +666,23 @@
             self._flush_some_on_disk()
         self._keep_one(src)
 
+    def try_keep(self, src) -> Optional[int]:
+        """record a volatile file and returns its size
+
+        return None if the file does not exist.
+
+        Used for cache files that are not lock protected.
+        """
+        assert 0 < self._counter
+        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+            self._flush_some_on_disk()
+        try:
+            return self._keep_one(src)
+        except IOError as err:
+            if err.errno not in (errno.ENOENT, errno.EPERM):
+                raise
+            return None
+
     @contextlib.contextmanager
     def open(self, src):
         assert 0 < self._counter
@@ -706,6 +728,7 @@
 
     # translate the vfs one
     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    _test_sync_point_walk_1_2(repo)
 
     max_linkrev = len(repo)
     file_count = totalfilesize = 0
@@ -771,16 +794,25 @@
         )
 
     # translate the vfs once
-    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    # we only turn this into a list for the `_test_sync`, this is not ideal
+    base_entries = list(entries)
+    _test_sync_point_walk_1_2(repo)
+    entries = []
 
     with VolatileManager() as volatiles:
         # make sure we preserve volatile files
-        for k, vfs, e in entries:
+        for vfs_key, e in base_entries:
+            vfs = vfsmap[vfs_key]
+            any_files = True
             if e.maybe_volatile:
+                any_files = False
                 e.preserve_volatiles(vfs, volatiles)
                 for f in e.files():
                     if f.is_volatile:
                         # record the expected size under lock
                         f.file_size(vfs)
+                        any_files = True
+            if any_files:
+                entries.append((vfs_key, vfsmap[vfs_key], e))
 
         total_entry_count = len(entries)
@@ -813,12 +845,79 @@
             progress.increment()
 
 
+def _test_sync_point_walk_1_2(repo):
+    """a function for synchronisation during tests
+
+    Triggered after gathering entries, but before starting to process/preserve
+    them under lock.
+
+    (on v1 this is triggered before the actual walk starts)
+    """
+
+
 def _test_sync_point_walk_3(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right before releasing the lock, but after computing what
+    needed to be computed under lock.
+    """
 
 
 def _test_sync_point_walk_4(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right after releasing the lock.
+    """
+
+
+# not really a StoreEntry, but close enough
+class CacheEntry(store.SimpleStoreEntry):
+    """Represents an entry for cache files
+
+    It has special logic to preserve cache files early and accept optional
+    presence.
+
+
+    (Yes... this is not really a StoreEntry, but close enough. We could have a
+    BaseEntry base class, but the store one would be identical)
+    """
+
+    def __init__(self, entry_path):
+        super().__init__(
+            entry_path,
+            # we will directly deal with that in `setup_cache_file`
+            is_volatile=True,
+        )
+
+    def preserve_volatiles(self, vfs, volatiles):
+        self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
+        if self._file_size is None:
+            self._files = []
+        else:
+            assert self._is_volatile
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    file_size=self._file_size,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+
+    def files(self):
+        if self._files is None:
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+        return super().files()
+
+
+class CacheFile(store.StoreFile):
+    # inform the "copy/hardlink" version that this file might be missing
+    # without consequences.
+    optional = True
 
 
 def _entries_walk(repo, includes, excludes, includeobsmarkers):
@@ -850,11 +949,7 @@
     for name in cacheutil.cachetocopy(repo):
         if repo.cachevfs.exists(name):
             # not really a StoreEntry, but close enough
-            entry = store.SimpleStoreEntry(
-                entry_path=name,
-                is_volatile=True,
-            )
-            yield (_srccache, entry)
+            yield (_srccache, CacheEntry(entry_path=name))
 
 
 def generatev2(repo, includes, excludes, includeobsmarkers):
@@ -879,7 +974,6 @@
             excludes=excludes,
             includeobsmarkers=includeobsmarkers,
         )
-
         chunks = _emit2(repo, entries)
         first = next(chunks)
         file_count, total_file_size = first
@@ -1141,7 +1235,7 @@
         hardlink[0] = False
         progress.topic = _(b'copying')
 
-    for k, path in entries:
+    for k, path, optional in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
@@ -1153,13 +1247,17 @@
             util.makedirs(dirname)
         dst_vfs.register_file(path)
         # XXX we could use the #nb_bytes argument.
-        util.copyfile(
-            src_path,
-            dst_path,
-            hardlink=hardlink[0],
-            no_hardlink_cb=copy_used,
-            check_fs_hardlink=False,
-        )
+        try:
+            util.copyfile(
+                src_path,
+                dst_path,
+                hardlink=hardlink[0],
+                no_hardlink_cb=copy_used,
+                check_fs_hardlink=False,
+            )
+        except FileNotFoundError:
+            if not optional:
+                raise
         progress.increment()
 
     return hardlink[0]
@@ -1214,7 +1312,7 @@
         # to the files while we do the clone, so this is not done yet. We
         # could do this blindly when copying files.
         files = [
-            (vfs_key, f.unencoded_path)
+            (vfs_key, f.unencoded_path, f.optional)
             for vfs_key, e in entries
             for f in e.files()
         ]
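
The functional core of the change above is that cache files are now recorded and copied without holding the repository lock, so they may legitimately disappear between being listed and being read: VolatileManager.try_keep tolerates a missing file, and the copy loop swallows FileNotFoundError for entries flagged as optional. The following is a minimal, self-contained sketch of that pattern; the helper names (try_record_size, copy_entry) are illustrative assumptions, not part of Mercurial's API.

import errno
import os
import shutil
from typing import Optional


def try_record_size(path: str) -> Optional[int]:
    # Return the size of `path`, or None if it is missing or unreadable.
    # Mirrors the intent of VolatileManager.try_keep: cache files are not
    # lock protected, so ENOENT/EPERM mean "skip"; anything else is an error.
    try:
        return os.stat(path).st_size
    except OSError as err:
        if err.errno not in (errno.ENOENT, errno.EPERM):
            raise
        return None


def copy_entry(src: str, dst: str, optional: bool = False) -> bool:
    # Copy one streamed file, tolerating a missing source when `optional`
    # (the CacheFile.optional flag plays that role in the patch).
    # Returns True if copied, False if the file was optional and absent.
    try:
        shutil.copyfile(src, dst)
        return True
    except FileNotFoundError:
        if not optional:
            raise
        return False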