--- a/mercurial/streamclone.py Wed Dec 04 17:13:39 2024 +0100
+++ b/mercurial/streamclone.py Wed Dec 04 23:31:46 2024 +0100
@@ -8,9 +8,12 @@
from __future__ import annotations
import contextlib
+import errno
import os
import struct
+from typing import Optional
+
from .i18n import _
from .interfaces import repository
from . import (
@@ -271,6 +274,7 @@
# Get consistent snapshot of repo, lock during scan.
with repo.lock():
repo.ui.debug(b'scanning\n')
+ _test_sync_point_walk_1_2(repo)
for entry in _walkstreamfiles(repo):
for f in entry.files():
file_size = f.file_size(repo.store.vfs)
@@ -653,6 +657,7 @@
fp.seek(0, os.SEEK_END)
size = fp.tell()
self._volatile_fps[src] = (size, fp)
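+ # also return the recorded size so try_keep() can report it to callers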
+ return size
def __call__(self, src):
"""preserve the volatile file at src"""
@@ -661,6 +666,23 @@
self._flush_some_on_disk()
self._keep_one(src)
+ def try_keep(self, src) -> Optional[int]:
+ """record a volatile file and returns it size
+
+ return None if the file does not exists.
+
+ Used for cache file that are not lock protected.
+ """
+ assert 0 < self._counter
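+ # flush some already-preserved content to stay under the
+ # MAX_OPEN descriptor cap before opening another file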
+ if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+ self._flush_some_on_disk()
+ try:
+ return self._keep_one(src)
+ except IOError as err:
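+ # cache files are not lock protected, so the file may have
+ # vanished (ENOENT) or not be readable (EPERM) since we
+ # listed it; anything else is a real error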
+ if err.errno not in (errno.ENOENT, errno.EPERM):
+ raise
+ return None
+
@contextlib.contextmanager
def open(self, src):
assert 0 < self._counter
@@ -706,6 +728,7 @@
# translate the vfs one
entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+ _test_sync_point_walk_1_2(repo)
max_linkrev = len(repo)
file_count = totalfilesize = 0
@@ -771,16 +794,25 @@
)
# translate the vfs once
- entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+ # we only turn this into a list for the `_test_sync_point_walk_1_2`
+ # call; this is not ideal
+ base_entries = list(entries)
+ _test_sync_point_walk_1_2(repo)
+ entries = []
with VolatileManager() as volatiles:
# make sure we preserve volatile files
- for k, vfs, e in entries:
+ for vfs_key, e in base_entries:
+ vfs = vfsmap[vfs_key]
+ any_files = True
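+ # a volatile entry may end up with no file at all (e.g. a
+ # cache file that vanished); drop such empty entries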
if e.maybe_volatile:
+ any_files = False
e.preserve_volatiles(vfs, volatiles)
for f in e.files():
if f.is_volatile:
# record the expected size under lock
f.file_size(vfs)
+ any_files = True
+ if any_files:
+ entries.append((vfs_key, vfs, e))
total_entry_count = len(entries)
@@ -813,12 +845,79 @@
progress.increment()
+def _test_sync_point_walk_1_2(repo):
+ """a function for synchronisation during tests
+
+ Triggered after entries are gathered, but before we start to process and
+ preserve them under lock.
+
+ (for v1 this is triggered before the actual walk starts)
+ """
+
+
def _test_sync_point_walk_3(repo):
- """a function for synchronisation during tests"""
+ """a function for synchronisation during tests
+
+ Triggered right before releasing the lock, but after computing everything
+ that needed to be computed under lock.
+ """
def _test_sync_point_walk_4(repo):
- """a function for synchronisation during tests"""
+ """a function for synchronisation during tests
+
+ Triggered right after releasing the lock.
+ """
+
+
+# not really a StoreEntry, but close enough
+class CacheEntry(store.SimpleStoreEntry):
+ """Represent an entry for Cache files
+
+ It has special logic to preserve cache file early and accept optional
+ presence.
+
+
+ (Yes... this is not really a StoreEntry, but close enough. We could have a
+ BaseEntry base class, bbut the store one would be identical)
+ """
+
+ def __init__(self, entry_path):
+ super().__init__(
+ entry_path,
+ # we will directly deal with that in `setup_cache_file`
+ is_volatile=True,
+ )
+
+ def preserve_volatiles(self, vfs, volatiles):
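+ # preserve the cache file right away; a None size means it
+ # vanished, in which case this entry has no file to stream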
+ self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
+ if self._file_size is None:
+ self._files = []
+ else:
+ assert self._is_volatile
+ self._files = [
+ CacheFile(
+ unencoded_path=self._entry_path,
+ file_size=self._file_size,
+ is_volatile=self._is_volatile,
+ )
+ ]
+
+ def files(self):
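+ # build the file list lazily when preserve_volatiles was not
+ # called first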
+ if self._files is None:
+ self._files = [
+ CacheFile(
+ unencoded_path=self._entry_path,
+ is_volatile=self._is_volatile,
+ )
+ ]
+ return super().files()
+
+
+class CacheFile(store.StoreFile):
+ # inform the "copy/hardlink" version that this file might be missing
+ # without consequences.
+ optional = True
def _entries_walk(repo, includes, excludes, includeobsmarkers):
@@ -850,11 +949,7 @@
for name in cacheutil.cachetocopy(repo):
if repo.cachevfs.exists(name):
# not really a StoreEntry, but close enough
- entry = store.SimpleStoreEntry(
- entry_path=name,
- is_volatile=True,
- )
- yield (_srccache, entry)
+ yield (_srccache, CacheEntry(entry_path=name))
def generatev2(repo, includes, excludes, includeobsmarkers):
@@ -879,7 +974,6 @@
excludes=excludes,
includeobsmarkers=includeobsmarkers,
)
-
chunks = _emit2(repo, entries)
first = next(chunks)
file_count, total_file_size = first
@@ -1141,7 +1235,7 @@
hardlink[0] = False
progress.topic = _(b'copying')
- for k, path in entries:
+ for k, path, optional in entries:
src_vfs = src_vfs_map[k]
dst_vfs = dst_vfs_map[k]
src_path = src_vfs.join(path)
@@ -1153,13 +1247,17 @@
util.makedirs(dirname)
dst_vfs.register_file(path)
# XXX we could use the #nb_bytes argument.
- util.copyfile(
- src_path,
- dst_path,
- hardlink=hardlink[0],
- no_hardlink_cb=copy_used,
- check_fs_hardlink=False,
- )
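+ # optional files (e.g. caches) may vanish before we copy
+ # them; only a missing non-optional file is an error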
+ try:
+ util.copyfile(
+ src_path,
+ dst_path,
+ hardlink=hardlink[0],
+ no_hardlink_cb=copy_used,
+ check_fs_hardlink=False,
+ )
+ except FileNotFoundError:
+ if not optional:
+ raise
progress.increment()
return hardlink[0]
@@ -1214,7 +1312,7 @@
# to the files while we do the clone, so this is not done yet. We
# could do this blindly when copying files.
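+ # also carry the `optional` flag so that missing cache files
+ # do not abort the copy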
files = [
- (vfs_key, f.unencoded_path)
+ (vfs_key, f.unencoded_path, f.optional)
for vfs_key, e in entries
for f in e.files()
]