mercurial/streamclone.py
changeset 52359 3f0cf7bb3086
parent 52358 11484a19cd77
child 52640 24ee91ba9aa8
--- a/mercurial/streamclone.py	Wed Dec 04 17:13:39 2024 +0100
+++ b/mercurial/streamclone.py	Wed Dec 04 23:31:46 2024 +0100
@@ -8,9 +8,12 @@
 from __future__ import annotations
 
 import contextlib
+import errno
 import os
 import struct
 
+from typing import Optional
+
 from .i18n import _
 from .interfaces import repository
 from . import (
@@ -271,6 +274,7 @@
     # Get consistent snapshot of repo, lock during scan.
     with repo.lock():
         repo.ui.debug(b'scanning\n')
+        _test_sync_point_walk_1_2(repo)
         for entry in _walkstreamfiles(repo):
             for f in entry.files():
                 file_size = f.file_size(repo.store.vfs)
@@ -653,6 +657,7 @@
         fp.seek(0, os.SEEK_END)
         size = fp.tell()
         self._volatile_fps[src] = (size, fp)
+        return size
 
     def __call__(self, src):
         """preserve the volatile file at src"""
@@ -661,6 +666,23 @@
             self._flush_some_on_disk()
         self._keep_one(src)
 
+    def try_keep(self, src) -> Optional[int]:
+        """record a volatile file and returns it size
+
+        return None if the file does not exists.
+
+        Used for cache file that are not lock protected.
+        """
+        assert 0 < self._counter
+        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+            self._flush_some_on_disk()
+        try:
+            return self._keep_one(src)
+        except IOError as err:
+            if err.errno not in (errno.ENOENT, errno.EPERM):
+                raise
+            return None
+
     @contextlib.contextmanager
     def open(self, src):
         assert 0 < self._counter
@@ -706,6 +728,7 @@
 
     # translate the vfs one
     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    _test_sync_point_walk_1_2(repo)
 
     max_linkrev = len(repo)
     file_count = totalfilesize = 0
@@ -771,16 +794,25 @@
         )
 
     # translate the vfs once
-    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    # we only turn this into a list for the `_test_sync`, this is not ideal
+    base_entries = list(entries)
+    _test_sync_point_walk_1_2(repo)
+    entries = []
     with VolatileManager() as volatiles:
         # make sure we preserve volatile files
-        for k, vfs, e in entries:
+        for vfs_key, e in base_entries:
+            vfs = vfsmap[vfs_key]
+            any_files = True
             if e.maybe_volatile:
+                any_files = False
                 e.preserve_volatiles(vfs, volatiles)
                 for f in e.files():
                     if f.is_volatile:
                         # record the expected size under lock
                         f.file_size(vfs)
+                    any_files = True
+            if any_files:
+                entries.append((vfs_key, vfsmap[vfs_key], e))
 
         total_entry_count = len(entries)
 
@@ -813,12 +845,79 @@
                 progress.increment()
 
 
+def _test_sync_point_walk_1_2(repo):
+    """a function for synchronisation during tests
+
+    Triggered after gather entry, but before starting to process/preserve them
+    under lock.
+
+    (on v1 is triggered before the actual walk start)
+    """
+
+
 def _test_sync_point_walk_3(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right before releasing the lock, but after computing what need
+    needed to compute under lock.
+    """
 
 
 def _test_sync_point_walk_4(repo):
-    """a function for synchronisation during tests"""
+    """a function for synchronisation during tests
+
+    Triggered right after releasing the lock.
+    """
+
+
+# not really a StoreEntry, but close enough
+class CacheEntry(store.SimpleStoreEntry):
+    """Represent an entry for Cache files
+
+    It has special logic to preserve cache file early and accept optional
+    presence.
+
+
+    (Yes... this is not really a StoreEntry, but close enough. We could have a
+    BaseEntry base class, bbut the store one would be identical)
+    """
+
+    def __init__(self, entry_path):
+        super().__init__(
+            entry_path,
+            # we will directly deal with that in `setup_cache_file`
+            is_volatile=True,
+        )
+
+    def preserve_volatiles(self, vfs, volatiles):
+        self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
+        if self._file_size is None:
+            self._files = []
+        else:
+            assert self._is_volatile
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    file_size=self._file_size,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+
+    def files(self):
+        if self._files is None:
+            self._files = [
+                CacheFile(
+                    unencoded_path=self._entry_path,
+                    is_volatile=self._is_volatile,
+                )
+            ]
+        return super().files()
+
+
+class CacheFile(store.StoreFile):
+    # inform the "copy/hardlink" version that this file might be missing
+    # without consequences.
+    optional = True
 
 
 def _entries_walk(repo, includes, excludes, includeobsmarkers):
@@ -850,11 +949,7 @@
         for name in cacheutil.cachetocopy(repo):
             if repo.cachevfs.exists(name):
                 # not really a StoreEntry, but close enough
-                entry = store.SimpleStoreEntry(
-                    entry_path=name,
-                    is_volatile=True,
-                )
-                yield (_srccache, entry)
+                yield (_srccache, CacheEntry(entry_path=name))
 
 
 def generatev2(repo, includes, excludes, includeobsmarkers):
@@ -879,7 +974,6 @@
             excludes=excludes,
             includeobsmarkers=includeobsmarkers,
         )
-
         chunks = _emit2(repo, entries)
         first = next(chunks)
         file_count, total_file_size = first
@@ -1141,7 +1235,7 @@
         hardlink[0] = False
         progress.topic = _(b'copying')
 
-    for k, path in entries:
+    for k, path, optional in entries:
         src_vfs = src_vfs_map[k]
         dst_vfs = dst_vfs_map[k]
         src_path = src_vfs.join(path)
@@ -1153,13 +1247,17 @@
             util.makedirs(dirname)
         dst_vfs.register_file(path)
         # XXX we could use the #nb_bytes argument.
-        util.copyfile(
-            src_path,
-            dst_path,
-            hardlink=hardlink[0],
-            no_hardlink_cb=copy_used,
-            check_fs_hardlink=False,
-        )
+        try:
+            util.copyfile(
+                src_path,
+                dst_path,
+                hardlink=hardlink[0],
+                no_hardlink_cb=copy_used,
+                check_fs_hardlink=False,
+            )
+        except FileNotFoundError:
+            if not optional:
+                raise
         progress.increment()
     return hardlink[0]
 
@@ -1214,7 +1312,7 @@
             # to the files while we do the clone, so this is not done yet. We
             # could do this blindly when copying files.
             files = [
-                (vfs_key, f.unencoded_path)
+                (vfs_key, f.unencoded_path, f.optional)
                 for vfs_key, e in entries
                 for f in e.files()
             ]