changeset 52359:3f0cf7bb3086

stream: preserve volatile cache early

Since cache files are not protected by the lock, their state might change between their initial detection, the computation of their size, and their preservation by the VolatileManager. So we gather all these steps into a single one to avoid such a race. This also handles cache files that disappear in the "copy/hardlink" clone case.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 04 Dec 2024 23:31:46 +0100
parents 11484a19cd77
children fca7d38e040b
files mercurial/store.py mercurial/streamclone.py tests/test-clone-stream-revlog-split.t tests/test-clone-stream.t tests/test-persistent-nodemap-stream-clone.t tests/testlib/ext-stream-clone-steps.py
diffstat 6 files changed, 177 insertions(+), 19 deletions(-) [+]
line wrap: on
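To make the race concrete, here is a minimal sketch of the before/after behaviour (illustrative only; `path`, `vfs`, and `volatiles` are hypothetical stand-ins, while `try_keep` mirrors the method added in the diff below):

    # Before: three separate steps, each of which can race with a concurrent
    # cache update, since cache files are not covered by the repository lock:
    if vfs.exists(path):                   # 1. initial detection
        size = vfs.stat(path).st_size      # 2. size computation
        volatiles(vfs.join(path))          # 3. preservation (file may be gone)

    # After: a single step that opens the file once, preserves it, and
    # reports its size; a file that vanished in the meantime yields None:
    size = volatiles.try_keep(vfs.join(path))
    if size is None:
        pass  # the cache file disappeared; it is safe to skip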
line diff
--- a/mercurial/store.py	Wed Dec 04 17:13:39 2024 +0100
+++ b/mercurial/store.py	Wed Dec 04 23:31:46 2024 +0100
@@ -460,6 +460,9 @@
     unencoded_path = attr.ib()
     _file_size = attr.ib(default=None)
     is_volatile = attr.ib(default=False)
+    # A missing file can be safely ignored; used by "copy/hardlink" local
+    # clones for cache files not covered by the lock.
+    optional = False
 
     def file_size(self, vfs):
         if self._file_size is None:
--- a/mercurial/streamclone.py	Wed Dec 04 17:13:39 2024 +0100
+++ b/mercurial/streamclone.py	Wed Dec 04 23:31:46 2024 +0100
@@ -8,9 +8,12 @@
 from __future__ import annotations
 
 import contextlib
+import errno
 import os
 import struct
 
+from typing import Optional
+
 from .i18n import _
 from .interfaces import repository
 from . import (
@@ -271,6 +274,7 @@
     # Get consistent snapshot of repo, lock during scan.
     with repo.lock():
         repo.ui.debug(b'scanning\n')
+        _test_sync_point_walk_1_2(repo)
         for entry in _walkstreamfiles(repo):
             for f in entry.files():
                 file_size = f.file_size(repo.store.vfs)
@@ -653,6 +657,7 @@
         fp.seek(0, os.SEEK_END)
         size = fp.tell()
         self._volatile_fps[src] = (size, fp)
+        return size
 
     def __call__(self, src):
         """preserve the volatile file at src"""
@@ -661,6 +666,23 @@
             self._flush_some_on_disk()
         self._keep_one(src)
 
+    def try_keep(self, src) -> Optional[int]:
+        """record a volatile file and return its size
+
+        return None if the file does not exist.
+
+        Used for cache files that are not protected by the lock.
+        """
+        assert 0 < self._counter
+        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+            self._flush_some_on_disk()
+        try:
+            return self._keep_one(src)
+        except IOError as err:
+            if err.errno not in (errno.ENOENT, errno.EPERM):
+                raise
+            return None
+
     @contextlib.contextmanager
     def open(self, src):
         assert 0 < self._counter
@@ -706,6 +728,7 @@
 
     # translate the vfs once
     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    _test_sync_point_walk_1_2(repo)
 
     max_linkrev = len(repo)
     file_count = totalfilesize = 0
@@ -771,16 +794,25 @@
         )
 
     # translate the vfs once
-    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    # we only turn this into a list for the `_test_sync`; this is not ideal
+    base_entries = list(entries)
+    _test_sync_point_walk_1_2(repo)
+    entries = []
     with VolatileManager() as volatiles:
         # make sure we preserve volatile files
-        for k, vfs, e in entries:
+        for vfs_key, e in base_entries:
+            vfs = vfsmap[vfs_key]
+            any_files = True
             if e.maybe_volatile:
+                any_files = False
                 e.preserve_volatiles(vfs, volatiles)
                 for f in e.files():
                     if f.is_volatile:
                         # record the expected size under lock
                         f.file_size(vfs)
+                    any_files = True
+            if any_files:
+                entries.append((vfs_key, vfsmap[vfs_key], e))
 
         total_entry_count = len(entries)
 
@@ -813,12 +845,79 @@
                 progress.increment()
 
 
+def _test_sync_point_walk_1_2(repo):
+    """a function for synchronisation during tests
+
+    Triggered after gathering the entries, but before starting to
+    process/preserve them under the lock.
+
+    (on v1, this is triggered before the actual walk starts)
+    """
+
+
 def _test_sync_point_walk_3(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right before releasing the lock, but after computing everything
+    that needs to be computed under the lock.
+    """
 
 
 def _test_sync_point_walk_4(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right after releasing the lock.
+    """
+
+
+# not really a StoreEntry, but close enough
+class CacheEntry(store.SimpleStoreEntry):
+    """Represent an entry for cache files
+
+    It has special logic to preserve cache files early and to accept their
+    optional presence.
+
+
+    (Yes... this is not really a StoreEntry, but close enough. We could have a
+    BaseEntry base class, but the store one would be identical)
+    """
+
+    def __init__(self, entry_path):
+        super().__init__(
+            entry_path,
+            # we will directly deal with that in `preserve_volatiles`
+            is_volatile=True,
+        )
+
+    def preserve_volatiles(self, vfs, volatiles):
+        self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
+        if self._file_size is None:
+            self._files = []
+        else:
+            assert self._is_volatile
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    file_size=self._file_size,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+
+    def files(self):
+        if self._files is None:
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+        return super().files()
+
+
+class CacheFile(store.StoreFile):
+    # inform the "copy/hardlink" version that this file might be missing
+    # without consequences.
+    optional = True
 
 
 def _entries_walk(repo, includes, excludes, includeobsmarkers):
@@ -850,11 +949,7 @@
         for name in cacheutil.cachetocopy(repo):
             if repo.cachevfs.exists(name):
                 # not really a StoreEntry, but close enough
-                entry = store.SimpleStoreEntry(
-                    entry_path=name,
-                    is_volatile=True,
-                )
-                yield (_srccache, entry)
+                yield (_srccache, CacheEntry(entry_path=name))
 
 
 def generatev2(repo, includes, excludes, includeobsmarkers):
@@ -879,7 +974,6 @@
             excludes=excludes,
             includeobsmarkers=includeobsmarkers,
         )
-
         chunks = _emit2(repo, entries)
         first = next(chunks)
         file_count, total_file_size = first
@@ -1141,7 +1235,7 @@
         hardlink[0] = False
         progress.topic = _(b'copying')
 
-    for k, path in entries:
+    for k, path, optional in entries:
         src_vfs = src_vfs_map[k]
         dst_vfs = dst_vfs_map[k]
         src_path = src_vfs.join(path)
@@ -1153,13 +1247,17 @@
             util.makedirs(dirname)
         dst_vfs.register_file(path)
         # XXX we could use the #nb_bytes argument.
-        util.copyfile(
-            src_path,
-            dst_path,
-            hardlink=hardlink[0],
-            no_hardlink_cb=copy_used,
-            check_fs_hardlink=False,
-        )
+        try:
+            util.copyfile(
+                src_path,
+                dst_path,
+                hardlink=hardlink[0],
+                no_hardlink_cb=copy_used,
+                check_fs_hardlink=False,
+            )
+        except FileNotFoundError:
+            if not optional:
+                raise
         progress.increment()
     return hardlink[0]
 
@@ -1214,7 +1312,7 @@
             # to the files while we do the clone, so this is not done yet. We
             # could do this blindly when copying files.
             files = [
-                (vfs_key, f.unencoded_path)
+                (vfs_key, f.unencoded_path, f.optional)
                 for vfs_key, e in entries
                 for f in e.files()
             ]
--- a/tests/test-clone-stream-revlog-split.t	Wed Dec 04 17:13:39 2024 +0100
+++ b/tests/test-clone-stream-revlog-split.t	Wed Dec 04 23:31:46 2024 +0100
@@ -44,12 +44,18 @@
 
 setup synchronisation file
 
+  $ HG_TEST_STREAM_WALKED_FILE_1="$TESTTMP/sync_file_walked_1"
+  $ export HG_TEST_STREAM_WALKED_FILE_1
+  $ HG_TEST_STREAM_WALKED_FILE_2="$TESTTMP/sync_file_walked_2"
+  $ export HG_TEST_STREAM_WALKED_FILE_2
   $ HG_TEST_STREAM_WALKED_FILE_3="$TESTTMP/sync_file_walked_3"
   $ export HG_TEST_STREAM_WALKED_FILE_3
   $ HG_TEST_STREAM_WALKED_FILE_4="$TESTTMP/sync_file_walked_4"
   $ export HG_TEST_STREAM_WALKED_FILE_4
   $ HG_TEST_STREAM_WALKED_FILE_5="$TESTTMP/sync_file_walked_5"
   $ export HG_TEST_STREAM_WALKED_FILE_5
+(we don't need this wait point)
+  $ touch $HG_TEST_STREAM_WALKED_FILE_2
 
 
 Test stream-clone raced by a revlog-split
--- a/tests/test-clone-stream.t	Wed Dec 04 17:13:39 2024 +0100
+++ b/tests/test-clone-stream.t	Wed Dec 04 23:31:46 2024 +0100
@@ -383,6 +383,10 @@
   $ touch repo/f1
   $ $TESTDIR/seq.py 50000 > repo/f2
   $ hg -R repo ci -Aqm "0"
+  $ HG_TEST_STREAM_WALKED_FILE_1="$TESTTMP/sync_file_walked_1"
+  $ export HG_TEST_STREAM_WALKED_FILE_1
+  $ HG_TEST_STREAM_WALKED_FILE_2="$TESTTMP/sync_file_walked_2"
+  $ export HG_TEST_STREAM_WALKED_FILE_2
   $ HG_TEST_STREAM_WALKED_FILE_3="$TESTTMP/sync_file_walked_3"
   $ export HG_TEST_STREAM_WALKED_FILE_3
   $ HG_TEST_STREAM_WALKED_FILE_4="$TESTTMP/sync_file_walked_4"
@@ -399,11 +403,41 @@
 clone while modifying the repo between stat-ing files with the write lock and
 actually serving the file content
 
+also delete and modify some caches in the process
+
   $ (hg clone -q --stream -U http://localhost:$HGPORT1 clone; touch "$HG_TEST_STREAM_WALKED_FILE_5") &
+
+  $ $RUNTESTDIR/testlib/wait-on-file 10 $HG_TEST_STREAM_WALKED_FILE_1
+(delete one file)
+  $ ls repo/.hg/cache/rbc-revs-v2
+  repo/.hg/cache/rbc-revs-v2
+  $ rm repo/.hg/cache/rbc-revs-v2
+(truncate another)
+  $ ls repo/.hg/cache/rbc-names-v2
+  repo/.hg/cache/rbc-names-v2
+  $ echo football > repo/.hg/cache/rbc-names-v2
+(lengthen another)
+  $ ls repo/.hg/cache/branch2-served
+  repo/.hg/cache/branch2-served
+  $ echo bar >> repo/.hg/cache/branch2-served
+(remove one in wcache)
+  $ ls repo/.hg/wcache/manifestfulltextcache
+  repo/.hg/wcache/manifestfulltextcache
+  $ rm repo/.hg/wcache/manifestfulltextcache
+  $ touch $HG_TEST_STREAM_WALKED_FILE_2
+
   $ $RUNTESTDIR/testlib/wait-on-file 10 $HG_TEST_STREAM_WALKED_FILE_3
   $ echo >> repo/f1
   $ echo >> repo/f2
   $ hg -R repo ci -m "1" --config ui.timeout.warn=-1
+(truncate further)
+  $ ls repo/.hg/cache/rbc-names-v2
+  repo/.hg/cache/rbc-names-v2
+  $ echo foo > repo/.hg/cache/rbc-names-v2
+(lengthen another one)
+  $ ls repo/.hg/cache/branch2-served
+  repo/.hg/cache/branch2-served
+  $ echo babar >> repo/.hg/cache/branch2-served
   $ touch $HG_TEST_STREAM_WALKED_FILE_4
   $ $RUNTESTDIR/testlib/wait-on-file 10 $HG_TEST_STREAM_WALKED_FILE_5
   $ hg -R clone id
--- a/tests/test-persistent-nodemap-stream-clone.t	Wed Dec 04 17:13:39 2024 +0100
+++ b/tests/test-persistent-nodemap-stream-clone.t	Wed Dec 04 23:31:46 2024 +0100
@@ -79,6 +79,10 @@
 
 setup the step-by-step stream cloning
 
+  $ HG_TEST_STREAM_WALKED_FILE_1="$TESTTMP/sync_file_walked_1"
+  $ export HG_TEST_STREAM_WALKED_FILE_1
+  $ HG_TEST_STREAM_WALKED_FILE_2="$TESTTMP/sync_file_walked_2"
+  $ export HG_TEST_STREAM_WALKED_FILE_2
   $ HG_TEST_STREAM_WALKED_FILE_3="$TESTTMP/sync_file_walked_3"
   $ export HG_TEST_STREAM_WALKED_FILE_3
   $ HG_TEST_STREAM_WALKED_FILE_4="$TESTTMP/sync_file_walked_4"
@@ -89,6 +93,8 @@
   > [extensions]
   > steps=$RUNTESTDIR/testlib/ext-stream-clone-steps.py
   > EOF
+(we don't need this wait point)
+  $ touch $HG_TEST_STREAM_WALKED_FILE_2
 
 Check and record file state beforehand
 
--- a/tests/testlib/ext-stream-clone-steps.py	Wed Dec 04 17:13:39 2024 +0100
+++ b/tests/testlib/ext-stream-clone-steps.py	Wed Dec 04 23:31:46 2024 +0100
@@ -21,10 +21,17 @@
 )
 
 
+WALKED_FILE_1 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_1']
+WALKED_FILE_2 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_2']
 WALKED_FILE_3 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_3']
 WALKED_FILE_4 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_4']
 
 
+def _test_sync_point_walk_1_2(orig, repo):
+    testing.write_file(WALKED_FILE_1)
+    testing.wait_file(WALKED_FILE_2)
+
+
 def _test_sync_point_walk_3(orig, repo):
     testing.write_file(WALKED_FILE_3)
 
@@ -36,6 +43,10 @@
 
 def uisetup(ui):
     extensions.wrapfunction(
+        streamclone, '_test_sync_point_walk_1_2', _test_sync_point_walk_1_2
+    )
+
+    extensions.wrapfunction(
         streamclone, '_test_sync_point_walk_3', _test_sync_point_walk_3
     )
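
As a reading aid, the handshake that the new `_test_sync_point_walk_1_2` hook makes possible in the tests above can be summarised as the following informal timeline (commentary only, not part of the change):

    clone process                          test script
    -------------                          -----------
    gather stream entries under lock
    write  sync_file_walked_1      ----->  wait-on-file sync_file_walked_1
    wait   sync_file_walked_2      <-----  delete/truncate/lengthen caches,
                                           then touch sync_file_walked_2
    preserve volatile files (try_keep)
    write  sync_file_walked_3 ...          (continues as before)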