mercurial/streamclone.py
changeset 52359 3f0cf7bb3086
parent 52358 11484a19cd77
child 52640 24ee91ba9aa8
equal deleted inserted replaced
52358:11484a19cd77 52359:3f0cf7bb3086
     6 # GNU General Public License version 2 or any later version.
     6 # GNU General Public License version 2 or any later version.
     7 
     7 
     8 from __future__ import annotations
     8 from __future__ import annotations
     9 
     9 
    10 import contextlib
    10 import contextlib
       
    11 import errno
    11 import os
    12 import os
    12 import struct
    13 import struct
       
    14 
       
    15 from typing import Optional
    13 
    16 
    14 from .i18n import _
    17 from .i18n import _
    15 from .interfaces import repository
    18 from .interfaces import repository
    16 from . import (
    19 from . import (
    17     bookmarks,
    20     bookmarks,
   269     entries = []
   272     entries = []
   270     total_bytes = 0
   273     total_bytes = 0
   271     # Get consistent snapshot of repo, lock during scan.
   274     # Get consistent snapshot of repo, lock during scan.
   272     with repo.lock():
   275     with repo.lock():
   273         repo.ui.debug(b'scanning\n')
   276         repo.ui.debug(b'scanning\n')
       
   277         _test_sync_point_walk_1_2(repo)
   274         for entry in _walkstreamfiles(repo):
   278         for entry in _walkstreamfiles(repo):
   275             for f in entry.files():
   279             for f in entry.files():
   276                 file_size = f.file_size(repo.store.vfs)
   280                 file_size = f.file_size(repo.store.vfs)
   277                 if file_size:
   281                 if file_size:
   278                     entries.append((f.unencoded_path, file_size))
   282                     entries.append((f.unencoded_path, file_size))
   651         # store the file quickly to ensure we close it if any error happens
   655         # store the file quickly to ensure we close it if any error happens
   652         _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
   656         _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
   653         fp.seek(0, os.SEEK_END)
   657         fp.seek(0, os.SEEK_END)
   654         size = fp.tell()
   658         size = fp.tell()
   655         self._volatile_fps[src] = (size, fp)
   659         self._volatile_fps[src] = (size, fp)
       
   660         return size
   656 
   661 
   657     def __call__(self, src):
   662     def __call__(self, src):
   658         """preserve the volatile file at src"""
   663         """preserve the volatile file at src"""
   659         assert 0 < self._counter
   664         assert 0 < self._counter
   660         if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
   665         if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
   661             self._flush_some_on_disk()
   666             self._flush_some_on_disk()
   662         self._keep_one(src)
   667         self._keep_one(src)
       
   668 
       
   669     def try_keep(self, src) -> Optional[int]:
       
   670         """record a volatile file and return its size
       
   671 
       
   672         return None if the file does not exist.
       
   673 
       
   674         Used for cache files that are not lock protected.
       
   675         """
       
   676         assert 0 < self._counter
       
   677         if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
       
   678             self._flush_some_on_disk()
       
   679         try:
       
   680             return self._keep_one(src)
       
   681         except IOError as err:
       
   682             if err.errno not in (errno.ENOENT, errno.EPERM):
       
   683                 raise
       
   684             return None
   663 
   685 
   664     @contextlib.contextmanager
   686     @contextlib.contextmanager
   665     def open(self, src):
   687     def open(self, src):
   666         assert 0 < self._counter
   688         assert 0 < self._counter
   667         entry = self._volatile_fps.get(src)
   689         entry = self._volatile_fps.get(src)
   704             b'repo.vfs must not be added to vfsmap for security reasons'
   726             b'repo.vfs must not be added to vfsmap for security reasons'
   705         )
   727         )
   706 
   728 
   707     # translate the vfs once
   729     # translate the vfs once
   708     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
   730     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
       
   731     _test_sync_point_walk_1_2(repo)
   709 
   732 
   710     max_linkrev = len(repo)
   733     max_linkrev = len(repo)
   711     file_count = totalfilesize = 0
   734     file_count = totalfilesize = 0
   712     with VolatileManager() as volatiles:
   735     with VolatileManager() as volatiles:
   713         # make sure we preserve volatile files
   736         # make sure we preserve volatile files
   769         raise error.ProgrammingError(
   792         raise error.ProgrammingError(
   770             b'repo.vfs must not be added to vfsmap for security reasons'
   793             b'repo.vfs must not be added to vfsmap for security reasons'
   771         )
   794         )
   772 
   795 
   773     # translate the vfs once
   796     # translate the vfs once
   774     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
   797     # we only turn this into a list for the `_test_sync`, this is not ideal
       
   798     base_entries = list(entries)
       
   799     _test_sync_point_walk_1_2(repo)
       
   800     entries = []
   775     with VolatileManager() as volatiles:
   801     with VolatileManager() as volatiles:
   776         # make sure we preserve volatile files
   802         # make sure we preserve volatile files
   777         for k, vfs, e in entries:
   803         for vfs_key, e in base_entries:
       
   804             vfs = vfsmap[vfs_key]
       
   805             any_files = True
   778             if e.maybe_volatile:
   806             if e.maybe_volatile:
       
   807                 any_files = False
   779                 e.preserve_volatiles(vfs, volatiles)
   808                 e.preserve_volatiles(vfs, volatiles)
   780                 for f in e.files():
   809                 for f in e.files():
   781                     if f.is_volatile:
   810                     if f.is_volatile:
   782                         # record the expected size under lock
   811                         # record the expected size under lock
   783                         f.file_size(vfs)
   812                         f.file_size(vfs)
       
   813                     any_files = True
       
   814             if any_files:
       
   815                 entries.append((vfs_key, vfsmap[vfs_key], e))
   784 
   816 
   785         total_entry_count = len(entries)
   817         total_entry_count = len(entries)
   786 
   818 
   787         max_linkrev = len(repo)
   819         max_linkrev = len(repo)
   788         progress = repo.ui.makeprogress(
   820         progress = repo.ui.makeprogress(
   811                     yield name
   843                     yield name
   812                     yield from stream
   844                     yield from stream
   813                 progress.increment()
   845                 progress.increment()
   814 
   846 
   815 
   847 
       
   848 def _test_sync_point_walk_1_2(repo):
       
   849     """a function for synchronisation during tests
       
   850 
       
   851     Triggered after gathering entries, but before starting to process/preserve them
       
   852     under lock.
       
   853 
       
   854     (on v1 it is triggered before the actual walk starts)
       
   855     """
       
   856 
       
   857 
   816 def _test_sync_point_walk_3(repo):
   858 def _test_sync_point_walk_3(repo):
   817     """a function for synchronisation during tests"""
   859     """a function for synchronisation during tests
       
   860 
       
   861     Triggered right before releasing the lock, but after computing what we
       
   862     needed to compute under lock.
       
   863     """
   818 
   864 
   819 
   865 
   820 def _test_sync_point_walk_4(repo):
   866 def _test_sync_point_walk_4(repo):
   821     """a function for synchronisation during tests"""
   867     """a function for synchronisation during tests
       
   868 
       
   869     Triggered right after releasing the lock.
       
   870     """
       
   871 
       
   872 
       
   873 # not really a StoreEntry, but close enough
       
   874 class CacheEntry(store.SimpleStoreEntry):
       
   875     """Represent an entry for Cache files
       
   876 
       
   877     It has special logic to preserve cache file early and accept optional
       
   878     presence.
       
   879 
       
   880 
       
   881     (Yes... this is not really a StoreEntry, but close enough. We could have a
       
   882     BaseEntry base class, but the store one would be identical)
       
   883     """
       
   884 
       
   885     def __init__(self, entry_path):
       
   886         super().__init__(
       
   887             entry_path,
       
   888             # we will directly deal with that in `setup_cache_file`
       
   889             is_volatile=True,
       
   890         )
       
   891 
       
   892     def preserve_volatiles(self, vfs, volatiles):
       
   893         self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
       
   894         if self._file_size is None:
       
   895             self._files = []
       
   896         else:
       
   897             assert self._is_volatile
       
   898             self._files = [
       
   899                 CacheFile(
       
   900                     unencoded_path=self._entry_path,
       
   901                     file_size=self._file_size,
       
   902                     is_volatile=self._is_volatile,
       
   903                 )
       
   904             ]
       
   905 
       
   906     def files(self):
       
   907         if self._files is None:
       
   908             self._files = [
       
   909                 CacheFile(
       
   910                     unencoded_path=self._entry_path,
       
   911                     is_volatile=self._is_volatile,
       
   912                 )
       
   913             ]
       
   914         return super().files()
       
   915 
       
   916 
       
   917 class CacheFile(store.StoreFile):
       
   918     # inform the "copy/hardlink" version that this file might be missing
       
   919     # without consequences.
       
   920     optional = True
   822 
   921 
   823 
   922 
   824 def _entries_walk(repo, includes, excludes, includeobsmarkers):
   923 def _entries_walk(repo, includes, excludes, includeobsmarkers):
   825     """emit a series of files information useful to clone a repo
   924     """emit a series of files information useful to clone a repo
   826 
   925 
   848             yield (_srcstore, entry)
   947             yield (_srcstore, entry)
   849 
   948 
   850         for name in cacheutil.cachetocopy(repo):
   949         for name in cacheutil.cachetocopy(repo):
   851             if repo.cachevfs.exists(name):
   950             if repo.cachevfs.exists(name):
   852                 # not really a StoreEntry, but close enough
   951                 # not really a StoreEntry, but close enough
   853                 entry = store.SimpleStoreEntry(
   952                 yield (_srccache, CacheEntry(entry_path=name))
   854                     entry_path=name,
       
   855                     is_volatile=True,
       
   856                 )
       
   857                 yield (_srccache, entry)
       
   858 
   953 
   859 
   954 
   860 def generatev2(repo, includes, excludes, includeobsmarkers):
   955 def generatev2(repo, includes, excludes, includeobsmarkers):
   861     """Emit content for version 2 of a streaming clone.
   956     """Emit content for version 2 of a streaming clone.
   862 
   957 
   877             repo,
   972             repo,
   878             includes=includes,
   973             includes=includes,
   879             excludes=excludes,
   974             excludes=excludes,
   880             includeobsmarkers=includeobsmarkers,
   975             includeobsmarkers=includeobsmarkers,
   881         )
   976         )
   882 
       
   883         chunks = _emit2(repo, entries)
   977         chunks = _emit2(repo, entries)
   884         first = next(chunks)
   978         first = next(chunks)
   885         file_count, total_file_size = first
   979         file_count, total_file_size = first
   886         _test_sync_point_walk_3(repo)
   980         _test_sync_point_walk_3(repo)
   887     _test_sync_point_walk_4(repo)
   981     _test_sync_point_walk_4(repo)
  1139 
  1233 
  1140     def copy_used():
  1234     def copy_used():
  1141         hardlink[0] = False
  1235         hardlink[0] = False
  1142         progress.topic = _(b'copying')
  1236         progress.topic = _(b'copying')
  1143 
  1237 
  1144     for k, path in entries:
  1238     for k, path, optional in entries:
  1145         src_vfs = src_vfs_map[k]
  1239         src_vfs = src_vfs_map[k]
  1146         dst_vfs = dst_vfs_map[k]
  1240         dst_vfs = dst_vfs_map[k]
  1147         src_path = src_vfs.join(path)
  1241         src_path = src_vfs.join(path)
  1148         dst_path = dst_vfs.join(path)
  1242         dst_path = dst_vfs.join(path)
  1149         # We cannot use dirname and makedirs of dst_vfs here because the store
  1243         # We cannot use dirname and makedirs of dst_vfs here because the store
  1151         dirname = os.path.dirname(dst_path)
  1245         dirname = os.path.dirname(dst_path)
  1152         if not os.path.exists(dirname):
  1246         if not os.path.exists(dirname):
  1153             util.makedirs(dirname)
  1247             util.makedirs(dirname)
  1154         dst_vfs.register_file(path)
  1248         dst_vfs.register_file(path)
  1155         # XXX we could use the #nb_bytes argument.
  1249         # XXX we could use the #nb_bytes argument.
  1156         util.copyfile(
  1250         try:
  1157             src_path,
  1251             util.copyfile(
  1158             dst_path,
  1252                 src_path,
  1159             hardlink=hardlink[0],
  1253                 dst_path,
  1160             no_hardlink_cb=copy_used,
  1254                 hardlink=hardlink[0],
  1161             check_fs_hardlink=False,
  1255                 no_hardlink_cb=copy_used,
  1162         )
  1256                 check_fs_hardlink=False,
       
  1257             )
       
  1258         except FileNotFoundError:
       
  1259             if not optional:
       
  1260                 raise
  1163         progress.increment()
  1261         progress.increment()
  1164     return hardlink[0]
  1262     return hardlink[0]
  1165 
  1263 
  1166 
  1264 
  1167 def local_copy(src_repo, dest_repo):
  1265 def local_copy(src_repo, dest_repo):
  1212             # and the other one without the lock. However, in the linking case,
  1310             # and the other one without the lock. However, in the linking case,
  1213             # this would also requires checks that nobody is appending any data
  1311             # this would also requires checks that nobody is appending any data
  1214             # to the files while we do the clone, so this is not done yet. We
  1312             # to the files while we do the clone, so this is not done yet. We
  1215             # could do this blindly when copying files.
  1313             # could do this blindly when copying files.
  1216             files = [
  1314             files = [
  1217                 (vfs_key, f.unencoded_path)
  1315                 (vfs_key, f.unencoded_path, f.optional)
  1218                 for vfs_key, e in entries
  1316                 for vfs_key, e in entries
  1219                 for f in e.files()
  1317                 for f in e.files()
  1220             ]
  1318             ]
  1221             hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
  1319             hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
  1222 
  1320