diff mercurial/streamclone.py @ 52391:3f0cf7bb3086
stream: preserve volatile cache early
Since cache files are not protected by the lock, their state might change
between their initial detection, the computation of their size, and their
preservation by the VolatileManager. So we gather all these steps into a
single one to avoid such a race.

This also handles disappearing cache files in the "copy/hardlink" clone cases.
author   | Pierre-Yves David <pierre-yves.david@octobus.net>
date     | Wed, 04 Dec 2024 23:31:46 +0100
parents  | 11484a19cd77
children | 24ee91ba9aa8
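
The core idea of the change (detect a cache file, measure it, and pin it open in a single step instead of three) can be sketched outside of Mercurial's own classes. The snippet below is a minimal, hypothetical illustration; `try_keep_cache_file` is not part of the Mercurial API, it merely mirrors what the new `VolatileManager.try_keep` does for cache files that are not protected by the lock.

import errno
import os
from typing import BinaryIO, Optional, Tuple


def try_keep_cache_file(path: str) -> Optional[Tuple[int, BinaryIO]]:
    """Detect, measure and keep open an unlocked cache file in one step.

    Returns (size, open file object) so the caller can hold on to the
    handle, or None if the file vanished or became unreadable meanwhile.
    """
    try:
        fp = open(path, 'rb')
    except OSError as err:
        # cache files are not covered by the repository lock, so they may
        # legitimately disappear between detection and preservation
        if err.errno not in (errno.ENOENT, errno.EPERM):
            raise
        return None
    fp.seek(0, os.SEEK_END)
    size = fp.tell()
    # the open descriptor protects against the file being removed or
    # atomically replaced after this point (on POSIX systems)
    return size, fp

Gathering the three steps behind a single `try`/`except` is what removes the race window described in the commit message.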
--- a/mercurial/streamclone.py	Wed Dec 04 17:13:39 2024 +0100
+++ b/mercurial/streamclone.py	Wed Dec 04 23:31:46 2024 +0100
@@ -8,9 +8,12 @@
 from __future__ import annotations
 
 import contextlib
+import errno
 import os
 import struct
 
+from typing import Optional
+
 from .i18n import _
 from .interfaces import repository
 from . import (
@@ -271,6 +274,7 @@
     # Get consistent snapshot of repo, lock during scan.
     with repo.lock():
         repo.ui.debug(b'scanning\n')
+        _test_sync_point_walk_1_2(repo)
         for entry in _walkstreamfiles(repo):
             for f in entry.files():
                 file_size = f.file_size(repo.store.vfs)
@@ -653,6 +657,7 @@
         fp.seek(0, os.SEEK_END)
         size = fp.tell()
         self._volatile_fps[src] = (size, fp)
+        return size
 
     def __call__(self, src):
         """preserve the volatile file at src"""
@@ -661,6 +666,23 @@
             self._flush_some_on_disk()
         self._keep_one(src)
 
+    def try_keep(self, src) -> Optional[int]:
+        """record a volatile file and return its size
+
+        return None if the file does not exist.
+
+        Used for cache files that are not lock protected.
+        """
+        assert 0 < self._counter
+        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+            self._flush_some_on_disk()
+        try:
+            return self._keep_one(src)
+        except IOError as err:
+            if err.errno not in (errno.ENOENT, errno.EPERM):
+                raise
+            return None
+
     @contextlib.contextmanager
     def open(self, src):
         assert 0 < self._counter
@@ -706,6 +728,7 @@
 
     # translate the vfs one
     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    _test_sync_point_walk_1_2(repo)
 
     max_linkrev = len(repo)
     file_count = totalfilesize = 0
@@ -771,16 +794,25 @@
     )
 
     # translate the vfs once
-    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    # we only turn this into a list for the `_test_sync`, this is not ideal
+    base_entries = list(entries)
+    _test_sync_point_walk_1_2(repo)
+    entries = []
     with VolatileManager() as volatiles:
         # make sure we preserve volatile files
-        for k, vfs, e in entries:
+        for vfs_key, e in base_entries:
+            vfs = vfsmap[vfs_key]
+            any_files = True
             if e.maybe_volatile:
+                any_files = False
                 e.preserve_volatiles(vfs, volatiles)
                 for f in e.files():
                     if f.is_volatile:
                         # record the expected size under lock
                         f.file_size(vfs)
+                        any_files = True
+            if any_files:
+                entries.append((vfs_key, vfsmap[vfs_key], e))
 
         total_entry_count = len(entries)
 
@@ -813,12 +845,79 @@
             progress.increment()
 
 
+def _test_sync_point_walk_1_2(repo):
+    """a function for synchronisation during tests
+
+    Triggered after gathering entries, but before starting to process/preserve
+    them under lock.
+
+    (on v1 this is triggered before the actual walk starts)
+    """
+
+
 def _test_sync_point_walk_3(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right before releasing the lock, but after computing what needed
+    to be computed under lock.
+    """
 
 
 def _test_sync_point_walk_4(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right after releasing the lock.
+    """
+
+
+# not really a StoreEntry, but close enough
+class CacheEntry(store.SimpleStoreEntry):
+    """Represent an entry for Cache files
+
+    It has special logic to preserve cache files early and accept optional
+    presence.
+
+
+    (Yes... this is not really a StoreEntry, but close enough. We could have a
+    BaseEntry base class, but the store one would be identical)
+    """
+
+    def __init__(self, entry_path):
+        super().__init__(
+            entry_path,
+            # we will directly deal with that in `setup_cache_file`
+            is_volatile=True,
+        )
+
+    def preserve_volatiles(self, vfs, volatiles):
+        self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
+        if self._file_size is None:
+            self._files = []
+        else:
+            assert self._is_volatile
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    file_size=self._file_size,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+
+    def files(self):
+        if self._files is None:
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+        return super().files()
+
+
+class CacheFile(store.StoreFile):
+    # inform the "copy/hardlink" version that this file might be missing
+    # without consequences.
+    optional = True
 
 
 def _entries_walk(repo, includes, excludes, includeobsmarkers):
@@ -850,11 +949,7 @@
     for name in cacheutil.cachetocopy(repo):
        if repo.cachevfs.exists(name):
             # not really a StoreEntry, but close enough
-            entry = store.SimpleStoreEntry(
-                entry_path=name,
-                is_volatile=True,
-            )
-            yield (_srccache, entry)
+            yield (_srccache, CacheEntry(entry_path=name))
 
 
 def generatev2(repo, includes, excludes, includeobsmarkers):
@@ -879,7 +974,6 @@
         excludes=excludes,
         includeobsmarkers=includeobsmarkers,
     )
-
     chunks = _emit2(repo, entries)
     first = next(chunks)
     file_count, total_file_size = first
@@ -1141,7 +1235,7 @@
         hardlink[0] = False
         progress.topic = _(b'copying')
 
-    for k, path in entries:
+    for k, path, optional in entries:
         src_vfs = src_vfs_map[k]
         dst_vfs = dst_vfs_map[k]
         src_path = src_vfs.join(path)
@@ -1153,13 +1247,17 @@
             util.makedirs(dirname)
         dst_vfs.register_file(path)
         # XXX we could use the #nb_bytes argument.
-        util.copyfile(
-            src_path,
-            dst_path,
-            hardlink=hardlink[0],
-            no_hardlink_cb=copy_used,
-            check_fs_hardlink=False,
-        )
+        try:
+            util.copyfile(
+                src_path,
+                dst_path,
+                hardlink=hardlink[0],
+                no_hardlink_cb=copy_used,
+                check_fs_hardlink=False,
+            )
+        except FileNotFoundError:
+            if not optional:
+                raise
         progress.increment()
 
     return hardlink[0]
@@ -1214,7 +1312,7 @@
     # to the files while we do the clone, so this is not done yet. We
     # could do this blindly when copying files.
     files = [
-        (vfs_key, f.unencoded_path)
+        (vfs_key, f.unencoded_path, f.optional)
         for vfs_key, e in entries
         for f in e.files()
     ]
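
The "copy/hardlink" local clone path needs the same tolerance at copy time, because an optional cache file listed earlier may already be gone when it is finally copied. The standalone sketch below (using `shutil.copyfile` instead of Mercurial's `util.copyfile`, with made-up function and variable names) shows the shape of the `FileNotFoundError` handling added in the last two hunks.

import os
import shutil
from typing import Iterable, Tuple


def copy_listed_files(
    triples: Iterable[Tuple[str, str, bool]], src_root: str, dst_root: str
) -> None:
    """Copy (key, relative path, optional) triples between two trees.

    Optional files, such as unlocked cache files, that disappeared after
    being listed are silently skipped; a missing required file still raises.
    """
    for _key, rel_path, optional in triples:
        src = os.path.join(src_root, rel_path)
        dst = os.path.join(dst_root, rel_path)
        dst_dir = os.path.dirname(dst)
        if dst_dir:
            os.makedirs(dst_dir, exist_ok=True)
        try:
            shutil.copyfile(src, dst)
        except FileNotFoundError:
            if not optional:
                raise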