diff mercurial/streamclone.py @ 52391:3f0cf7bb3086

stream: preserve volatile cache early

Since cache files are not protected by the lock, their state might change between their initial detection, the computation of their size, and their preservation by the VolatileManager. So we gather all these steps into a single one to avoid such a race. This also handles cache files disappearing in the "copy/hardlink" clone case.
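
To make the race concrete, here is a minimal standalone sketch (plain Python, hypothetical names, not the actual Mercurial API). With detection, sizing, and preservation done as three separate steps, an unlocked cache file can disappear between any two of them; a single open() collapses them into one step, and the size is then read from the very descriptor that preserves the content:

    import os

    def racy_scan(path):
        # Three separate steps: the file can vanish between any two of them.
        if os.path.exists(path):           # 1. detection
            size = os.path.getsize(path)   # 2. sizing -- may already be gone
            return size, open(path, 'rb')  # 3. preservation -- or gone here
        return None

    def single_step_scan(path):
        # One step: open() both detects and preserves the file; on POSIX the
        # open descriptor keeps the content readable even if the file is
        # unlinked afterwards.
        try:
            fp = open(path, 'rb')
        except FileNotFoundError:
            return None
        fp.seek(0, os.SEEK_END)  # size read from the preserved descriptor
        return fp.tell(), fp
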
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 04 Dec 2024 23:31:46 +0100
parents 11484a19cd77
children 24ee91ba9aa8
--- a/mercurial/streamclone.py	Wed Dec 04 17:13:39 2024 +0100
+++ b/mercurial/streamclone.py	Wed Dec 04 23:31:46 2024 +0100
@@ -8,9 +8,12 @@
 from __future__ import annotations
 
 import contextlib
+import errno
 import os
 import struct
 
+from typing import Optional
+
 from .i18n import _
 from .interfaces import repository
 from . import (
@@ -271,6 +274,7 @@
     # Get consistent snapshot of repo, lock during scan.
     with repo.lock():
         repo.ui.debug(b'scanning\n')
+        _test_sync_point_walk_1_2(repo)
         for entry in _walkstreamfiles(repo):
             for f in entry.files():
                 file_size = f.file_size(repo.store.vfs)
@@ -653,6 +657,7 @@
         fp.seek(0, os.SEEK_END)
         size = fp.tell()
         self._volatile_fps[src] = (size, fp)
+        return size
 
     def __call__(self, src):
         """preserve the volatile file at src"""
@@ -661,6 +666,23 @@
             self._flush_some_on_disk()
         self._keep_one(src)
 
+    def try_keep(self, src) -> Optional[int]:
+        """record a volatile file and returns it size
+
+        return None if the file does not exists.
+
+        Used for cache file that are not lock protected.
+        """
+        assert 0 < self._counter
+        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+            self._flush_some_on_disk()
+        try:
+            return self._keep_one(src)
+        except IOError as err:
+            if err.errno not in (errno.ENOENT, errno.EPERM):
+                raise
+            return None
+
     @contextlib.contextmanager
     def open(self, src):
         assert 0 < self._counter
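
As a standalone sketch of the error contract introduced above (invented names, stdlib only): ENOENT and EPERM both mean "this unlocked cache file is not usable, skip it", while any other I/O error still propagates. In the real method the keeper role is played by _keep_one(), which also keeps the descriptor open to preserve the content.

    import errno

    def try_keep(path, keeper):
        """Attempt preservation; None means "skip this cache file".

        `keeper(path)` stands in for _keep_one(): it opens the file,
        records the descriptor, and returns the size.
        """
        try:
            return keeper(path)
        except IOError as err:
            # ENOENT: removed between detection and preservation.
            # EPERM: access revoked, e.g. by a concurrent cache rewrite.
            if err.errno not in (errno.ENOENT, errno.EPERM):
                raise
            return None
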
@@ -706,6 +728,7 @@
 
     # translate the vfs once
     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    _test_sync_point_walk_1_2(repo)
 
     max_linkrev = len(repo)
     file_count = totalfilesize = 0
@@ -771,16 +794,25 @@
         )
 
     # translate the vfs once
-    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    # we only turn this into a list for the `_test_sync` call; this is not ideal
+    base_entries = list(entries)
+    _test_sync_point_walk_1_2(repo)
+    entries = []
     with VolatileManager() as volatiles:
         # make sure we preserve volatile files
-        for k, vfs, e in entries:
+        for vfs_key, e in base_entries:
+            vfs = vfsmap[vfs_key]
+            any_files = True
             if e.maybe_volatile:
+                any_files = False
                 e.preserve_volatiles(vfs, volatiles)
                 for f in e.files():
                     if f.is_volatile:
                         # record the expected size under lock
                         f.file_size(vfs)
+                    any_files = True
+            if any_files:
+            entries.append((vfs_key, vfs, e))
 
         total_entry_count = len(entries)
 
@@ -813,12 +845,79 @@
                 progress.increment()
 
 
+def _test_sync_point_walk_1_2(repo):
+    """a function for synchronisation during tests
+
+    Triggered after gathering entries, but before starting to process/preserve
+    them under lock.
+
+    (on v1 it is triggered before the actual walk starts)
+    """
+
+
 def _test_sync_point_walk_3(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right before releasing the lock, but after computing everything
+    that needed to be computed under lock.
+    """
 
 
 def _test_sync_point_walk_4(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right after releasing the lock.
+    """
+
+
+# not really a StoreEntry, but close enough
+class CacheEntry(store.SimpleStoreEntry):
+    """Represent an entry for Cache files
+
+    It has special logic to preserve cache file early and accept optional
+    presence.
+
+
+    (Yes... this is not really a StoreEntry, but close enough. We could have a
+    BaseEntry base class, but the store one would be identical)
+    """
+
+    def __init__(self, entry_path):
+        super().__init__(
+            entry_path,
+            # we will directly deal with that in `setup_cache_file`
+            is_volatile=True,
+        )
+
+    def preserve_volatiles(self, vfs, volatiles):
+        self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
+        if self._file_size is None:
+            self._files = []
+        else:
+            assert self._is_volatile
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    file_size=self._file_size,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+
+    def files(self):
+        if self._files is None:
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+        return super().files()
+
+
+class CacheFile(store.StoreFile):
+    # inform the "copy/hardlink" version that this file might be missing
+    # without consequences.
+    optional = True
 
 
 def _entries_walk(repo, includes, excludes, includeobsmarkers):
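
For the control flow, a hedged standalone model of CacheEntry's two outcomes (plain Python, invented names; the real class delegates to store.SimpleStoreEntry and the VolatileManager):

    class CacheEntryModel:
        """Model of CacheEntry: early preservation decides what files exist."""

        def __init__(self, entry_path):
            self.entry_path = entry_path
            self.files = None  # unknown until preservation is attempted

        def preserve_volatiles(self, try_keep):
            # `try_keep` models VolatileManager.try_keep: size or None.
            size = try_keep(self.entry_path)
            if size is None:
                # The unlocked cache file vanished: expose no files, so the
                # any_files check in the emitting loop drops the whole entry.
                self.files = []
            else:
                # File preserved: exactly one file, with its size recorded
                # while the kept descriptor guarantees the content.
                self.files = [(self.entry_path, size)]
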
@@ -850,11 +949,7 @@
         for name in cacheutil.cachetocopy(repo):
             if repo.cachevfs.exists(name):
                 # not really a StoreEntry, but close enough
-                entry = store.SimpleStoreEntry(
-                    entry_path=name,
-                    is_volatile=True,
-                )
-                yield (_srccache, entry)
+                yield (_srccache, CacheEntry(entry_path=name))
 
 
 def generatev2(repo, includes, excludes, includeobsmarkers):
@@ -879,7 +974,6 @@
             excludes=excludes,
             includeobsmarkers=includeobsmarkers,
         )
-
         chunks = _emit2(repo, entries)
         first = next(chunks)
         file_count, total_file_size = first
@@ -1141,7 +1235,7 @@
         hardlink[0] = False
         progress.topic = _(b'copying')
 
-    for k, path in entries:
+    for k, path, optional in entries:
         src_vfs = src_vfs_map[k]
         dst_vfs = dst_vfs_map[k]
         src_path = src_vfs.join(path)
@@ -1153,13 +1247,17 @@
             util.makedirs(dirname)
         dst_vfs.register_file(path)
         # XXX we could use the #nb_bytes argument.
-        util.copyfile(
-            src_path,
-            dst_path,
-            hardlink=hardlink[0],
-            no_hardlink_cb=copy_used,
-            check_fs_hardlink=False,
-        )
+        try:
+            util.copyfile(
+                src_path,
+                dst_path,
+                hardlink=hardlink[0],
+                no_hardlink_cb=copy_used,
+                check_fs_hardlink=False,
+            )
+        except FileNotFoundError:
+            if not optional:
+                raise
         progress.increment()
     return hardlink[0]
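
The same tolerance on the consumer side, as a standalone sketch (shutil stands in for util.copyfile; names invented): an optional file that vanished between listing and copying is silently skipped, while a missing non-optional file remains a hard error.

    import shutil

    def copy_one(src_path, dst_path, optional):
        # Mirror of the loop above: optional (cache) files may legitimately
        # disappear under our feet; anything else missing is a real problem.
        try:
            shutil.copyfile(src_path, dst_path)
        except FileNotFoundError:
            if not optional:
                raise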
 
@@ -1214,7 +1312,7 @@
             # to the files while we do the clone, so this is not done yet. We
             # could do this blindly when copying files.
             files = [
-                (vfs_key, f.unencoded_path)
+                (vfs_key, f.unencoded_path, f.optional)
                 for vfs_key, e in entries
                 for f in e.files()
             ]