    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        _test_sync_point_walk_1_2(repo)
        for entry in _walkstreamfiles(repo):
            for f in entry.files():
                file_size = f.file_size(repo.store.vfs)
                if file_size:
                    entries.append((f.unencoded_path, file_size))
        # store the file quickly to ensure we close it if any error happens
        _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
        fp.seek(0, os.SEEK_END)
        size = fp.tell()
        self._volatile_fps[src] = (size, fp)
        return size

    def __call__(self, src):
        """preserve the volatile file at src"""
        assert 0 < self._counter
        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
            self._flush_some_on_disk()
        self._keep_one(src)

    def try_keep(self, src) -> Optional[int]:
        """record a volatile file and return its size

        return None if the file does not exist.

        Used for cache files that are not lock protected.
        """
        assert 0 < self._counter
        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
            self._flush_some_on_disk()
        try:
            return self._keep_one(src)
        except IOError as err:
            if err.errno not in (errno.ENOENT, errno.EPERM):
                raise
            # the file disappeared (or access was denied): report it absent
            return None

    @contextlib.contextmanager
    def open(self, src):
        assert 0 < self._counter
        entry = self._volatile_fps.get(src)
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    # translate the vfs once
    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
    _test_sync_point_walk_1_2(repo)

    max_linkrev = len(repo)
    file_count = totalfilesize = 0
    with VolatileManager() as volatiles:
        # make sure we preserve volatile files
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    # translate the vfs once
    # we only turn this into a list for the `_test_sync` call; this is not ideal
    base_entries = list(entries)
    _test_sync_point_walk_1_2(repo)
    entries = []
    with VolatileManager() as volatiles:
        # make sure we preserve volatile files
        for vfs_key, e in base_entries:
            vfs = vfsmap[vfs_key]
            any_files = True
            if e.maybe_volatile:
                any_files = False
                e.preserve_volatiles(vfs, volatiles)
                for f in e.files():
                    if f.is_volatile:
                        # record the expected size under lock
                        f.file_size(vfs)
                        any_files = True
            if any_files:
                entries.append((vfs_key, vfsmap[vfs_key], e))

        total_entry_count = len(entries)

        max_linkrev = len(repo)
        progress = repo.ui.makeprogress(
            yield name
            yield from stream
            progress.increment()


def _test_sync_point_walk_1_2(repo):
    """a function for synchronisation during tests

    Triggered after gathering entries, but before starting to process/preserve
    them under lock.

    (on v1 it is triggered before the actual walk starts)
    """


def _test_sync_point_walk_3(repo):
    """a function for synchronisation during tests

    Triggered right before releasing the lock, but after computing what needed
    to be computed under lock.
    """


def _test_sync_point_walk_4(repo):
    """a function for synchronisation during tests

    Triggered right after releasing the lock.
    """
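

# Illustrative sketch, not part of the original module: tests drive the no-op
# synchronisation hooks above by replacing them (e.g. from a test extension)
# with a function that reports progress and then blocks until an external
# signal shows up. The flag-file names below are hypothetical.
def _example_blocking_sync_point(repo):
    import time

    # tell the test driver that the clone reached this synchronisation point
    repo.vfs.write(b'sync-point-reached', b'done\n')
    # busy-wait until the driver allows the clone to proceed
    while not repo.vfs.exists(b'sync-point-go'):
        time.sleep(0.01)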


# not really a StoreEntry, but close enough
class CacheEntry(store.SimpleStoreEntry):
    """Represent an entry for Cache files

    It has special logic to preserve cache files early and accept optional
    presence.

    (Yes... this is not really a StoreEntry, but close enough. We could have a
    BaseEntry base class, but the store one would be identical)
    """

    def __init__(self, entry_path):
        super().__init__(
            entry_path,
            # we will directly deal with that in `setup_cache_file`
            is_volatile=True,
        )

    def preserve_volatiles(self, vfs, volatiles):
        self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
        if self._file_size is None:
            self._files = []
        else:
            assert self._is_volatile
            self._files = [
                CacheFile(
                    unencoded_path=self._entry_path,
                    file_size=self._file_size,
                    is_volatile=self._is_volatile,
                )
            ]

    def files(self):
        if self._files is None:
            self._files = [
                CacheFile(
                    unencoded_path=self._entry_path,
                    is_volatile=self._is_volatile,
                )
            ]
        return super().files()


class CacheFile(store.StoreFile):
    # inform the "copy/hardlink" version that this file might be missing
    # without consequences.
    optional = True
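

# Illustrative sketch, not part of the original module: how a CacheEntry is
# meant to flow through preservation. Here `vfs` stands for repo.cachevfs and
# `volatiles` for the active VolatileManager of `_emit3`; the cache file name
# is just an example.
def _example_preserve_cache_entry(vfs, volatiles):
    entry = CacheEntry(entry_path=b'branch2-served')
    # snapshot the file content if it still exists; if it already vanished,
    # try_keep() returns None, the entry ends up with no files at all, and
    # the stream silently skips it
    entry.preserve_volatiles(vfs, volatiles)
    return [(f.unencoded_path, f.file_size(vfs)) for f in entry.files()]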


def _entries_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of file information useful to clone a repo

        yield (_srcstore, entry)

    for name in cacheutil.cachetocopy(repo):
        if repo.cachevfs.exists(name):
            # not really a StoreEntry, but close enough
            yield (_srccache, CacheEntry(entry_path=name))


def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )
        chunks = _emit2(repo, entries)
        first = next(chunks)
        file_count, total_file_size = first
        _test_sync_point_walk_3(repo)
    _test_sync_point_walk_4(repo)

    def copy_used():
        hardlink[0] = False
        progress.topic = _(b'copying')

    for k, path, optional in entries:
        src_vfs = src_vfs_map[k]
        dst_vfs = dst_vfs_map[k]
        src_path = src_vfs.join(path)
        dst_path = dst_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        dirname = os.path.dirname(dst_path)
        if not os.path.exists(dirname):
            util.makedirs(dirname)
        dst_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        try:
            util.copyfile(
                src_path,
                dst_path,
                hardlink=hardlink[0],
                no_hardlink_cb=copy_used,
                check_fs_hardlink=False,
            )
        except FileNotFoundError:
            # optional files may vanish between the scan and the copy
            if not optional:
                raise
        progress.increment()
    return hardlink[0]
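

# Illustrative note, not part of the original module: `_copy_files` now takes
# (vfs_key, path, optional) triples. Optional files (the cache entries above)
# may legitimately disappear between the locked scan and the copy, which is
# why FileNotFoundError is swallowed for them. A hypothetical input:
#
#     files = [
#         (b'store', b'data/foo.i', False),  # required: missing -> raise
#         (b'cache', b'branch2-served', True),  # optional: missing -> skip
#     ]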


def local_copy(src_repo, dest_repo):
        # and the other one without the lock. However, in the linking case,
        # this would also require checks that nobody is appending any data
        # to the files while we do the clone, so this is not done yet. We
        # could do this blindly when copying files.
        files = [
            (vfs_key, f.unencoded_path, f.optional)
            for vfs_key, e in entries
            for f in e.files()
        ]
        hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)