comparison mercurial/streamclone.py @ 53031:e705fec4a03f stable

branching: merging with 7.0 changes. Since 6.9.3 was made after 7.0rc0, we need to deal with more branching than usual.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 05 Mar 2025 23:02:19 +0100
parents 89ab2459f62a e75ed9ae5fb9
children
comparison
53030:74439d1cbeba 53031:e705fec4a03f
5 # This software may be used and distributed according to the terms of the 5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version. 6 # GNU General Public License version 2 or any later version.
7 7
8 from __future__ import annotations 8 from __future__ import annotations
9 9
10 import collections
10 import contextlib 11 import contextlib
12 import errno
11 import os 13 import os
12 import struct 14 import struct
15 import threading
16
17 from typing import (
18 Callable,
19 Iterable,
20 Iterator,
21 Optional,
22 Set,
23 Tuple,
24 Type,
25 )
13 26
14 from .i18n import _ 27 from .i18n import _
15 from .interfaces import repository 28 from .interfaces import repository
16 from . import ( 29 from . import (
17 bookmarks, 30 bookmarks,
30 from .revlogutils import ( 43 from .revlogutils import (
31 nodemap, 44 nodemap,
32 ) 45 )
33 46
34 47
35 def new_stream_clone_requirements(default_requirements, streamed_requirements): 48 # Numbers arbitrarily picked, feel free to change them (but not the LOW one)
49 #
50 # update the configuration documentation if you touch this.
51 DEFAULT_NUM_WRITER = {
52 scmutil.RESOURCE_LOW: 1,
53 scmutil.RESOURCE_MEDIUM: 4,
54 scmutil.RESOURCE_HIGH: 8,
55 }
56
57
58 # Numbers arbitrarily picked, feel free to adjust them. Do update the
59 # documentation if you do so
60 DEFAULT_MEMORY_TARGET = {
61 scmutil.RESOURCE_LOW: 50 * (2**20), # 50 MB
62 scmutil.RESOURCE_MEDIUM: 500 * 2**20, # 500 MB
63 scmutil.RESOURCE_HIGH: 2 * 2**30, # 2 GB
64 }
65
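These tables provide the fallback values for the `worker.parallel-stream-bundle-processing.*` options used later in `consumev2()`; a minimal sketch of how the effective writer count is resolved, mirroring that logic:

# sketch: resolve the number of writer threads from config, falling back
# to the profile-based default when the option is unset or <= 0
def resolve_num_writer(ui):
    cpu_profile = scmutil.get_resource_profile(ui, b'cpu')
    num_writer = ui.configint(
        b"worker",
        b"parallel-stream-bundle-processing.num-writer",
    )
    if num_writer <= 0:
        num_writer = DEFAULT_NUM_WRITER[cpu_profile]
    return num_writer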
66
67 def new_stream_clone_requirements(
68 default_requirements: Iterable[bytes],
69 streamed_requirements: Iterable[bytes],
70 ) -> Set[bytes]:
36 """determine the final set of requirement for a new stream clone 71 """determine the final set of requirement for a new stream clone
37 72
38 this method combine the "default" requirements that a new repository would 73 this method combine the "default" requirements that a new repository would
39 use with the constaint we get from the stream clone content. We keep local 74 use with the constaint we get from the stream clone content. We keep local
40 configuration choice when possible. 75 configuration choice when possible.
43 requirements & requirementsmod.STREAM_IGNORABLE_REQUIREMENTS 78 requirements & requirementsmod.STREAM_IGNORABLE_REQUIREMENTS
44 requirements.update(streamed_requirements) 79 requirements.update(streamed_requirements)
45 return requirements 80 return requirements
46 81
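As a quick illustration of the combination described above (the requirement names below are purely illustrative):

# hypothetical usage sketch of new_stream_clone_requirements()
local_defaults = {b'store', b'fncache', b'dotencode', b'revlogv1'}
stream_reqs = {b'store', b'fncache', b'revlogv1'}
final = new_stream_clone_requirements(local_defaults, stream_reqs)
# `final` preserves the local choices that do not affect the streamed data
# and includes every requirement advertised by the stream content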
47 82
48 def streamed_requirements(repo): 83 def streamed_requirements(repo) -> Set[bytes]:
49 """the set of requirement the new clone will have to support 84 """the set of requirement the new clone will have to support
50 85
51 This is used for advertising the stream options and to generate the actual 86 This is used for advertising the stream options and to generate the actual
52 stream content.""" 87 stream content."""
53 requiredformats = ( 88 requiredformats = (
54 repo.requirements - requirementsmod.STREAM_IGNORABLE_REQUIREMENTS 89 repo.requirements - requirementsmod.STREAM_IGNORABLE_REQUIREMENTS
55 ) 90 )
56 return requiredformats 91 return requiredformats
57 92
58 93
59 def canperformstreamclone(pullop, bundle2=False): 94 def canperformstreamclone(pullop, bundle2: bool = False):
60 """Whether it is possible to perform a streaming clone as part of pull. 95 """Whether it is possible to perform a streaming clone as part of pull.
61 96
62 ``bundle2`` will cause the function to consider stream clone through 97 ``bundle2`` will cause the function to consider stream clone through
63 bundle2 and only through bundle2. 98 bundle2 and only through bundle2.
64 99
148 requirements = streamreqs 183 requirements = streamreqs
149 184
150 return True, requirements 185 return True, requirements
151 186
152 187
153 def maybeperformlegacystreamclone(pullop): 188 def maybeperformlegacystreamclone(pullop) -> None:
154 """Possibly perform a legacy stream clone operation. 189 """Possibly perform a legacy stream clone operation.
155 190
156 Legacy stream clones are performed as part of pull but before all other 191 Legacy stream clones are performed as part of pull but before all other
157 operations. 192 operations.
158 193
223 repo._branchcaches.replace(repo, rbranchmap) 258 repo._branchcaches.replace(repo, rbranchmap)
224 259
225 repo.invalidate() 260 repo.invalidate()
226 261
227 262
228 def allowservergeneration(repo): 263 def allowservergeneration(repo) -> bool:
229 """Whether streaming clones are allowed from the server.""" 264 """Whether streaming clones are allowed from the server."""
230 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features: 265 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
231 return False 266 return False
232 267
233 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True): 268 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
241 276
242 return True 277 return True
243 278
244 279
245 # This is its own function so extensions can override it. 280 # This is its own function so extensions can override it.
246 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False): 281 def _walkstreamfiles(
282 repo, matcher=None, phase: bool = False, obsolescence: bool = False
283 ):
247 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence) 284 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
248 285
249 286
250 def generatev1(repo): 287 def _report_transferred(
288 repo, start_time: float, file_count: int, byte_count: int
289 ):
290 """common utility to report time it took to apply the stream bundle"""
291 elapsed = util.timer() - start_time
292 if elapsed <= 0:
293 elapsed = 0.001
294 m = _(b'stream-cloned %d files / %s in %.1f seconds (%s/sec)\n')
295 m %= (
296 file_count,
297 util.bytecount(byte_count),
298 elapsed,
299 util.bytecount(byte_count / elapsed),
300 )
301 repo.ui.status(m)
302
303
304 def generatev1(repo) -> tuple[int, int, Iterator[bytes]]:
251 """Emit content for version 1 of a streaming clone. 305 """Emit content for version 1 of a streaming clone.
252 306
253 This returns a 3-tuple of (file count, byte size, data iterator). 307 This returns a 3-tuple of (file count, byte size, data iterator).
254 308
255 The data iterator consists of N entries for each file being transferred. 309 The data iterator consists of N entries for each file being transferred.
269 entries = [] 323 entries = []
270 total_bytes = 0 324 total_bytes = 0
271 # Get consistent snapshot of repo, lock during scan. 325 # Get consistent snapshot of repo, lock during scan.
272 with repo.lock(): 326 with repo.lock():
273 repo.ui.debug(b'scanning\n') 327 repo.ui.debug(b'scanning\n')
328 _test_sync_point_walk_1_2(repo)
274 for entry in _walkstreamfiles(repo): 329 for entry in _walkstreamfiles(repo):
275 for f in entry.files(): 330 for f in entry.files():
276 file_size = f.file_size(repo.store.vfs) 331 file_size = f.file_size(repo.store.vfs)
277 if file_size: 332 if file_size:
278 entries.append((f.unencoded_path, file_size)) 333 entries.append((f.unencoded_path, file_size))
279 total_bytes += file_size 334 total_bytes += file_size
280 _test_sync_point_walk_1(repo) 335 _test_sync_point_walk_3(repo)
281 _test_sync_point_walk_2(repo) 336 _test_sync_point_walk_4(repo)
282 337
283 repo.ui.debug( 338 repo.ui.debug(
284 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes) 339 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
285 ) 340 )
286 341
287 svfs = repo.svfs 342 svfs = repo.svfs
288 debugflag = repo.ui.debugflag 343 debugflag = repo.ui.debugflag
289 344
290 def emitrevlogdata(): 345 def emitrevlogdata() -> Iterator[bytes]:
291 for name, size in entries: 346 for name, size in entries:
292 if debugflag: 347 if debugflag:
293 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size)) 348 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
294 # partially encode name over the wire for backwards compat 349 # partially encode name over the wire for backwards compat
295 yield b'%s\0%d\n' % (store.encodedir(name), size) 350 yield b'%s\0%d\n' % (store.encodedir(name), size)
297 # trusted by the local repo) and expensive 352 # trusted by the local repo) and expensive
298 with svfs(name, b'rb', auditpath=False) as fp: 353 with svfs(name, b'rb', auditpath=False) as fp:
299 if size <= 65536: 354 if size <= 65536:
300 yield fp.read(size) 355 yield fp.read(size)
301 else: 356 else:
302 for chunk in util.filechunkiter(fp, limit=size): 357 yield from util.filechunkiter(fp, limit=size)
303 yield chunk
304 358
305 return len(entries), total_bytes, emitrevlogdata() 359 return len(entries), total_bytes, emitrevlogdata()
306 360
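The per-file framing documented above (a header line of the form name\0size\n followed by size raw bytes) is what `consumev1()` later undoes; a hedged consumer sketch of the same payload, assuming `fp` supports readline()/read() and `filecount` is known from the caller:

# illustrative decoder for the generatev1() payload (not the real consumer)
def iter_v1_entries(fp, filecount):
    for _ in range(filecount):
        header = fp.readline()            # b'<encoded name>\0<size>\n'
        name, size = header[:-1].split(b'\0', 1)
        size = int(size)
        data = fp.read(size)              # raw store bytes for that file
        yield store.decodedir(name), data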
307 361
308 def generatev1wireproto(repo): 362 def generatev1wireproto(repo) -> Iterator[bytes]:
309 """Emit content for version 1 of streaming clone suitable for the wire. 363 """Emit content for version 1 of streaming clone suitable for the wire.
310 364
311 This is the data output from ``generatev1()`` with 2 header lines. The 365 This is the data output from ``generatev1()`` with 2 header lines. The
312 first line indicates overall success. The 2nd contains the file count and 366 first line indicates overall success. The 2nd contains the file count and
313 byte size of payload. 367 byte size of payload.
327 return 381 return
328 382
329 # Indicates successful response. 383 # Indicates successful response.
330 yield b'0\n' 384 yield b'0\n'
331 yield b'%d %d\n' % (filecount, bytecount) 385 yield b'%d %d\n' % (filecount, bytecount)
332 for chunk in it: 386 yield from it
333 yield chunk 387
334 388
335 389 def generatebundlev1(
336 def generatebundlev1(repo, compression=b'UN'): 390 repo, compression: bytes = b'UN'
391 ) -> tuple[Set[bytes], Iterator[bytes]]:
337 """Emit content for version 1 of a stream clone bundle. 392 """Emit content for version 1 of a stream clone bundle.
338 393
339 The first 4 bytes of the output ("HGS1") denote this as stream clone 394 The first 4 bytes of the output ("HGS1") denote this as stream clone
340 bundle version 1. 395 bundle version 1.
341 396
354 compressed in the future). 409 compressed in the future).
355 410
356 Returns a tuple of (requirements, data generator). 411 Returns a tuple of (requirements, data generator).
357 """ 412 """
358 if compression != b'UN': 413 if compression != b'UN':
359 raise ValueError(b'we do not support the compression argument yet') 414 raise ValueError('we do not support the compression argument yet')
360 415
361 requirements = streamed_requirements(repo) 416 requirements = streamed_requirements(repo)
362 requires = b','.join(sorted(requirements)) 417 requires = b','.join(sorted(requirements))
363 418
364 def gen(): 419 def gen() -> Iterator[bytes]:
365 yield b'HGS1' 420 yield b'HGS1'
366 yield compression 421 yield compression
367 422
368 filecount, bytecount, it = generatev1(repo) 423 filecount, bytecount, it = generatev1(repo)
369 repo.ui.status( 424 repo.ui.status(
389 progress.complete() 444 progress.complete()
390 445
391 return requirements, gen() 446 return requirements, gen()
392 447
393 448
394 def consumev1(repo, fp, filecount, bytecount): 449 def consumev1(repo, fp, filecount: int, bytecount: int) -> None:
395 """Apply the contents from version 1 of a streaming clone file handle. 450 """Apply the contents from version 1 of a streaming clone file handle.
396 451
397 This takes the output from "stream_out" and applies it to the specified 452 This takes the output from "stream_out" and applies it to the specified
398 repository. 453 repository.
399 454
423 # 478 #
424 # But transaction nesting can't be simply prohibited, because 479 # But transaction nesting can't be simply prohibited, because
425 # nesting occurs also in ordinary case (e.g. enabling 480 # nesting occurs also in ordinary case (e.g. enabling
426 # clonebundles). 481 # clonebundles).
427 482
483 total_file_count = 0
428 with repo.transaction(b'clone'): 484 with repo.transaction(b'clone'):
429 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount): 485 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
430 for i in range(filecount): 486 for i in range(filecount):
431 # XXX doesn't support '\n' or '\r' in filenames 487 # XXX doesn't support '\n' or '\r' in filenames
432 if hasattr(fp, 'readline'): 488 if hasattr(fp, 'readline'):
451 b'adding %s (%s)\n' % (name, util.bytecount(size)) 507 b'adding %s (%s)\n' % (name, util.bytecount(size))
452 ) 508 )
453 # for backwards compat, name was partially encoded 509 # for backwards compat, name was partially encoded
454 path = store.decodedir(name) 510 path = store.decodedir(name)
455 with repo.svfs(path, b'w', backgroundclose=True) as ofp: 511 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
512 total_file_count += 1
456 for chunk in util.filechunkiter(fp, limit=size): 513 for chunk in util.filechunkiter(fp, limit=size):
457 progress.increment(step=len(chunk)) 514 progress.increment(step=len(chunk))
458 ofp.write(chunk) 515 ofp.write(chunk)
459 516
460 # force @filecache properties to be reloaded from 517 # force @filecache properties to be reloaded from
461 # streamclone-ed file at next access 518 # streamclone-ed file at next access
462 repo.invalidate(clearfilecache=True) 519 repo.invalidate(clearfilecache=True)
463 520
464 elapsed = util.timer() - start
465 if elapsed <= 0:
466 elapsed = 0.001
467 progress.complete() 521 progress.complete()
468 repo.ui.status( 522 _report_transferred(repo, start, total_file_count, bytecount)
469 _(b'transferred %s in %.1f seconds (%s/sec)\n') 523
470 % ( 524
471 util.bytecount(bytecount), 525 def readbundle1header(fp) -> tuple[int, int, Set[bytes]]:
472 elapsed,
473 util.bytecount(bytecount / elapsed),
474 )
475 )
476
477
478 def readbundle1header(fp):
479 compression = fp.read(2) 526 compression = fp.read(2)
480 if compression != b'UN': 527 if compression != b'UN':
481 raise error.Abort( 528 raise error.Abort(
482 _( 529 _(
483 b'only uncompressed stream clone bundles are ' 530 b'only uncompressed stream clone bundles are '
501 requirements = set(requires.rstrip(b'\0').split(b',')) 548 requirements = set(requires.rstrip(b'\0').split(b','))
502 549
503 return filecount, bytecount, requirements 550 return filecount, bytecount, requirements
504 551
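Putting the pieces together, a bundle written by `generatebundlev1()` can be re-read with the helpers above; a hedged sketch, assuming the caller opens the file itself and checks the 4-byte b'HGS1' magic as `applybundlev1()` expects:

# illustrative sketch, not the real entry point; `bundle_path` is hypothetical
with open(bundle_path, 'rb') as fp:
    assert fp.read(4) == b'HGS1'
    filecount, bytecount, requirements = readbundle1header(fp)
    # requirements should be validated against repo.supported before applying
    consumev1(repo, fp, filecount, bytecount)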
505 552
506 def applybundlev1(repo, fp): 553 def applybundlev1(repo, fp) -> None:
507 """Apply the content from a stream clone bundle version 1. 554 """Apply the content from a stream clone bundle version 1.
508 555
509 We assume the 4 byte header has been read and validated and the file handle 556 We assume the 4 byte header has been read and validated and the file handle
510 is at the 2 byte compression identifier. 557 is at the 2 byte compression identifier.
511 """ 558 """
531 578
532 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle 579 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
533 readers to perform bundle type-specific functionality. 580 readers to perform bundle type-specific functionality.
534 """ 581 """
535 582
536 def __init__(self, fh): 583 def __init__(self, fh) -> None:
537 self._fh = fh 584 self._fh = fh
538 585
539 def apply(self, repo): 586 def apply(self, repo) -> None:
540 return applybundlev1(repo, self._fh) 587 return applybundlev1(repo, self._fh)
541 588
542 589
543 # type of file to stream 590 # type of file to stream
544 _fileappend = 0 # append only file 591 _fileappend = 0 # append only file
548 _srcstore = b's' # store (svfs) 595 _srcstore = b's' # store (svfs)
549 _srccache = b'c' # cache (cache) 596 _srccache = b'c' # cache (cache)
550 597
551 598
552 # This is its own function so extensions can override it. 599 # This is its own function so extensions can override it.
553 def _walkstreamfullstorefiles(repo): 600 def _walkstreamfullstorefiles(repo) -> list[bytes]:
554 """list snapshot file from the store""" 601 """list snapshot file from the store"""
555 fnames = [] 602 fnames = []
556 if not repo.publishing(): 603 if not repo.publishing():
557 fnames.append(b'phaseroots') 604 fnames.append(b'phaseroots')
558 return fnames 605 return fnames
585 # usage. The Windows value of 2 is actually 1 file open at a time, due to 632 # usage. The Windows value of 2 is actually 1 file open at a time, due to
586 # the `flush_count = self.MAX_OPEN // 2` and `self.MAX_OPEN - 1` threshold 633 # the `flush_count = self.MAX_OPEN // 2` and `self.MAX_OPEN - 1` threshold
587 # for flushing to disk in __call__(). 634 # for flushing to disk in __call__().
588 MAX_OPEN = 2 if pycompat.iswindows else 100 635 MAX_OPEN = 2 if pycompat.iswindows else 100
589 636
590 def __init__(self): 637 def __init__(self) -> None:
591 self._counter = 0 638 self._counter = 0
592 self._volatile_fps = None 639 self._volatile_fps = None
593 self._copies = None 640 self._copies = None
594 self._dst_dir = None 641 self._dst_dir = None
595 642
615 self._dst_dir = None 662 self._dst_dir = None
616 assert self._volatile_fps is None 663 assert self._volatile_fps is None
617 assert self._copies is None 664 assert self._copies is None
618 assert self._dst_dir is None 665 assert self._dst_dir is None
619 666
620 def _init_tmp_copies(self): 667 def _init_tmp_copies(self) -> None:
621 """prepare a temporary directory to save volatile files 668 """prepare a temporary directory to save volatile files
622 669
623 This will be used as backup if we have too many files open""" 670 This will be used as backup if we have too many files open"""
624 assert 0 < self._counter 671 assert 0 < self._counter
625 assert self._copies is None 672 assert self._copies is None
626 assert self._dst_dir is None 673 assert self._dst_dir is None
627 self._copies = {} 674 self._copies = {}
628 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-') 675 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
629 676
630 def _flush_some_on_disk(self): 677 def _flush_some_on_disk(self) -> None:
631 """move some of the open files to tempory files on disk""" 678 """move some of the open files to tempory files on disk"""
632 if self._copies is None: 679 if self._copies is None:
633 self._init_tmp_copies() 680 self._init_tmp_copies()
634 flush_count = self.MAX_OPEN // 2 681 flush_count = self.MAX_OPEN // 2
635 for src, (size, fp) in sorted(self._volatile_fps.items())[:flush_count]: 682 for src, (size, fp) in sorted(self._volatile_fps.items())[:flush_count]:
644 fp.seek(0) 691 fp.seek(0)
645 bck.write(fp.read()) 692 bck.write(fp.read())
646 del self._volatile_fps[src] 693 del self._volatile_fps[src]
647 fp.close() 694 fp.close()
648 695
649 def _keep_one(self, src): 696 def _keep_one(self, src: bytes) -> int:
650 """preserve an open file handle for a given path""" 697 """preserve an open file handle for a given path"""
651 # store the file quickly to ensure we close it if any error happens 698 # store the file quickly to ensure we close it if any error happens
652 _, fp = self._volatile_fps[src] = (None, open(src, 'rb')) 699 _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
653 fp.seek(0, os.SEEK_END) 700 fp.seek(0, os.SEEK_END)
654 size = fp.tell() 701 size = fp.tell()
655 self._volatile_fps[src] = (size, fp) 702 self._volatile_fps[src] = (size, fp)
656 703 return size
657 def __call__(self, src): 704
705 def __call__(self, src: bytes) -> None:
658 """preserve the volatile file at src""" 706 """preserve the volatile file at src"""
659 assert 0 < self._counter 707 assert 0 < self._counter
660 if len(self._volatile_fps) >= (self.MAX_OPEN - 1): 708 if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
661 self._flush_some_on_disk() 709 self._flush_some_on_disk()
662 self._keep_one(src) 710 self._keep_one(src)
711
712 def try_keep(self, src: bytes) -> Optional[int]:
713 """record a volatile file and returns it size
714
715 return None if the file does not exists.
716
717 Used for cache file that are not lock protected.
718 """
719 assert 0 < self._counter
720 if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
721 self._flush_some_on_disk()
722 try:
723 return self._keep_one(src)
724 except OSError as err:
725 if err.errno not in (errno.ENOENT, errno.EPERM):
726 raise
727 return None
663 728
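The difference between `__call__` and `try_keep` above is that the latter tolerates a missing file, which is what the cache handling further down relies on; a small illustrative sketch (the cache file name is hypothetical):

with VolatileManager() as volatiles:
    # lock-protected store file: must exist, so a failure should propagate
    volatiles(repo.svfs.join(b'phaseroots'))
    # unprotected cache file: may be missing, in which case we get None
    size = volatiles.try_keep(repo.cachevfs.join(b'some-cache-file'))
    if size is None:
        pass  # nothing to stream for this cache entry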
664 @contextlib.contextmanager 729 @contextlib.contextmanager
665 def open(self, src): 730 def open(self, src):
666 assert 0 < self._counter 731 assert 0 < self._counter
667 entry = self._volatile_fps.get(src) 732 entry = self._volatile_fps.get(src)
699 # 764 #
700 # this assert is duplicated (from _makemap) as author might think this is 765 # this assert is duplicated (from _makemap) as author might think this is
701 # fine, while this is really not fine. 766 # fine, while this is really not fine.
702 if repo.vfs in vfsmap.values(): 767 if repo.vfs in vfsmap.values():
703 raise error.ProgrammingError( 768 raise error.ProgrammingError(
704 b'repo.vfs must not be added to vfsmap for security reasons' 769 'repo.vfs must not be added to vfsmap for security reasons'
705 ) 770 )
706 771
707 # translate the vfs one 772 # translate the vfs one
708 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] 773 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
774 _test_sync_point_walk_1_2(repo)
709 775
710 max_linkrev = len(repo) 776 max_linkrev = len(repo)
711 file_count = totalfilesize = 0 777 file_count = totalfilesize = 0
712 with util.nogc(): 778 with VolatileManager() as volatiles:
713 # record the expected size of every file
714 for k, vfs, e in entries:
715 for f in e.files():
716 file_count += 1
717 totalfilesize += f.file_size(vfs)
718
719 progress = repo.ui.makeprogress(
720 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
721 )
722 progress.update(0)
723 with VolatileManager() as volatiles, progress:
724 # make sure we preserve volatile files 779 # make sure we preserve volatile files
725 for k, vfs, e in entries: 780 with util.nogc():
726 for f in e.files(): 781 # record the expected size of every file
727 if f.is_volatile: 782 for k, vfs, e in entries:
728 volatiles(vfs.join(f.unencoded_path)) 783 e.preserve_volatiles(vfs, volatiles)
729 # the first yield release the lock on the repository 784 for f in e.files():
730 yield file_count, totalfilesize 785 file_count += 1
731 totalbytecount = 0 786 totalfilesize += f.file_size(vfs)
732 787
733 for src, vfs, e in entries: 788 progress = repo.ui.makeprogress(
734 entry_streams = e.get_streams( 789 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
735 repo=repo, 790 )
736 vfs=vfs, 791 progress.update(0)
737 volatiles=volatiles, 792 with progress:
738 max_changeset=max_linkrev, 793 # the first yield release the lock on the repository
739 preserve_file_count=True, 794 yield file_count, totalfilesize
740 ) 795 totalbytecount = 0
741 for name, stream, size in entry_streams: 796
742 yield src 797 for src, vfs, e in entries:
743 yield util.uvarintencode(len(name)) 798 entry_streams = e.get_streams(
744 yield util.uvarintencode(size) 799 repo=repo,
745 yield name 800 vfs=vfs,
746 bytecount = 0 801 volatiles=volatiles,
747 for chunk in stream: 802 max_changeset=max_linkrev,
748 bytecount += len(chunk) 803 preserve_file_count=True,
749 totalbytecount += len(chunk) 804 )
750 progress.update(totalbytecount) 805 for name, stream, size in entry_streams:
751 yield chunk 806 yield src
752 if bytecount != size: 807 yield util.uvarintencode(len(name))
753 # Would most likely be caused by a race due to `hg 808 yield util.uvarintencode(size)
754 # strip` or a revlog split 809 yield name
755 msg = _( 810 bytecount = 0
756 b'clone could only read %d bytes from %s, but ' 811 for chunk in stream:
757 b'expected %d bytes' 812 bytecount += len(chunk)
758 ) 813 totalbytecount += len(chunk)
759 raise error.Abort(msg % (bytecount, name, size)) 814 progress.update(totalbytecount)
760 815 yield chunk
761 816 if bytecount != size:
762 def _emit3(repo, entries): 817 # Would most likely be caused by a race due to `hg
818 # strip` or a revlog split
819 msg = _(
820 b'clone could only read %d bytes from %s, but '
821 b'expected %d bytes'
822 )
823 raise error.Abort(msg % (bytecount, name, size))
824
825
826 def _emit3(repo, entries) -> Iterator[bytes | None]:
763 """actually emit the stream bundle (v3)""" 827 """actually emit the stream bundle (v3)"""
764 vfsmap = _makemap(repo) 828 vfsmap = _makemap(repo)
765 # we keep repo.vfs out of the map on purpose, there are too many dangers 829 # we keep repo.vfs out of the map on purpose, there are too many dangers
766 # there (eg: .hg/hgrc), 830 # there (eg: .hg/hgrc),
767 # 831 #
768 # this assert is duplicated (from _makemap) as authors might think this is 832 # this assert is duplicated (from _makemap) as authors might think this is
769 # fine, while this is really not fine. 833 # fine, while this is really not fine.
770 if repo.vfs in vfsmap.values(): 834 if repo.vfs in vfsmap.values():
771 raise error.ProgrammingError( 835 raise error.ProgrammingError(
772 b'repo.vfs must not be added to vfsmap for security reasons' 836 'repo.vfs must not be added to vfsmap for security reasons'
773 ) 837 )
774 838
775 # translate the vfs once 839 # translate the vfs once
776 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] 840 # we only turn this into a list for the `_test_sync`, this is not ideal
777 total_entry_count = len(entries) 841 base_entries = list(entries)
778 842 _test_sync_point_walk_1_2(repo)
779 max_linkrev = len(repo) 843 entries = []
780 progress = repo.ui.makeprogress( 844 with VolatileManager() as volatiles:
781 _(b'bundle'),
782 total=total_entry_count,
783 unit=_(b'entry'),
784 )
785 progress.update(0)
786 with VolatileManager() as volatiles, progress:
787 # make sure we preserve volatile files 845 # make sure we preserve volatile files
788 for k, vfs, e in entries: 846 for vfs_key, e in base_entries:
847 vfs = vfsmap[vfs_key]
848 any_files = True
789 if e.maybe_volatile: 849 if e.maybe_volatile:
850 any_files = False
851 e.preserve_volatiles(vfs, volatiles)
790 for f in e.files(): 852 for f in e.files():
791 if f.is_volatile: 853 if f.is_volatile:
792 # record the expected size under lock 854 # record the expected size under lock
793 f.file_size(vfs) 855 f.file_size(vfs)
794 volatiles(vfs.join(f.unencoded_path)) 856 any_files = True
857 if any_files:
858 entries.append((vfs_key, vfsmap[vfs_key], e))
859
860 total_entry_count = len(entries)
861
862 max_linkrev = len(repo)
863 progress = repo.ui.makeprogress(
864 _(b'bundle'),
865 total=total_entry_count,
866 unit=_(b'entry'),
867 )
868 progress.update(0)
795 # the first yield release the lock on the repository 869 # the first yield release the lock on the repository
796 yield None 870 yield None
797 871 with progress:
798 yield util.uvarintencode(total_entry_count) 872 yield util.uvarintencode(total_entry_count)
799 873
800 for src, vfs, e in entries: 874 for src, vfs, e in entries:
801 entry_streams = e.get_streams( 875 entry_streams = e.get_streams(
802 repo=repo, 876 repo=repo,
803 vfs=vfs, 877 vfs=vfs,
804 volatiles=volatiles, 878 volatiles=volatiles,
805 max_changeset=max_linkrev, 879 max_changeset=max_linkrev,
806 ) 880 )
807 yield util.uvarintencode(len(entry_streams)) 881 yield util.uvarintencode(len(entry_streams))
808 for name, stream, size in entry_streams: 882 for name, stream, size in entry_streams:
809 yield src 883 yield src
810 yield util.uvarintencode(len(name)) 884 yield util.uvarintencode(len(name))
811 yield util.uvarintencode(size) 885 yield util.uvarintencode(size)
812 yield name 886 yield name
813 yield from stream 887 yield from stream
814 progress.increment() 888 progress.increment()
815 889
816 890
817 def _test_sync_point_walk_1(repo): 891 def _test_sync_point_walk_1_2(repo):
818 """a function for synchronisation during tests""" 892 """a function for synchronisation during tests
819 893
820 894 Triggered after gathering entries, but before starting to process/preserve them
821 def _test_sync_point_walk_2(repo): 895 under lock.
822 """a function for synchronisation during tests""" 896
823 897 (on v1 it is triggered before the actual walk starts)
824 898 """
825 def _entries_walk(repo, includes, excludes, includeobsmarkers): 899
900
901 def _test_sync_point_walk_3(repo):
902 """a function for synchronisation during tests
903
904 Triggered right before releasing the lock, but after computing what needs
905 to be computed under lock.
906 """
907
908
909 def _test_sync_point_walk_4(repo):
910 """a function for synchronisation during tests
911
912 Triggered right after releasing the lock.
913 """
914
915
916 # not really a StoreEntry, but close enough
917 class CacheEntry(store.SimpleStoreEntry):
918 """Represent an entry for Cache files
919
920 It has special logic to preserve cache files early and accept optional
921 presence.
922
923
924 (Yes... this is not really a StoreEntry, but close enough. We could have a
925 BaseEntry base class, but the store one would be identical)
926 """
927
928 def __init__(self, entry_path) -> None:
929 super().__init__(
930 entry_path,
931 # we will directly deal with that in `setup_cache_file`
932 is_volatile=True,
933 )
934
935 def preserve_volatiles(self, vfs, volatiles) -> None:
936 self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
937 if self._file_size is None:
938 self._files = []
939 else:
940 assert self._is_volatile
941 self._files = [
942 CacheFile(
943 unencoded_path=self._entry_path,
944 file_size=self._file_size,
945 is_volatile=self._is_volatile,
946 )
947 ]
948
949 def files(self) -> list[store.StoreFile]:
950 if self._files is None:
951 self._files = [
952 CacheFile(
953 unencoded_path=self._entry_path,
954 is_volatile=self._is_volatile,
955 )
956 ]
957 return super().files()
958
959
960 class CacheFile(store.StoreFile):
961 # inform the "copy/hardlink" version that this file might be missing
962 # without consequences.
963 optional: bool = True
964
965
966 def _entries_walk(repo, includes, excludes, includeobsmarkers: bool):
826 """emit a seris of files information useful to clone a repo 967 """emit a seris of files information useful to clone a repo
827 968
828 return (vfs-key, entry) iterator 969 return (vfs-key, entry) iterator
829 970
830 Where `entry` is StoreEntry. (used even for cache entries) 971 Where `entry` is StoreEntry. (used even for cache entries)
849 yield (_srcstore, entry) 990 yield (_srcstore, entry)
850 991
851 for name in cacheutil.cachetocopy(repo): 992 for name in cacheutil.cachetocopy(repo):
852 if repo.cachevfs.exists(name): 993 if repo.cachevfs.exists(name):
853 # not really a StoreEntry, but close enough 994 # not really a StoreEntry, but close enough
854 entry = store.SimpleStoreEntry( 995 yield (_srccache, CacheEntry(entry_path=name))
855 entry_path=name, 996
856 is_volatile=True, 997
857 ) 998 def generatev2(repo, includes, excludes, includeobsmarkers: bool):
858 yield (_srccache, entry)
859
860
861 def generatev2(repo, includes, excludes, includeobsmarkers):
862 """Emit content for version 2 of a streaming clone. 999 """Emit content for version 2 of a streaming clone.
863 1000
864 the data stream consists of the following entries: 1001 the data stream consists of the following entries:
865 1) A char representing the file destination (eg: store or cache) 1002 1) A char representing the file destination (eg: store or cache)
866 2) A varint containing the length of the filename 1003 2) A varint containing the length of the filename
878 repo, 1015 repo,
879 includes=includes, 1016 includes=includes,
880 excludes=excludes, 1017 excludes=excludes,
881 includeobsmarkers=includeobsmarkers, 1018 includeobsmarkers=includeobsmarkers,
882 ) 1019 )
883
884 chunks = _emit2(repo, entries) 1020 chunks = _emit2(repo, entries)
885 first = next(chunks) 1021 first = next(chunks)
886 file_count, total_file_size = first 1022 file_count, total_file_size = first
887 _test_sync_point_walk_1(repo) 1023 _test_sync_point_walk_3(repo)
888 _test_sync_point_walk_2(repo) 1024 _test_sync_point_walk_4(repo)
889 1025
890 return file_count, total_file_size, chunks 1026 return file_count, total_file_size, chunks
891 1027
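The on-the-wire shape of each v2 entry produced by `_emit2()` (destination byte, uvarint name length, uvarint data length, name, then the data itself) can be decoded with the same util helpers; a sketch mirroring what `_v2_parse_files()` does below:

# sketch of decoding a single v2 stream entry; `fp` is the incoming stream
src = util.readexactly(fp, 1)            # b's' (store) or b'c' (cache)
namelen = util.uvarintdecodestream(fp)   # varint: length of the file name
datalen = util.uvarintdecodestream(fp)   # varint: size of the file payload
name = util.readexactly(fp, namelen)
for chunk in util.filechunkiter(fp, limit=datalen):
    pass  # write `chunk` to the vfs selected by `src`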
892 1028
893 def generatev3(repo, includes, excludes, includeobsmarkers): 1029 def generatev3(
1030 repo, includes, excludes, includeobsmarkers: bool
1031 ) -> Iterator[bytes | None]:
894 """Emit content for version 3 of a streaming clone. 1032 """Emit content for version 3 of a streaming clone.
895 1033
896 the data stream consists of the following: 1034 the data stream consists of the following:
897 1) A varint E containing the number of entries (can be 0), then E entries follow 1035 1) A varint E containing the number of entries (can be 0), then E entries follow
898 2) For each entry: 1036 2) For each entry:
928 includeobsmarkers=includeobsmarkers, 1066 includeobsmarkers=includeobsmarkers,
929 ) 1067 )
930 chunks = _emit3(repo, list(entries)) 1068 chunks = _emit3(repo, list(entries))
931 first = next(chunks) 1069 first = next(chunks)
932 assert first is None 1070 assert first is None
933 _test_sync_point_walk_1(repo) 1071 _test_sync_point_walk_3(repo)
934 _test_sync_point_walk_2(repo) 1072 _test_sync_point_walk_4(repo)
935 1073
936 return chunks 1074 return chunks
937 1075
938 1076
939 @contextlib.contextmanager 1077 @contextlib.contextmanager
946 yield 1084 yield
947 else: 1085 else:
948 yield 1086 yield
949 1087
950 1088
951 def consumev2(repo, fp, filecount, filesize): 1089 class V2Report:
1090 """a small class to track the data we saw within the stream"""
1091
1092 def __init__(self):
1093 self.byte_count = 0
1094
1095
1096 def consumev2(repo, fp, filecount: int, filesize: int) -> None:
952 """Apply the contents from a version 2 streaming clone. 1097 """Apply the contents from a version 2 streaming clone.
953 1098
954 Data is read from an object that only needs to provide a ``read(size)`` 1099 Data is read from an object that only needs to provide a ``read(size)``
955 method. 1100 method.
956 """ 1101 """
957 with repo.lock(): 1102 with repo.lock():
958 repo.ui.status( 1103 repo.ui.status(
959 _(b'%d files to transfer, %s of data\n') 1104 _(b'%d files to transfer, %s of data\n')
960 % (filecount, util.bytecount(filesize)) 1105 % (filecount, util.bytecount(filesize))
961 ) 1106 )
962 1107 progress = repo.ui.makeprogress(
1108 _(b'clone'),
1109 total=filesize,
1110 unit=_(b'bytes'),
1111 )
963 start = util.timer() 1112 start = util.timer()
964 progress = repo.ui.makeprogress( 1113 report = V2Report()
965 _(b'clone'), total=filesize, unit=_(b'bytes')
966 )
967 progress.update(0)
968 1114
969 vfsmap = _makemap(repo) 1115 vfsmap = _makemap(repo)
970 # we keep repo.vfs out of the map on purpose, there are too many dangers 1116 # we keep repo.vfs out of the map on purpose, there are too many dangers
971 # there (eg: .hg/hgrc), 1117 # there (eg: .hg/hgrc),
972 # 1118 #
973 # this assert is duplicated (from _makemap) as author might think this 1119 # this assert is duplicated (from _makemap) as author might think this
974 # is fine, while this is really not fine. 1120 # is fine, while this is really not fine.
975 if repo.vfs in vfsmap.values(): 1121 if repo.vfs in vfsmap.values():
976 raise error.ProgrammingError( 1122 raise error.ProgrammingError(
977 b'repo.vfs must not be added to vfsmap for security reasons' 1123 'repo.vfs must not be added to vfsmap for security reasons'
978 ) 1124 )
979 1125
1126 cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu')
1127 mem_profile = scmutil.get_resource_profile(repo.ui, b'memory')
1128 threaded = repo.ui.configbool(
1129 b"worker", b"parallel-stream-bundle-processing"
1130 )
1131 num_writer = repo.ui.configint(
1132 b"worker",
1133 b"parallel-stream-bundle-processing.num-writer",
1134 )
1135 if num_writer <= 0:
1136 num_writer = DEFAULT_NUM_WRITER[cpu_profile]
1137 memory_target = repo.ui.configbytes(
1138 b"worker",
1139 b"parallel-stream-bundle-processing.memory-target",
1140 )
1141 if memory_target < 0:
1142 memory_target = None
1143 elif memory_target == 0:
1144 memory_target = DEFAULT_MEMORY_TARGET[mem_profile]
980 with repo.transaction(b'clone'): 1145 with repo.transaction(b'clone'):
981 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) 1146 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
982 with nested(*ctxs): 1147 with nested(*ctxs):
983 for i in range(filecount): 1148 workers = []
984 src = util.readexactly(fp, 1) 1149 info_queue = None
985 vfs = vfsmap[src] 1150 data_queue = None
986 namelen = util.uvarintdecodestream(fp) 1151 mark_used = None
987 datalen = util.uvarintdecodestream(fp) 1152 try:
988 1153 if not threaded:
989 name = util.readexactly(fp, namelen) 1154 fc = _FileChunker
990 1155 raw_data = fp
991 if repo.ui.debugflag: 1156 else:
992 repo.ui.debug( 1157 fc = _ThreadSafeFileChunker
993 b'adding [%s] %s (%s)\n' 1158 data_queue = _DataQueue(memory_target=memory_target)
994 % (src, name, util.bytecount(datalen)) 1159 if memory_target is not None:
1160 mark_used = data_queue.mark_used
1161 raw_data = util.chunkbuffer(data_queue)
1162
1163 w = threading.Thread(
1164 target=data_queue.fill_from,
1165 args=(fp,),
995 ) 1166 )
996 1167 workers.append(w)
997 with vfs(name, b'w') as ofp: 1168 w.start()
998 for chunk in util.filechunkiter(fp, limit=datalen): 1169 files = _v2_parse_files(
999 progress.increment(step=len(chunk)) 1170 repo,
1000 ofp.write(chunk) 1171 raw_data,
1172 vfsmap,
1173 filecount,
1174 progress,
1175 report,
1176 file_chunker=fc,
1177 mark_used=mark_used,
1178 )
1179 if not threaded:
1180 _write_files(files)
1181 else:
1182 info_queue = _FileInfoQueue(files)
1183
1184 for __ in range(num_writer):
1185 w = threading.Thread(
1186 target=_write_files,
1187 args=(info_queue,),
1188 )
1189 workers.append(w)
1190 w.start()
1191 info_queue.fill()
1192 except: # re-raises
1193 if data_queue is not None:
1194 data_queue.abort()
1195 raise
1196 finally:
1197 # shut down all the workers
1198 if info_queue is not None:
1199 # this is strictly speaking one too many worker for
1200 # this queu, but closing too many is not a problem.
1201 info_queue.close(len(workers))
1202 for w in workers:
1203 w.join()
1001 1204
1002 # force @filecache properties to be reloaded from 1205 # force @filecache properties to be reloaded from
1003 # streamclone-ed file at next access 1206 # streamclone-ed file at next access
1004 repo.invalidate(clearfilecache=True) 1207 repo.invalidate(clearfilecache=True)
1005 1208
1006 elapsed = util.timer() - start 1209 progress.complete()
1007 if elapsed <= 0: 1210 # acknowledge the end of the bundle2 part, this help aligning
1008 elapsed = 0.001 1211 # sequential and parallel behavior.
1009 repo.ui.status( 1212 remains = fp.read(1)
1010 _(b'transferred %s in %.1f seconds (%s/sec)\n') 1213 assert not remains
1011 % ( 1214 _report_transferred(repo, start, filecount, report.byte_count)
1012 util.bytecount(progress.pos), 1215
1013 elapsed, 1216
1014 util.bytecount(progress.pos / elapsed), 1217 # iterator of chunk of bytes that constitute a file content.
1218 FileChunksT = Iterator[bytes]
1219 # Contains the information necessary to write stream file on disk
1220 FileInfoT = Tuple[
1221 bytes, # real fs path
1222 Optional[int], # permission to give to chmod
1223 FileChunksT, # content
1224 ]
1225
1226
1227 class _Queue:
1228 """a reimplementation of queue.Queue which doesn't use thread.Condition"""
1229
1230 def __init__(self):
1231 self._queue = collections.deque()
1232
1233 # the "_lock" protect manipulation of the "_queue" deque
1234 # the "_wait" is used to have the "get" thread waits for the
1235 # "put" thread when the queue is empty.
1236 #
1237 # This is similar to the "threading.Condition", but without the absurd
1238 # slowness of the stdlib implementation.
1239 #
1240 # the "_wait" is always released while holding the "_lock".
1241 self._lock = threading.Lock()
1242 self._wait = threading.Lock()
1243
1244 def put(self, item):
1245 with self._lock:
1246 self._queue.append(item)
1247 # if anyone is waiting on item, unblock it.
1248 if self._wait.locked():
1249 self._wait.release()
1250
1251 def get(self):
1252 with self._lock:
1253 while len(self._queue) == 0:
1254 # "arm" the waiting lock
1255 self._wait.acquire(blocking=False)
1256 # release the lock to let other touch the queue
1257 # (especially the put call we wait on)
1258 self._lock.release()
1259 # wait for a `put` call to release the lock
1260 self._wait.acquire()
1261 # grab the lock to look at a possible available value
1262 self._lock.acquire()
1263 # disarm the lock if necessary.
1264 #
1265 # If the queue only contains one item, keep the _wait lock
1266 # armed, as there is no need to wake another waiter anyway.
1267 if self._wait.locked() and len(self._queue) > 1:
1268 self._wait.release()
1269 return self._queue.popleft()
1270
1271
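A tiny usage sketch of the `_Queue` above, using None as the end-of-stream sentinel exactly like the surrounding code does:

q = _Queue()

def producer(items):
    for item in items:
        q.put(item)
    q.put(None)  # wake the consumer and signal that we are done

t = threading.Thread(target=producer, args=([b'a', b'b', b'c'],))
t.start()
while (item := q.get()) is not None:
    pass  # process `item`
t.join()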
1272 class _DataQueue:
1273 """A queue passing data from the bundle stream to other thread
1274
1275 It has a "memory_target" optional parameter to avoid buffering too much
1276 information. The implementation is not exact and the memory target might be
1277 exceed for a time in some situation.
1278 """
1279
1280 def __init__(self, memory_target=None):
1281 self._q = _Queue()
1282 self._abort = False
1283 self._memory_target = memory_target
1284 if self._memory_target is not None and self._memory_target <= 0:
1285 raise error.ProgrammingError("memory target should be > 0")
1286
1287 # the "_lock" protect manipulation of the _current_used" variable
1288 # the "_wait" is used to have the "reading" thread waits for the
1289 # "using" thread when the buffer is full.
1290 #
1291 # This is similar to the "threading.Condition", but without the absurd
1292 # slowness of the stdlib implementation.
1293 #
1294 # the "_wait" is always released while holding the "_lock".
1295 self._lock = threading.Lock()
1296 self._wait = threading.Lock()
1297 # only the stream reader touches this; it is fine to touch without the lock
1298 self._current_read = 0
1299 # do not touch this without the lock
1300 self._current_used = 0
1301
1302 def _has_free_space(self):
1303 """True if more data can be read without further exceeding memory target
1304
1305 Must be called under the lock.
1306 """
1307 if self._memory_target is None:
1308 # Ideally we should not even get into the locking business in that
1309 # case, but we keep the implementation simple for now.
1310 return True
1311 return (self._current_read - self._current_used) < self._memory_target
1312
1313 def mark_used(self, offset):
1314 """Notify we have used the buffer up to "offset"
1315
1316 This is meant to be used from a thread other than the one filling the queue.
1317 """
1318 if self._memory_target is not None:
1319 with self._lock:
1320 if offset > self._current_used:
1321 self._current_used = offset
1322 # If the reader is waiting for room, unblock it.
1323 if self._wait.locked() and self._has_free_space():
1324 self._wait.release()
1325
1326 def fill_from(self, data):
1327 """fill the data queue from a bundle2 part object
1328
1329 This is meant to be called by the data reading thread
1330 """
1331 q = self._q
1332 try:
1333 for item in data:
1334 self._current_read += len(item)
1335 q.put(item)
1336 if self._abort:
1337 break
1338 if self._memory_target is not None:
1339 with self._lock:
1340 while not self._has_free_space():
1341 # make sure the _wait lock is locked
1342 # this is done under lock, so there can be no race with the release logic
1343 self._wait.acquire(blocking=False)
1344 self._lock.release()
1345 # acquiring the lock will block until some other thread releases it.
1346 self._wait.acquire()
1347 # let's dive into the locked section again
1348 self._lock.acquire()
1349 # make sure we release the lock we just grabbed if
1350 # needed.
1351 if self._wait.locked():
1352 self._wait.release()
1353 finally:
1354 q.put(None)
1355
1356 def __iter__(self):
1357 """Iterate over the bundle chunkgs
1358
1359 This is meant to be called by the data parsing thread."""
1360 q = self._q
1361 while (i := q.get()) is not None:
1362 yield i
1363 if self._abort:
1364 break
1365
1366 def abort(self):
1367 """stop the data-reading thread and interrupt the comsuming iteration
1368
1369 This is meant to be called on errors.
1370 """
1371 self._abort = True
1372 self._q.put(None)
1373 if self._memory_target is not None:
1374 with self._lock:
1375 # make sure we unstick the reader thread.
1376 if self._wait.locked():
1377 self._wait.release()
1378
1379
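How these pieces are wired together in `consumev2()` above, sketched with a hypothetical 512 MB target: a reader thread fills the `_DataQueue` from the bundle part while the parsing thread consumes it through `util.chunkbuffer`, and `mark_used()` reports progress so the reader never buffers much more than the target:

# illustrative wiring of _DataQueue (values and names are illustrative)
data_queue = _DataQueue(memory_target=512 * 2**20)
reader = threading.Thread(target=data_queue.fill_from, args=(fp,))
reader.start()
raw_data = util.chunkbuffer(data_queue)  # file-like view over the queue
# ... the parser reads from `raw_data` and calls data_queue.mark_used(offset)
# as parsed data gets written out ...
reader.join()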
1380 class _FileInfoQueue:
1381 """A thread-safe queue to passer parsed file information to the writers"""
1382
1383 def __init__(self, info: Iterable[FileInfoT]):
1384 self._info = info
1385 self._q = _Queue()
1386
1387 def fill(self):
1388 """iterate over the parsed information to file the queue
1389
1390 This is meant to be call from the thread parsing the stream information.
1391 """
1392 q = self._q
1393 for i in self._info:
1394 q.put(i)
1395
1396 def close(self, number_worker):
1397 """signal all the workers that we no longer have any file info coming
1398
1399 Called from the thread parsing the stream information (and/or the main
1400 thread if different).
1401 """
1402 for __ in range(number_worker):
1403 self._q.put(None)
1404
1405 def __iter__(self):
1406 """iterate over the available file info
1407
1408 This is meant to be called from the writer threads.
1409 """
1410 q = self._q
1411 while (i := q.get()) is not None:
1412 yield i
1413
1414
1415 class _FileChunker:
1416 """yield the chunk that constitute a file
1417
1418 This class exists as the counterpart of the threaded version and
1419 would not be very useful on its own.
1420 """
1421
1422 def __init__(
1423 self,
1424 fp: bundle2mod.unbundlepart,
1425 data_len: int,
1426 progress: scmutil.progress,
1427 report: V2Report,
1428 mark_used: Optional[Callable[[int], None]] = None,
1429 ):
1430 self.report = report
1431 self.progress = progress
1432 self._chunks = util.filechunkiter(fp, limit=data_len)
1433
1434 def fill(self) -> None:
1435 """Do nothing in non-threading context"""
1436
1437 def __iter__(self) -> FileChunksT:
1438 for chunk in self._chunks:
1439 self.report.byte_count += len(chunk)
1440 self.progress.increment(step=len(chunk))
1441 yield chunk
1442
1443
1444 class _ThreadSafeFileChunker(_FileChunker):
1445 """yield the chunk that constitute a file
1446
1447 Make sure you call the "fill" function in the main thread to read the
1448 right data at the right time.
1449 """
1450
1451 def __init__(
1452 self,
1453 fp: bundle2mod.unbundlepart,
1454 data_len: int,
1455 progress: scmutil.progress,
1456 report: V2Report,
1457 mark_used: Optional[Callable[[int], None]] = None,
1458 ):
1459 super().__init__(fp, data_len, progress, report)
1460 self._fp = fp
1461 self._queue = _Queue()
1462 self._mark_used = mark_used
1463
1464 def fill(self) -> None:
1465 """fill the file chunker queue with data read from the stream
1466
1467 This is meant to be called from the thread parsing information (and
1468 consuming the stream data).
1469 """
1470 try:
1471 for chunk in super().__iter__():
1472 offset = self._fp.tell()
1473 self._queue.put((chunk, offset))
1474 finally:
1475 self._queue.put(None)
1476
1477 def __iter__(self) -> FileChunksT:
1478 """Iterate over all the file chunk
1479
1480 This is meant to be called from the writer threads.
1481 """
1482 while (info := self._queue.get()) is not None:
1483 chunk, offset = info
1484 if self._mark_used is not None:
1485 self._mark_used(offset)
1486 yield chunk
1487
1488
1489 def _trivial_file(
1490 chunk: bytes,
1491 mark_used: Optional[Callable[[int], None]],
1492 offset: int,
1493 ) -> FileChunksT:
1494 """used for single chunk file,"""
1495 if mark_used is not None:
1496 mark_used(offset)
1497 yield chunk
1498
1499
1500 def _v2_parse_files(
1501 repo,
1502 fp: bundle2mod.unbundlepart,
1503 vfs_map,
1504 file_count: int,
1505 progress: scmutil.progress,
1506 report: V2Report,
1507 file_chunker: Type[_FileChunker] = _FileChunker,
1508 mark_used: Optional[Callable[[int], None]] = None,
1509 ) -> Iterator[FileInfoT]:
1510 """do the "stream-parsing" part of stream v2
1511
1512 The parsed information is yielded for consumption by the "writer" threads
1513 """
1514 known_dirs = set() # set of directories that we know exist
1515 progress.update(0)
1516 for i in range(file_count):
1517 src = util.readexactly(fp, 1)
1518 namelen = util.uvarintdecodestream(fp)
1519 datalen = util.uvarintdecodestream(fp)
1520
1521 name = util.readexactly(fp, namelen)
1522
1523 if repo.ui.debugflag:
1524 repo.ui.debug(
1525 b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
1015 ) 1526 )
1016 ) 1527 vfs = vfs_map[src]
1017 progress.complete() 1528 path, mode = vfs.prepare_streamed_file(name, known_dirs)
1018 1529 if datalen <= util.DEFAULT_FILE_CHUNK:
1019 1530 c = fp.read(datalen)
1020 def consumev3(repo, fp): 1531 offset = fp.tell()
1532 report.byte_count += len(c)
1533 progress.increment(step=len(c))
1534 chunks = _trivial_file(c, mark_used, offset)
1535 yield (path, mode, iter(chunks))
1536 else:
1537 chunks = file_chunker(
1538 fp,
1539 datalen,
1540 progress,
1541 report,
1542 mark_used=mark_used,
1543 )
1544 yield (path, mode, iter(chunks))
1545 # make sure we read all the chunks before moving to the next file
1546 chunks.fill()
1547
1548
1549 def _write_files(info: Iterable[FileInfoT]):
1550 """write files from parsed data"""
1551 io_flags = os.O_WRONLY | os.O_CREAT
1552 if pycompat.iswindows:
1553 io_flags |= os.O_BINARY
1554 for path, mode, data in info:
1555 if mode is None:
1556 fd = os.open(path, io_flags)
1557 else:
1558 fd = os.open(path, io_flags, mode=mode)
1559 try:
1560 for chunk in data:
1561 written = os.write(fd, chunk)
1562 # write missing pieces if the write was interrupted
1563 while written < len(chunk):
1564 written = os.write(fd, chunk[written:])
1565 finally:
1566 os.close(fd)
1567
1568
1569 def consumev3(repo, fp) -> None:
1021 """Apply the contents from a version 3 streaming clone. 1570 """Apply the contents from a version 3 streaming clone.
1022 1571
1023 Data is read from an object that only needs to provide a ``read(size)`` 1572 Data is read from an object that only needs to provide a ``read(size)``
1024 method. 1573 method.
1025 """ 1574 """
1043 # 1592 #
1044 # this assert is duplicated (from _makemap) as authors might think this 1593 # this assert is duplicated (from _makemap) as authors might think this
1045 # is fine, while this is really not fine. 1594 # is fine, while this is really not fine.
1046 if repo.vfs in vfsmap.values(): 1595 if repo.vfs in vfsmap.values():
1047 raise error.ProgrammingError( 1596 raise error.ProgrammingError(
1048 b'repo.vfs must not be added to vfsmap for security reasons' 1597 'repo.vfs must not be added to vfsmap for security reasons'
1049 ) 1598 )
1050 1599 total_file_count = 0
1051 with repo.transaction(b'clone'): 1600 with repo.transaction(b'clone'):
1052 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) 1601 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
1053 with nested(*ctxs): 1602 with nested(*ctxs):
1054 for i in range(entrycount): 1603 for i in range(entrycount):
1055 filecount = util.uvarintdecodestream(fp) 1604 filecount = util.uvarintdecodestream(fp)
1056 if filecount == 0: 1605 if filecount == 0:
1057 if repo.ui.debugflag: 1606 if repo.ui.debugflag:
1058 repo.ui.debug(b'entry with no files [%d]\n' % (i)) 1607 repo.ui.debug(b'entry with no files [%d]\n' % (i))
1608 total_file_count += filecount
1059 for i in range(filecount): 1609 for i in range(filecount):
1060 src = util.readexactly(fp, 1) 1610 src = util.readexactly(fp, 1)
1061 vfs = vfsmap[src] 1611 vfs = vfsmap[src]
1062 namelen = util.uvarintdecodestream(fp) 1612 namelen = util.uvarintdecodestream(fp)
1063 datalen = util.uvarintdecodestream(fp) 1613 datalen = util.uvarintdecodestream(fp)
1077 1627
1078 # force @filecache properties to be reloaded from 1628 # force @filecache properties to be reloaded from
1079 # streamclone-ed file at next access 1629 # streamclone-ed file at next access
1080 repo.invalidate(clearfilecache=True) 1630 repo.invalidate(clearfilecache=True)
1081 1631
1082 elapsed = util.timer() - start
1083 if elapsed <= 0:
1084 elapsed = 0.001
1085 msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
1086 byte_count = util.bytecount(bytes_transferred)
1087 bytes_sec = util.bytecount(bytes_transferred / elapsed)
1088 msg %= (byte_count, elapsed, bytes_sec)
1089 repo.ui.status(msg)
1090 progress.complete() 1632 progress.complete()
1091 1633 _report_transferred(repo, start, total_file_count, bytes_transferred)
1092 1634
1093 def applybundlev2(repo, fp, filecount, filesize, requirements): 1635
1636 def applybundlev2(
1637 repo, fp, filecount: int, filesize: int, requirements: Iterable[bytes]
1638 ) -> None:
1094 from . import localrepo 1639 from . import localrepo
1095 1640
1096 missingreqs = [r for r in requirements if r not in repo.supported] 1641 missingreqs = [r for r in requirements if r not in repo.supported]
1097 if missingreqs: 1642 if missingreqs:
1098 raise error.Abort( 1643 raise error.Abort(
1099 _(b'unable to apply stream clone: unsupported format: %s') 1644 _(b'unable to apply stream clone: unsupported format: %s')
1100 % b', '.join(sorted(missingreqs)) 1645 % b', '.join(sorted(missingreqs))
1101 ) 1646 )
1102 1647
1103 consumev2(repo, fp, filecount, filesize) 1648 with util.nogc():
1649 consumev2(repo, fp, filecount, filesize)
1104 1650
1105 repo.requirements = new_stream_clone_requirements( 1651 repo.requirements = new_stream_clone_requirements(
1106 repo.requirements, 1652 repo.requirements,
1107 requirements, 1653 requirements,
1108 ) 1654 )
1111 ) 1657 )
1112 scmutil.writereporequirements(repo) 1658 scmutil.writereporequirements(repo)
1113 nodemap.post_stream_cleanup(repo) 1659 nodemap.post_stream_cleanup(repo)
1114 1660
1115 1661
1116 def applybundlev3(repo, fp, requirements): 1662 def applybundlev3(repo, fp, requirements: Iterable[bytes]) -> None:
1117 from . import localrepo 1663 from . import localrepo
1118 1664
1119 missingreqs = [r for r in requirements if r not in repo.supported] 1665 missingreqs = [r for r in requirements if r not in repo.supported]
1120 if missingreqs: 1666 if missingreqs:
1121 msg = _(b'unable to apply stream clone: unsupported format: %s') 1667 msg = _(b'unable to apply stream clone: unsupported format: %s')
1133 ) 1679 )
1134 scmutil.writereporequirements(repo) 1680 scmutil.writereporequirements(repo)
1135 nodemap.post_stream_cleanup(repo) 1681 nodemap.post_stream_cleanup(repo)
1136 1682
1137 1683
1138 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress): 1684 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress) -> bool:
1139 hardlink = [True] 1685 hardlink = [True]
1140 1686
1141 def copy_used(): 1687 def copy_used():
1142 hardlink[0] = False 1688 hardlink[0] = False
1143 progress.topic = _(b'copying') 1689 progress.topic = _(b'copying')
1144 1690
1145 for k, path in entries: 1691 for k, path, optional in entries:
1146 src_vfs = src_vfs_map[k] 1692 src_vfs = src_vfs_map[k]
1147 dst_vfs = dst_vfs_map[k] 1693 dst_vfs = dst_vfs_map[k]
1148 src_path = src_vfs.join(path) 1694 src_path = src_vfs.join(path)
1149 dst_path = dst_vfs.join(path) 1695 dst_path = dst_vfs.join(path)
1150 # We cannot use dirname and makedirs of dst_vfs here because the store 1696 # We cannot use dirname and makedirs of dst_vfs here because the store
1152 dirname = os.path.dirname(dst_path) 1698 dirname = os.path.dirname(dst_path)
1153 if not os.path.exists(dirname): 1699 if not os.path.exists(dirname):
1154 util.makedirs(dirname) 1700 util.makedirs(dirname)
1155 dst_vfs.register_file(path) 1701 dst_vfs.register_file(path)
1156 # XXX we could use the #nb_bytes argument. 1702 # XXX we could use the #nb_bytes argument.
1157 util.copyfile( 1703 try:
1158 src_path, 1704 util.copyfile(
1159 dst_path, 1705 src_path,
1160 hardlink=hardlink[0], 1706 dst_path,
1161 no_hardlink_cb=copy_used, 1707 hardlink=hardlink[0],
1162 check_fs_hardlink=False, 1708 no_hardlink_cb=copy_used,
1163 ) 1709 check_fs_hardlink=False,
1710 )
1711 except FileNotFoundError:
1712 if not optional:
1713 raise
1164 progress.increment() 1714 progress.increment()
1165 return hardlink[0] 1715 return hardlink[0]
1166 1716
1167 1717
1168 def local_copy(src_repo, dest_repo): 1718 def local_copy(src_repo, dest_repo) -> None:
1169 """copy all content from one local repository to another 1719 """copy all content from one local repository to another
1170 1720
1171 This is useful for local clone""" 1721 This is useful for local clone"""
1172 src_store_requirements = { 1722 src_store_requirements = {
1173 r 1723 r
1213 # and the other one without the lock. However, in the linking case, 1763 # and the other one without the lock. However, in the linking case,
1214 # this would also requires checks that nobody is appending any data 1764 # this would also requires checks that nobody is appending any data
1215 # to the files while we do the clone, so this is not done yet. We 1765 # to the files while we do the clone, so this is not done yet. We
1216 # could do this blindly when copying files. 1766 # could do this blindly when copying files.
1217 files = [ 1767 files = [
1218 (vfs_key, f.unencoded_path) 1768 (vfs_key, f.unencoded_path, f.optional)
1219 for vfs_key, e in entries 1769 for vfs_key, e in entries
1220 for f in e.files() 1770 for f in e.files()
1221 ] 1771 ]
1222 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress) 1772 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
1223 1773