Mercurial > public > mercurial-scm > hg
changeset 52359:3f0cf7bb3086
stream: preserve volatile cache early
Since cache files are not protected by the lock, their state might change between
their initial detection, the computation of their size, and their preservation by
the VolatileManager. So we gather all these steps into a single one to avoid such
a race.
This also handles disappearing cache files in the "copy/hardlink" clone cases.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Wed, 04 Dec 2024 23:31:46 +0100 |
parents | 11484a19cd77 |
children | fca7d38e040b |
files | mercurial/store.py mercurial/streamclone.py tests/test-clone-stream-revlog-split.t tests/test-clone-stream.t tests/test-persistent-nodemap-stream-clone.t tests/testlib/ext-stream-clone-steps.py |
diffstat | 6 files changed, 177 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/store.py Wed Dec 04 17:13:39 2024 +0100 +++ b/mercurial/store.py Wed Dec 04 23:31:46 2024 +0100 @@ -460,6 +460,9 @@ unencoded_path = attr.ib() _file_size = attr.ib(default=None) is_volatile = attr.ib(default=False) + # Missing file can be safely ignored, used by "copy/hardlink" local clone + # for cache file not covered by lock. + optional = False def file_size(self, vfs): if self._file_size is None:
--- a/mercurial/streamclone.py Wed Dec 04 17:13:39 2024 +0100 +++ b/mercurial/streamclone.py Wed Dec 04 23:31:46 2024 +0100 @@ -8,9 +8,12 @@ from __future__ import annotations import contextlib +import errno import os import struct +from typing import Optional + from .i18n import _ from .interfaces import repository from . import ( @@ -271,6 +274,7 @@ # Get consistent snapshot of repo, lock during scan. with repo.lock(): repo.ui.debug(b'scanning\n') + _test_sync_point_walk_1_2(repo) for entry in _walkstreamfiles(repo): for f in entry.files(): file_size = f.file_size(repo.store.vfs) @@ -653,6 +657,7 @@ fp.seek(0, os.SEEK_END) size = fp.tell() self._volatile_fps[src] = (size, fp) + return size def __call__(self, src): """preserve the volatile file at src""" @@ -661,6 +666,23 @@ self._flush_some_on_disk() self._keep_one(src) + def try_keep(self, src) -> Optional[int]: + """record a volatile file and returns it size + + return None if the file does not exists. + + Used for cache file that are not lock protected. 
+ """ + assert 0 < self._counter + if len(self._volatile_fps) >= (self.MAX_OPEN - 1): + self._flush_some_on_disk() + try: + return self._keep_one(src) + except IOError as err: + if err.errno not in (errno.ENOENT, errno.EPERM): + raise + return None + @contextlib.contextmanager def open(self, src): assert 0 < self._counter @@ -706,6 +728,7 @@ # translate the vfs one entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] + _test_sync_point_walk_1_2(repo) max_linkrev = len(repo) file_count = totalfilesize = 0 @@ -771,16 +794,25 @@ ) # translate the vfs once - entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] + # we only turn this into a list for the `_test_sync`, this is not ideal + base_entries = list(entries) + _test_sync_point_walk_1_2(repo) + entries = [] with VolatileManager() as volatiles: # make sure we preserve volatile files - for k, vfs, e in entries: + for vfs_key, e in base_entries: + vfs = vfsmap[vfs_key] + any_files = True if e.maybe_volatile: + any_files = False e.preserve_volatiles(vfs, volatiles) for f in e.files(): if f.is_volatile: # record the expected size under lock f.file_size(vfs) + any_files = True + if any_files: + entries.append((vfs_key, vfsmap[vfs_key], e)) total_entry_count = len(entries) @@ -813,12 +845,79 @@ progress.increment() +def _test_sync_point_walk_1_2(repo): + """a function for synchronisation during tests + + Triggered after gather entry, but before starting to process/preserve them + under lock. + + (on v1 is triggered before the actual walk start) + """ + + def _test_sync_point_walk_3(repo): - """a function for synchronisation during tests""" + """a function for synchronisation during tests + + Triggered right before releasing the lock, but after computing what need + needed to compute under lock. + """ def _test_sync_point_walk_4(repo): - """a function for synchronisation during tests""" + """a function for synchronisation during tests + + Triggered right after releasing the lock. 
+ """ + + +# not really a StoreEntry, but close enough +class CacheEntry(store.SimpleStoreEntry): + """Represent an entry for Cache files + + It has special logic to preserve cache file early and accept optional + presence. + + + (Yes... this is not really a StoreEntry, but close enough. We could have a + BaseEntry base class, bbut the store one would be identical) + """ + + def __init__(self, entry_path): + super().__init__( + entry_path, + # we will directly deal with that in `setup_cache_file` + is_volatile=True, + ) + + def preserve_volatiles(self, vfs, volatiles): + self._file_size = volatiles.try_keep(vfs.join(self._entry_path)) + if self._file_size is None: + self._files = [] + else: + assert self._is_volatile + self._files = [ + CacheFile( + unencoded_path=self._entry_path, + file_size=self._file_size, + is_volatile=self._is_volatile, + ) + ] + + def files(self): + if self._files is None: + self._files = [ + CacheFile( + unencoded_path=self._entry_path, + is_volatile=self._is_volatile, + ) + ] + return super().files() + + +class CacheFile(store.StoreFile): + # inform the "copy/hardlink" version that this file might be missing + # without consequences. 
+ optional = True def _entries_walk(repo, includes, excludes, includeobsmarkers): @@ -850,11 +949,7 @@ for name in cacheutil.cachetocopy(repo): if repo.cachevfs.exists(name): # not really a StoreEntry, but close enough - entry = store.SimpleStoreEntry( - entry_path=name, - is_volatile=True, - ) - yield (_srccache, entry) + yield (_srccache, CacheEntry(entry_path=name)) def generatev2(repo, includes, excludes, includeobsmarkers): @@ -879,7 +974,6 @@ excludes=excludes, includeobsmarkers=includeobsmarkers, ) - chunks = _emit2(repo, entries) first = next(chunks) file_count, total_file_size = first @@ -1141,7 +1235,7 @@ hardlink[0] = False progress.topic = _(b'copying') - for k, path in entries: + for k, path, optional in entries: src_vfs = src_vfs_map[k] dst_vfs = dst_vfs_map[k] src_path = src_vfs.join(path) @@ -1153,13 +1247,17 @@ util.makedirs(dirname) dst_vfs.register_file(path) # XXX we could use the #nb_bytes argument. - util.copyfile( - src_path, - dst_path, - hardlink=hardlink[0], - no_hardlink_cb=copy_used, - check_fs_hardlink=False, - ) + try: + util.copyfile( + src_path, + dst_path, + hardlink=hardlink[0], + no_hardlink_cb=copy_used, + check_fs_hardlink=False, + ) + except FileNotFoundError: + if not optional: + raise progress.increment() return hardlink[0] @@ -1214,7 +1312,7 @@ # to the files while we do the clone, so this is not done yet. We # could do this blindly when copying files. files = [ - (vfs_key, f.unencoded_path) + (vfs_key, f.unencoded_path, f.optional) for vfs_key, e in entries for f in e.files() ]
--- a/tests/test-clone-stream-revlog-split.t Wed Dec 04 17:13:39 2024 +0100 +++ b/tests/test-clone-stream-revlog-split.t Wed Dec 04 23:31:46 2024 +0100 @@ -44,12 +44,18 @@ setup synchronisation file + $ HG_TEST_STREAM_WALKED_FILE_1="$TESTTMP/sync_file_walked_1" + $ export HG_TEST_STREAM_WALKED_FILE_1 + $ HG_TEST_STREAM_WALKED_FILE_2="$TESTTMP/sync_file_walked_2" + $ export HG_TEST_STREAM_WALKED_FILE_2 $ HG_TEST_STREAM_WALKED_FILE_3="$TESTTMP/sync_file_walked_3" $ export HG_TEST_STREAM_WALKED_FILE_3 $ HG_TEST_STREAM_WALKED_FILE_4="$TESTTMP/sync_file_walked_4" $ export HG_TEST_STREAM_WALKED_FILE_4 $ HG_TEST_STREAM_WALKED_FILE_5="$TESTTMP/sync_file_walked_5" $ export HG_TEST_STREAM_WALKED_FILE_5 +(we don't need this wait point) + $ touch $HG_TEST_STREAM_WALKED_FILE_2 Test stream-clone raced by a revlog-split
--- a/tests/test-clone-stream.t Wed Dec 04 17:13:39 2024 +0100 +++ b/tests/test-clone-stream.t Wed Dec 04 23:31:46 2024 +0100 @@ -383,6 +383,10 @@ $ touch repo/f1 $ $TESTDIR/seq.py 50000 > repo/f2 $ hg -R repo ci -Aqm "0" + $ HG_TEST_STREAM_WALKED_FILE_1="$TESTTMP/sync_file_walked_1" + $ export HG_TEST_STREAM_WALKED_FILE_1 + $ HG_TEST_STREAM_WALKED_FILE_2="$TESTTMP/sync_file_walked_2" + $ export HG_TEST_STREAM_WALKED_FILE_2 $ HG_TEST_STREAM_WALKED_FILE_3="$TESTTMP/sync_file_walked_3" $ export HG_TEST_STREAM_WALKED_FILE_3 $ HG_TEST_STREAM_WALKED_FILE_4="$TESTTMP/sync_file_walked_4" @@ -399,11 +403,41 @@ clone while modifying the repo between stating file with write lock and actually serving file content +also delete some cache in the process + $ (hg clone -q --stream -U http://localhost:$HGPORT1 clone; touch "$HG_TEST_STREAM_WALKED_FILE_5") & + + $ $RUNTESTDIR/testlib/wait-on-file 10 $HG_TEST_STREAM_WALKED_FILE_1 +(delete one file) + $ ls repo/.hg/cache/rbc-revs-v2 + repo/.hg/cache/rbc-revs-v2 + $ rm repo/.hg/cache/rbc-revs-v2 +(truncate another) + $ ls repo/.hg/cache/rbc-names-v2 + repo/.hg/cache/rbc-names-v2 + $ echo football > repo/.hg/cache/rbc-names-v2 +(lenghten another one) + $ ls repo/.hg/cache/branch2-served + repo/.hg/cache/branch2-served + $ echo bar >> repo/.hg/cache/branch2-served +(remove one in wcache)) + $ ls repo/.hg/wcache/manifestfulltextcache + repo/.hg/wcache/manifestfulltextcache + $ rm repo/.hg/wcache/manifestfulltextcache + $ touch $HG_TEST_STREAM_WALKED_FILE_2 + $ $RUNTESTDIR/testlib/wait-on-file 10 $HG_TEST_STREAM_WALKED_FILE_3 $ echo >> repo/f1 $ echo >> repo/f2 $ hg -R repo ci -m "1" --config ui.timeout.warn=-1 +(truncate further) + $ ls repo/.hg/cache/rbc-names-v2 + repo/.hg/cache/rbc-names-v2 + $ echo foo > repo/.hg/cache/rbc-names-v2 +(lenghten another one) + $ ls repo/.hg/cache/branch2-served + repo/.hg/cache/branch2-served + $ echo babar >> repo/.hg/cache/branch2-served $ touch $HG_TEST_STREAM_WALKED_FILE_4 $ 
$RUNTESTDIR/testlib/wait-on-file 10 $HG_TEST_STREAM_WALKED_FILE_5 $ hg -R clone id
--- a/tests/test-persistent-nodemap-stream-clone.t Wed Dec 04 17:13:39 2024 +0100 +++ b/tests/test-persistent-nodemap-stream-clone.t Wed Dec 04 23:31:46 2024 +0100 @@ -79,6 +79,10 @@ setup the step-by-step stream cloning + $ HG_TEST_STREAM_WALKED_FILE_1="$TESTTMP/sync_file_walked_1" + $ export HG_TEST_STREAM_WALKED_FILE_1 + $ HG_TEST_STREAM_WALKED_FILE_2="$TESTTMP/sync_file_walked_2" + $ export HG_TEST_STREAM_WALKED_FILE_2 $ HG_TEST_STREAM_WALKED_FILE_3="$TESTTMP/sync_file_walked_3" $ export HG_TEST_STREAM_WALKED_FILE_3 $ HG_TEST_STREAM_WALKED_FILE_4="$TESTTMP/sync_file_walked_4" @@ -89,6 +93,8 @@ > [extensions] > steps=$RUNTESTDIR/testlib/ext-stream-clone-steps.py > EOF +(we don't need this wait point) + $ touch $HG_TEST_STREAM_WALKED_FILE_2 Check and record file state beforehand
--- a/tests/testlib/ext-stream-clone-steps.py Wed Dec 04 17:13:39 2024 +0100 +++ b/tests/testlib/ext-stream-clone-steps.py Wed Dec 04 23:31:46 2024 +0100 @@ -21,10 +21,17 @@ ) +WALKED_FILE_1 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_1'] +WALKED_FILE_2 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_2'] WALKED_FILE_3 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_3'] WALKED_FILE_4 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_4'] +def _test_sync_point_walk_1_2(orig, repo): + testing.write_file(WALKED_FILE_1) + testing.wait_file(WALKED_FILE_2) + + def _test_sync_point_walk_3(orig, repo): testing.write_file(WALKED_FILE_3) @@ -36,6 +43,10 @@ def uisetup(ui): extensions.wrapfunction( + streamclone, '_test_sync_point_walk_1_2', _test_sync_point_walk_1_2 + ) + + extensions.wrapfunction( streamclone, '_test_sync_point_walk_3', _test_sync_point_walk_3 )