comparison mercurial/streamclone.py @ 53031:e705fec4a03f stable
branching: merging with 7.0 changes
Since 6.9.3 was made after 7.0rc0 we need to deal with more branching than usual.
author:   Pierre-Yves David <pierre-yves.david@octobus.net>
date:     Wed, 05 Mar 2025 23:02:19 +0100
parents:  89ab2459f62a e75ed9ae5fb9
children: (none)
comparing 53030:74439d1cbeba (old, left column) with 53031:e705fec4a03f (new, right column)
5 # This software may be used and distributed according to the terms of the | 5 # This software may be used and distributed according to the terms of the |
6 # GNU General Public License version 2 or any later version. | 6 # GNU General Public License version 2 or any later version. |
7 | 7 |
8 from __future__ import annotations | 8 from __future__ import annotations |
9 | 9 |
10 import collections | |
10 import contextlib | 11 import contextlib |
12 import errno | |
11 import os | 13 import os |
12 import struct | 14 import struct |
15 import threading | |
16 | |
17 from typing import ( | |
18 Callable, | |
19 Iterable, | |
20 Iterator, | |
21 Optional, | |
22 Set, | |
23 Tuple, | |
24 Type, | |
25 ) | |
13 | 26 |
14 from .i18n import _ | 27 from .i18n import _ |
15 from .interfaces import repository | 28 from .interfaces import repository |
16 from . import ( | 29 from . import ( |
17 bookmarks, | 30 bookmarks, |
30 from .revlogutils import ( | 43 from .revlogutils import ( |
31 nodemap, | 44 nodemap, |
32 ) | 45 ) |
33 | 46 |
34 | 47 |
35 def new_stream_clone_requirements(default_requirements, streamed_requirements): | 48 # Number arbitrarily picked, feel free to change them (but the LOW one) |
49 # | |
50 # update the configuration documentation if you touch this. | |
51 DEFAULT_NUM_WRITER = { | |
52 scmutil.RESOURCE_LOW: 1, | |
53 scmutil.RESOURCE_MEDIUM: 4, | |
54 scmutil.RESOURCE_HIGH: 8, | |
55 } | |
56 | |
57 | |
58 # Number arbitrarily picked, feel free to adjust them. Do update the | |
59 # documentation if you do so | |
60 DEFAULT_MEMORY_TARGET = { | |
61 scmutil.RESOURCE_LOW: 50 * (2**20), # 50 MB |
62 scmutil.RESOURCE_MEDIUM: 500 * 2**20, # 500 MB | |
63 scmutil.RESOURCE_HIGH: 2 * 2**30, # 2 GB | |
64 } | |
65 | |
66 | |
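The new constants above are keyed by the resource profiles reported by scmutil; the threaded apply path further down (consumev2) only falls back to them when the corresponding worker.* settings are unset. A minimal sketch of that fallback logic, using plain strings in place of the scmutil.RESOURCE_* keys (simplified stand-ins, not the real configuration API):

    # Sketch only: resolving writer count and memory target from a resource
    # profile, mirroring the fallback rules used later in consumev2.
    NUM_WRITER_DEFAULTS = {'low': 1, 'medium': 4, 'high': 8}
    MEMORY_TARGET_DEFAULTS = {
        'low': 50 * 2**20,
        'medium': 500 * 2**20,
        'high': 2 * 2**30,
    }

    def resolve_apply_resources(num_writer, memory_target, cpu_profile, mem_profile):
        # a non-positive writer count means "use the profile default"
        if num_writer <= 0:
            num_writer = NUM_WRITER_DEFAULTS[cpu_profile]
        # a negative memory target disables the limit, zero means "profile default"
        if memory_target < 0:
            memory_target = None
        elif memory_target == 0:
            memory_target = MEMORY_TARGET_DEFAULTS[mem_profile]
        return num_writer, memory_target

    # e.g. resolve_apply_resources(0, 0, 'medium', 'medium') -> (4, 524288000)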
67 def new_stream_clone_requirements( | |
68 default_requirements: Iterable[bytes], | |
69 streamed_requirements: Iterable[bytes], | |
70 ) -> Set[bytes]: | |
36 """determine the final set of requirement for a new stream clone | 71 """determine the final set of requirement for a new stream clone |
37 | 72 |
38 this method combine the "default" requirements that a new repository would | 73 this method combine the "default" requirements that a new repository would |
39 use with the constaint we get from the stream clone content. We keep local | 74 use with the constaint we get from the stream clone content. We keep local |
40 configuration choice when possible. | 75 configuration choice when possible. |
43 requirements & requirementsmod.STREAM_IGNORABLE_REQUIREMENTS | 78 requirements & requirementsmod.STREAM_IGNORABLE_REQUIREMENTS |
44 requirements.update(streamed_requirements) | 79 requirements.update(streamed_requirements) |
45 return requirements | 80 return requirements |
46 | 81 |
47 | 82 |
48 def streamed_requirements(repo): | 83 def streamed_requirements(repo) -> Set[bytes]: |
49 """the set of requirement the new clone will have to support | 84 """the set of requirement the new clone will have to support |
50 | 85 |
51 This is used for advertising the stream options and to generate the actual | 86 This is used for advertising the stream options and to generate the actual |
52 stream content.""" | 87 stream content.""" |
53 requiredformats = ( | 88 requiredformats = ( |
54 repo.requirements - requirementsmod.STREAM_IGNORABLE_REQUIREMENTS | 89 repo.requirements - requirementsmod.STREAM_IGNORABLE_REQUIREMENTS |
55 ) | 90 ) |
56 return requiredformats | 91 return requiredformats |
57 | 92 |
58 | 93 |
59 def canperformstreamclone(pullop, bundle2=False): | 94 def canperformstreamclone(pullop, bundle2: bool = False): |
60 """Whether it is possible to perform a streaming clone as part of pull. | 95 """Whether it is possible to perform a streaming clone as part of pull. |
61 | 96 |
62 ``bundle2`` will cause the function to consider stream clone through | 97 ``bundle2`` will cause the function to consider stream clone through |
63 bundle2 and only through bundle2. | 98 bundle2 and only through bundle2. |
64 | 99 |
148 requirements = streamreqs | 183 requirements = streamreqs |
149 | 184 |
150 return True, requirements | 185 return True, requirements |
151 | 186 |
152 | 187 |
153 def maybeperformlegacystreamclone(pullop): | 188 def maybeperformlegacystreamclone(pullop) -> None: |
154 """Possibly perform a legacy stream clone operation. | 189 """Possibly perform a legacy stream clone operation. |
155 | 190 |
156 Legacy stream clones are performed as part of pull but before all other | 191 Legacy stream clones are performed as part of pull but before all other |
157 operations. | 192 operations. |
158 | 193 |
223 repo._branchcaches.replace(repo, rbranchmap) | 258 repo._branchcaches.replace(repo, rbranchmap) |
224 | 259 |
225 repo.invalidate() | 260 repo.invalidate() |
226 | 261 |
227 | 262 |
228 def allowservergeneration(repo): | 263 def allowservergeneration(repo) -> bool: |
229 """Whether streaming clones are allowed from the server.""" | 264 """Whether streaming clones are allowed from the server.""" |
230 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features: | 265 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features: |
231 return False | 266 return False |
232 | 267 |
233 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True): | 268 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True): |
241 | 276 |
242 return True | 277 return True |
243 | 278 |
244 | 279 |
245 # This is it's own function so extensions can override it. | 280 # This is it's own function so extensions can override it. |
246 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False): | 281 def _walkstreamfiles( |
282 repo, matcher=None, phase: bool = False, obsolescence: bool = False | |
283 ): | |
247 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence) | 284 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence) |
248 | 285 |
249 | 286 |
250 def generatev1(repo): | 287 def _report_transferred( |
288 repo, start_time: float, file_count: int, byte_count: int | |
289 ): | |
290 """common utility to report time it took to apply the stream bundle""" | |
291 elapsed = util.timer() - start_time | |
292 if elapsed <= 0: | |
293 elapsed = 0.001 | |
294 m = _(b'stream-cloned %d files / %s in %.1f seconds (%s/sec)\n') | |
295 m %= ( | |
296 file_count, | |
297 util.bytecount(byte_count), | |
298 elapsed, | |
299 util.bytecount(byte_count / elapsed), | |
300 ) | |
301 repo.ui.status(m) | |
302 | |
303 | |
304 def generatev1(repo) -> tuple[int, int, Iterator[bytes]]: | |
251 """Emit content for version 1 of a streaming clone. | 305 """Emit content for version 1 of a streaming clone. |
252 | 306 |
253 This returns a 3-tuple of (file count, byte size, data iterator). | 307 This returns a 3-tuple of (file count, byte size, data iterator). |
254 | 308 |
255 The data iterator consists of N entries for each file being transferred. | 309 The data iterator consists of N entries for each file being transferred. |
269 entries = [] | 323 entries = [] |
270 total_bytes = 0 | 324 total_bytes = 0 |
271 # Get consistent snapshot of repo, lock during scan. | 325 # Get consistent snapshot of repo, lock during scan. |
272 with repo.lock(): | 326 with repo.lock(): |
273 repo.ui.debug(b'scanning\n') | 327 repo.ui.debug(b'scanning\n') |
328 _test_sync_point_walk_1_2(repo) | |
274 for entry in _walkstreamfiles(repo): | 329 for entry in _walkstreamfiles(repo): |
275 for f in entry.files(): | 330 for f in entry.files(): |
276 file_size = f.file_size(repo.store.vfs) | 331 file_size = f.file_size(repo.store.vfs) |
277 if file_size: | 332 if file_size: |
278 entries.append((f.unencoded_path, file_size)) | 333 entries.append((f.unencoded_path, file_size)) |
279 total_bytes += file_size | 334 total_bytes += file_size |
280 _test_sync_point_walk_1(repo) | 335 _test_sync_point_walk_3(repo) |
281 _test_sync_point_walk_2(repo) | 336 _test_sync_point_walk_4(repo) |
282 | 337 |
283 repo.ui.debug( | 338 repo.ui.debug( |
284 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes) | 339 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes) |
285 ) | 340 ) |
286 | 341 |
287 svfs = repo.svfs | 342 svfs = repo.svfs |
288 debugflag = repo.ui.debugflag | 343 debugflag = repo.ui.debugflag |
289 | 344 |
290 def emitrevlogdata(): | 345 def emitrevlogdata() -> Iterator[bytes]: |
291 for name, size in entries: | 346 for name, size in entries: |
292 if debugflag: | 347 if debugflag: |
293 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size)) | 348 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size)) |
294 # partially encode name over the wire for backwards compat | 349 # partially encode name over the wire for backwards compat |
295 yield b'%s\0%d\n' % (store.encodedir(name), size) | 350 yield b'%s\0%d\n' % (store.encodedir(name), size) |
297 # trusted by the local repo) and expensive | 352 # trusted by the local repo) and expensive |
298 with svfs(name, b'rb', auditpath=False) as fp: | 353 with svfs(name, b'rb', auditpath=False) as fp: |
299 if size <= 65536: | 354 if size <= 65536: |
300 yield fp.read(size) | 355 yield fp.read(size) |
301 else: | 356 else: |
302 for chunk in util.filechunkiter(fp, limit=size): | 357 yield from util.filechunkiter(fp, limit=size) |
303 yield chunk | |
304 | 358 |
305 return len(entries), total_bytes, emitrevlogdata() | 359 return len(entries), total_bytes, emitrevlogdata() |
306 | 360 |
307 | 361 |
308 def generatev1wireproto(repo): | 362 def generatev1wireproto(repo) -> Iterator[bytes]: |
309 """Emit content for version 1 of streaming clone suitable for the wire. | 363 """Emit content for version 1 of streaming clone suitable for the wire. |
310 | 364 |
311 This is the data output from ``generatev1()`` with 2 header lines. The | 365 This is the data output from ``generatev1()`` with 2 header lines. The |
312 first line indicates overall success. The 2nd contains the file count and | 366 first line indicates overall success. The 2nd contains the file count and |
313 byte size of payload. | 367 byte size of payload. |
327 return | 381 return |
328 | 382 |
329 # Indicates successful response. | 383 # Indicates successful response. |
330 yield b'0\n' | 384 yield b'0\n' |
331 yield b'%d %d\n' % (filecount, bytecount) | 385 yield b'%d %d\n' % (filecount, bytecount) |
332 for chunk in it: | 386 yield from it |
333 yield chunk | 387 |
334 | 388 |
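Taken together, generatev1 and generatev1wireproto define a simple wire framing: a status line (b'0\n' on success), a "filecount bytecount" line, then for each file a "name\0size\n" header followed by the raw bytes. A hedged sketch of a reader for that framing, assuming fp supports readline (an illustration, not Mercurial's consumev1):

    # Sketch of a reader for the v1 wire framing (illustration only).
    def read_stream_v1(fp):
        status = fp.readline()                 # b'0\n' signals success
        if status.strip() != b'0':
            raise ValueError('stream clone refused: %r' % status)
        filecount, bytecount = map(int, fp.readline().split())
        for _ in range(filecount):
            header = fp.readline()             # b'<encoded name>\0<size>\n'
            name, size = header.rstrip(b'\n').split(b'\0')
            # the name is still store-encoded here; decoding is skipped
            yield name, fp.read(int(size))     # raw file content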
335 | 389 def generatebundlev1( |
336 def generatebundlev1(repo, compression=b'UN'): | 390 repo, compression: bytes = b'UN' |
391 ) -> tuple[Set[bytes], Iterator[bytes]]: | |
337 """Emit content for version 1 of a stream clone bundle. | 392 """Emit content for version 1 of a stream clone bundle. |
338 | 393 |
339 The first 4 bytes of the output ("HGS1") denote this as stream clone | 394 The first 4 bytes of the output ("HGS1") denote this as stream clone |
340 bundle version 1. | 395 bundle version 1. |
341 | 396 |
354 compressed in the future). | 409 compressed in the future). |
355 | 410 |
356 Returns a tuple of (requirements, data generator). | 411 Returns a tuple of (requirements, data generator). |
357 """ | 412 """ |
358 if compression != b'UN': | 413 if compression != b'UN': |
359 raise ValueError(b'we do not support the compression argument yet') | 414 raise ValueError('we do not support the compression argument yet') |
360 | 415 |
361 requirements = streamed_requirements(repo) | 416 requirements = streamed_requirements(repo) |
362 requires = b','.join(sorted(requirements)) | 417 requires = b','.join(sorted(requirements)) |
363 | 418 |
364 def gen(): | 419 def gen() -> Iterator[bytes]: |
365 yield b'HGS1' | 420 yield b'HGS1' |
366 yield compression | 421 yield compression |
367 | 422 |
368 filecount, bytecount, it = generatev1(repo) | 423 filecount, bytecount, it = generatev1(repo) |
369 repo.ui.status( | 424 repo.ui.status( |
389 progress.complete() | 444 progress.complete() |
390 | 445 |
391 return requirements, gen() | 446 return requirements, gen() |
392 | 447 |
393 | 448 |
394 def consumev1(repo, fp, filecount, bytecount): | 449 def consumev1(repo, fp, filecount: int, bytecount: int) -> None: |
395 """Apply the contents from version 1 of a streaming clone file handle. | 450 """Apply the contents from version 1 of a streaming clone file handle. |
396 | 451 |
397 This takes the output from "stream_out" and applies it to the specified | 452 This takes the output from "stream_out" and applies it to the specified |
398 repository. | 453 repository. |
399 | 454 |
423 # | 478 # |
424 # But transaction nesting can't be simply prohibited, because | 479 # But transaction nesting can't be simply prohibited, because |
425 # nesting occurs also in ordinary case (e.g. enabling | 480 # nesting occurs also in ordinary case (e.g. enabling |
426 # clonebundles). | 481 # clonebundles). |
427 | 482 |
483 total_file_count = 0 | |
428 with repo.transaction(b'clone'): | 484 with repo.transaction(b'clone'): |
429 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount): | 485 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount): |
430 for i in range(filecount): | 486 for i in range(filecount): |
431 # XXX doesn't support '\n' or '\r' in filenames | 487 # XXX doesn't support '\n' or '\r' in filenames |
432 if hasattr(fp, 'readline'): | 488 if hasattr(fp, 'readline'): |
451 b'adding %s (%s)\n' % (name, util.bytecount(size)) | 507 b'adding %s (%s)\n' % (name, util.bytecount(size)) |
452 ) | 508 ) |
453 # for backwards compat, name was partially encoded | 509 # for backwards compat, name was partially encoded |
454 path = store.decodedir(name) | 510 path = store.decodedir(name) |
455 with repo.svfs(path, b'w', backgroundclose=True) as ofp: | 511 with repo.svfs(path, b'w', backgroundclose=True) as ofp: |
512 total_file_count += 1 | |
456 for chunk in util.filechunkiter(fp, limit=size): | 513 for chunk in util.filechunkiter(fp, limit=size): |
457 progress.increment(step=len(chunk)) | 514 progress.increment(step=len(chunk)) |
458 ofp.write(chunk) | 515 ofp.write(chunk) |
459 | 516 |
460 # force @filecache properties to be reloaded from | 517 # force @filecache properties to be reloaded from |
461 # streamclone-ed file at next access | 518 # streamclone-ed file at next access |
462 repo.invalidate(clearfilecache=True) | 519 repo.invalidate(clearfilecache=True) |
463 | 520 |
464 elapsed = util.timer() - start | |
465 if elapsed <= 0: | |
466 elapsed = 0.001 | |
467 progress.complete() | 521 progress.complete() |
468 repo.ui.status( | 522 _report_transferred(repo, start, total_file_count, bytecount) |
469 _(b'transferred %s in %.1f seconds (%s/sec)\n') | 523 |
470 % ( | 524 |
471 util.bytecount(bytecount), | 525 def readbundle1header(fp) -> tuple[int, int, Set[bytes]]: |
472 elapsed, | |
473 util.bytecount(bytecount / elapsed), | |
474 ) | |
475 ) | |
476 | |
477 | |
478 def readbundle1header(fp): | |
479 compression = fp.read(2) | 526 compression = fp.read(2) |
480 if compression != b'UN': | 527 if compression != b'UN': |
481 raise error.Abort( | 528 raise error.Abort( |
482 _( | 529 _( |
483 b'only uncompressed stream clone bundles are ' | 530 b'only uncompressed stream clone bundles are ' |
501 requirements = set(requires.rstrip(b'\0').split(b',')) | 548 requirements = set(requires.rstrip(b'\0').split(b',')) |
502 | 549 |
503 return filecount, bytecount, requirements | 550 return filecount, bytecount, requirements |
504 | 551 |
505 | 552 |
506 def applybundlev1(repo, fp): | 553 def applybundlev1(repo, fp) -> None: |
507 """Apply the content from a stream clone bundle version 1. | 554 """Apply the content from a stream clone bundle version 1. |
508 | 555 |
509 We assume the 4 byte header has been read and validated and the file handle | 556 We assume the 4 byte header has been read and validated and the file handle |
510 is at the 2 byte compression identifier. | 557 is at the 2 byte compression identifier. |
511 """ | 558 """ |
531 | 578 |
532 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle | 579 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle |
533 readers to perform bundle type-specific functionality. | 580 readers to perform bundle type-specific functionality. |
534 """ | 581 """ |
535 | 582 |
536 def __init__(self, fh): | 583 def __init__(self, fh) -> None: |
537 self._fh = fh | 584 self._fh = fh |
538 | 585 |
539 def apply(self, repo): | 586 def apply(self, repo) -> None: |
540 return applybundlev1(repo, self._fh) | 587 return applybundlev1(repo, self._fh) |
541 | 588 |
542 | 589 |
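A caller holding a stream clone bundle on disk is expected to check the 4-byte magic itself before handing the stream to the applier, since applybundlev1 assumes the handle already sits at the 2-byte compression identifier. A small hypothetical driver sketching that hand-off:

    # Sketch: validating the "HGS1" magic before using streamcloneapplier.
    def apply_stream_bundle_v1(repo, path):
        with open(path, 'rb') as fh:
            magic = fh.read(4)
            if magic != b'HGS1':
                raise ValueError('not a stream clone bundle v1: %r' % magic)
            # applybundlev1 (via the applier) now reads the 2-byte
            # compression identifier and the rest of the payload.
            streamcloneapplier(fh).apply(repo)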
543 # type of file to stream | 590 # type of file to stream |
544 _fileappend = 0 # append only file | 591 _fileappend = 0 # append only file |
548 _srcstore = b's' # store (svfs) | 595 _srcstore = b's' # store (svfs) |
549 _srccache = b'c' # cache (cache) | 596 _srccache = b'c' # cache (cache) |
550 | 597 |
551 | 598 |
552 # This is it's own function so extensions can override it. | 599 # This is it's own function so extensions can override it. |
553 def _walkstreamfullstorefiles(repo): | 600 def _walkstreamfullstorefiles(repo) -> list[bytes]: |
554 """list snapshot file from the store""" | 601 """list snapshot file from the store""" |
555 fnames = [] | 602 fnames = [] |
556 if not repo.publishing(): | 603 if not repo.publishing(): |
557 fnames.append(b'phaseroots') | 604 fnames.append(b'phaseroots') |
558 return fnames | 605 return fnames |
585 # usage. The Windows value of 2 is actually 1 file open at a time, due to | 632 # usage. The Windows value of 2 is actually 1 file open at a time, due to |
586 # the `flush_count = self.MAX_OPEN // 2` and `self.MAX_OPEN - 1` threshold | 633 # the `flush_count = self.MAX_OPEN // 2` and `self.MAX_OPEN - 1` threshold |
587 # for flushing to disk in __call__(). | 634 # for flushing to disk in __call__(). |
588 MAX_OPEN = 2 if pycompat.iswindows else 100 | 635 MAX_OPEN = 2 if pycompat.iswindows else 100 |
589 | 636 |
590 def __init__(self): | 637 def __init__(self) -> None: |
591 self._counter = 0 | 638 self._counter = 0 |
592 self._volatile_fps = None | 639 self._volatile_fps = None |
593 self._copies = None | 640 self._copies = None |
594 self._dst_dir = None | 641 self._dst_dir = None |
595 | 642 |
615 self._dst_dir = None | 662 self._dst_dir = None |
616 assert self._volatile_fps is None | 663 assert self._volatile_fps is None |
617 assert self._copies is None | 664 assert self._copies is None |
618 assert self._dst_dir is None | 665 assert self._dst_dir is None |
619 | 666 |
620 def _init_tmp_copies(self): | 667 def _init_tmp_copies(self) -> None: |
621 """prepare a temporary directory to save volatile files | 668 """prepare a temporary directory to save volatile files |
622 | 669 |
623 This will be used as backup if we have too many files open""" | 670 This will be used as backup if we have too many files open""" |
624 assert 0 < self._counter | 671 assert 0 < self._counter |
625 assert self._copies is None | 672 assert self._copies is None |
626 assert self._dst_dir is None | 673 assert self._dst_dir is None |
627 self._copies = {} | 674 self._copies = {} |
628 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-') | 675 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-') |
629 | 676 |
630 def _flush_some_on_disk(self): | 677 def _flush_some_on_disk(self) -> None: |
631 """move some of the open files to tempory files on disk""" | 678 """move some of the open files to tempory files on disk""" |
632 if self._copies is None: | 679 if self._copies is None: |
633 self._init_tmp_copies() | 680 self._init_tmp_copies() |
634 flush_count = self.MAX_OPEN // 2 | 681 flush_count = self.MAX_OPEN // 2 |
635 for src, (size, fp) in sorted(self._volatile_fps.items())[:flush_count]: | 682 for src, (size, fp) in sorted(self._volatile_fps.items())[:flush_count]: |
644 fp.seek(0) | 691 fp.seek(0) |
645 bck.write(fp.read()) | 692 bck.write(fp.read()) |
646 del self._volatile_fps[src] | 693 del self._volatile_fps[src] |
647 fp.close() | 694 fp.close() |
648 | 695 |
649 def _keep_one(self, src): | 696 def _keep_one(self, src: bytes) -> int: |
650 """preserve an open file handle for a given path""" | 697 """preserve an open file handle for a given path""" |
651 # store the file quickly to ensure we close it if any error happens | 698 # store the file quickly to ensure we close it if any error happens |
652 _, fp = self._volatile_fps[src] = (None, open(src, 'rb')) | 699 _, fp = self._volatile_fps[src] = (None, open(src, 'rb')) |
653 fp.seek(0, os.SEEK_END) | 700 fp.seek(0, os.SEEK_END) |
654 size = fp.tell() | 701 size = fp.tell() |
655 self._volatile_fps[src] = (size, fp) | 702 self._volatile_fps[src] = (size, fp) |
656 | 703 return size |
657 def __call__(self, src): | 704 |
705 def __call__(self, src: bytes) -> None: | |
658 """preserve the volatile file at src""" | 706 """preserve the volatile file at src""" |
659 assert 0 < self._counter | 707 assert 0 < self._counter |
660 if len(self._volatile_fps) >= (self.MAX_OPEN - 1): | 708 if len(self._volatile_fps) >= (self.MAX_OPEN - 1): |
661 self._flush_some_on_disk() | 709 self._flush_some_on_disk() |
662 self._keep_one(src) | 710 self._keep_one(src) |
711 | |
712 def try_keep(self, src: bytes) -> Optional[int]: | |
713 """record a volatile file and returns it size | |
714 | |
715 return None if the file does not exists. | |
716 | |
717 Used for cache file that are not lock protected. | |
718 """ | |
719 assert 0 < self._counter | |
720 if len(self._volatile_fps) >= (self.MAX_OPEN - 1): | |
721 self._flush_some_on_disk() | |
722 try: | |
723 return self._keep_one(src) | |
724 except OSError as err: | |
725 if err.errno not in (errno.ENOENT, errno.EPERM): | |
726 raise | |
727 return None | |
663 | 728 |
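VolatileManager caps the number of simultaneously open backup handles at MAX_OPEN and, once the cap is near, spills half of them into temporary copies on disk. A simplified standalone sketch of that policy (names are invented; the real class also plugs into the context-manager protocol used above):

    # Sketch of the "keep at most MAX_OPEN-1 handles, spill half to a temp
    # dir" policy; simplified from VolatileManager.
    import os
    import shutil
    import tempfile

    class TinyVolatileKeeper:
        MAX_OPEN = 100  # the real class uses 2 on Windows, 100 elsewhere

        def __init__(self):
            self._open = {}      # source path -> open file object
            self._copies = {}    # source path -> temporary copy path
            self._tmpdir = None

        def keep(self, path):
            if len(self._open) >= self.MAX_OPEN - 1:
                self._spill_half()
            self._open[path] = open(path, 'rb')

        def _spill_half(self):
            if self._tmpdir is None:
                self._tmpdir = tempfile.mkdtemp(prefix='hg-clone-')
            for src in sorted(self._open)[: self.MAX_OPEN // 2]:
                dst = os.path.join(self._tmpdir, '%d.bck' % len(self._copies))
                fp = self._open.pop(src)
                fp.seek(0)
                with open(dst, 'wb') as bck:
                    shutil.copyfileobj(fp, bck)
                fp.close()
                self._copies[src] = dst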
664 @contextlib.contextmanager | 729 @contextlib.contextmanager |
665 def open(self, src): | 730 def open(self, src): |
666 assert 0 < self._counter | 731 assert 0 < self._counter |
667 entry = self._volatile_fps.get(src) | 732 entry = self._volatile_fps.get(src) |
699 # | 764 # |
700 # this assert is duplicated (from _makemap) as author might think this is | 765 # this assert is duplicated (from _makemap) as author might think this is |
701 # fine, while this is really not fine. | 766 # fine, while this is really not fine. |
702 if repo.vfs in vfsmap.values(): | 767 if repo.vfs in vfsmap.values(): |
703 raise error.ProgrammingError( | 768 raise error.ProgrammingError( |
704 b'repo.vfs must not be added to vfsmap for security reasons' | 769 'repo.vfs must not be added to vfsmap for security reasons' |
705 ) | 770 ) |
706 | 771 |
707 # translate the vfs one | 772 # translate the vfs one |
708 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] | 773 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] |
774 _test_sync_point_walk_1_2(repo) | |
709 | 775 |
710 max_linkrev = len(repo) | 776 max_linkrev = len(repo) |
711 file_count = totalfilesize = 0 | 777 file_count = totalfilesize = 0 |
712 with util.nogc(): | 778 with VolatileManager() as volatiles: |
713 # record the expected size of every file | |
714 for k, vfs, e in entries: | |
715 for f in e.files(): | |
716 file_count += 1 | |
717 totalfilesize += f.file_size(vfs) | |
718 | |
719 progress = repo.ui.makeprogress( | |
720 _(b'bundle'), total=totalfilesize, unit=_(b'bytes') | |
721 ) | |
722 progress.update(0) | |
723 with VolatileManager() as volatiles, progress: | |
724 # make sure we preserve volatile files | 779 # make sure we preserve volatile files |
725 for k, vfs, e in entries: | 780 with util.nogc(): |
726 for f in e.files(): | 781 # record the expected size of every file |
727 if f.is_volatile: | 782 for k, vfs, e in entries: |
728 volatiles(vfs.join(f.unencoded_path)) | 783 e.preserve_volatiles(vfs, volatiles) |
729 # the first yield release the lock on the repository | 784 for f in e.files(): |
730 yield file_count, totalfilesize | 785 file_count += 1 |
731 totalbytecount = 0 | 786 totalfilesize += f.file_size(vfs) |
732 | 787 |
733 for src, vfs, e in entries: | 788 progress = repo.ui.makeprogress( |
734 entry_streams = e.get_streams( | 789 _(b'bundle'), total=totalfilesize, unit=_(b'bytes') |
735 repo=repo, | 790 ) |
736 vfs=vfs, | 791 progress.update(0) |
737 volatiles=volatiles, | 792 with progress: |
738 max_changeset=max_linkrev, | 793 # the first yield release the lock on the repository |
739 preserve_file_count=True, | 794 yield file_count, totalfilesize |
740 ) | 795 totalbytecount = 0 |
741 for name, stream, size in entry_streams: | 796 |
742 yield src | 797 for src, vfs, e in entries: |
743 yield util.uvarintencode(len(name)) | 798 entry_streams = e.get_streams( |
744 yield util.uvarintencode(size) | 799 repo=repo, |
745 yield name | 800 vfs=vfs, |
746 bytecount = 0 | 801 volatiles=volatiles, |
747 for chunk in stream: | 802 max_changeset=max_linkrev, |
748 bytecount += len(chunk) | 803 preserve_file_count=True, |
749 totalbytecount += len(chunk) | 804 ) |
750 progress.update(totalbytecount) | 805 for name, stream, size in entry_streams: |
751 yield chunk | 806 yield src |
752 if bytecount != size: | 807 yield util.uvarintencode(len(name)) |
753 # Would most likely be caused by a race due to `hg | 808 yield util.uvarintencode(size) |
754 # strip` or a revlog split | 809 yield name |
755 msg = _( | 810 bytecount = 0 |
756 b'clone could only read %d bytes from %s, but ' | 811 for chunk in stream: |
757 b'expected %d bytes' | 812 bytecount += len(chunk) |
758 ) | 813 totalbytecount += len(chunk) |
759 raise error.Abort(msg % (bytecount, name, size)) | 814 progress.update(totalbytecount) |
760 | 815 yield chunk |
761 | 816 if bytecount != size: |
762 def _emit3(repo, entries): | 817 # Would most likely be caused by a race due to `hg |
818 # strip` or a revlog split | |
819 msg = _( | |
820 b'clone could only read %d bytes from %s, but ' | |
821 b'expected %d bytes' | |
822 ) | |
823 raise error.Abort(msg % (bytecount, name, size)) | |
824 | |
825 | |
826 def _emit3(repo, entries) -> Iterator[bytes | None]: | |
763 """actually emit the stream bundle (v3)""" | 827 """actually emit the stream bundle (v3)""" |
764 vfsmap = _makemap(repo) | 828 vfsmap = _makemap(repo) |
765 # we keep repo.vfs out of the map on purpose, ther are too many dangers | 829 # we keep repo.vfs out of the map on purpose, ther are too many dangers |
766 # there (eg: .hg/hgrc), | 830 # there (eg: .hg/hgrc), |
767 # | 831 # |
768 # this assert is duplicated (from _makemap) as authors might think this is | 832 # this assert is duplicated (from _makemap) as authors might think this is |
769 # fine, while this is really not fine. | 833 # fine, while this is really not fine. |
770 if repo.vfs in vfsmap.values(): | 834 if repo.vfs in vfsmap.values(): |
771 raise error.ProgrammingError( | 835 raise error.ProgrammingError( |
772 b'repo.vfs must not be added to vfsmap for security reasons' | 836 'repo.vfs must not be added to vfsmap for security reasons' |
773 ) | 837 ) |
774 | 838 |
775 # translate the vfs once | 839 # translate the vfs once |
776 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] | 840 # we only turn this into a list for the `_test_sync`, this is not ideal |
777 total_entry_count = len(entries) | 841 base_entries = list(entries) |
778 | 842 _test_sync_point_walk_1_2(repo) |
779 max_linkrev = len(repo) | 843 entries = [] |
780 progress = repo.ui.makeprogress( | 844 with VolatileManager() as volatiles: |
781 _(b'bundle'), | |
782 total=total_entry_count, | |
783 unit=_(b'entry'), | |
784 ) | |
785 progress.update(0) | |
786 with VolatileManager() as volatiles, progress: | |
787 # make sure we preserve volatile files | 845 # make sure we preserve volatile files |
788 for k, vfs, e in entries: | 846 for vfs_key, e in base_entries: |
847 vfs = vfsmap[vfs_key] | |
848 any_files = True | |
789 if e.maybe_volatile: | 849 if e.maybe_volatile: |
850 any_files = False | |
851 e.preserve_volatiles(vfs, volatiles) | |
790 for f in e.files(): | 852 for f in e.files(): |
791 if f.is_volatile: | 853 if f.is_volatile: |
792 # record the expected size under lock | 854 # record the expected size under lock |
793 f.file_size(vfs) | 855 f.file_size(vfs) |
794 volatiles(vfs.join(f.unencoded_path)) | 856 any_files = True |
857 if any_files: | |
858 entries.append((vfs_key, vfsmap[vfs_key], e)) | |
859 | |
860 total_entry_count = len(entries) | |
861 | |
862 max_linkrev = len(repo) | |
863 progress = repo.ui.makeprogress( | |
864 _(b'bundle'), | |
865 total=total_entry_count, | |
866 unit=_(b'entry'), | |
867 ) | |
868 progress.update(0) | |
795 # the first yield release the lock on the repository | 869 # the first yield release the lock on the repository |
796 yield None | 870 yield None |
797 | 871 with progress: |
798 yield util.uvarintencode(total_entry_count) | 872 yield util.uvarintencode(total_entry_count) |
799 | 873 |
800 for src, vfs, e in entries: | 874 for src, vfs, e in entries: |
801 entry_streams = e.get_streams( | 875 entry_streams = e.get_streams( |
802 repo=repo, | 876 repo=repo, |
803 vfs=vfs, | 877 vfs=vfs, |
804 volatiles=volatiles, | 878 volatiles=volatiles, |
805 max_changeset=max_linkrev, | 879 max_changeset=max_linkrev, |
806 ) | 880 ) |
807 yield util.uvarintencode(len(entry_streams)) | 881 yield util.uvarintencode(len(entry_streams)) |
808 for name, stream, size in entry_streams: | 882 for name, stream, size in entry_streams: |
809 yield src | 883 yield src |
810 yield util.uvarintencode(len(name)) | 884 yield util.uvarintencode(len(name)) |
811 yield util.uvarintencode(size) | 885 yield util.uvarintencode(size) |
812 yield name | 886 yield name |
813 yield from stream | 887 yield from stream |
814 progress.increment() | 888 progress.increment() |
815 | 889 |
816 | 890 |
817 def _test_sync_point_walk_1(repo): | 891 def _test_sync_point_walk_1_2(repo): |
818 """a function for synchronisation during tests""" | 892 """a function for synchronisation during tests |
819 | 893 |
820 | 894 Triggered after gather entry, but before starting to process/preserve them |
821 def _test_sync_point_walk_2(repo): | 895 under lock. |
822 """a function for synchronisation during tests""" | 896 |
823 | 897 (on v1 is triggered before the actual walk start) |
824 | 898 """ |
825 def _entries_walk(repo, includes, excludes, includeobsmarkers): | 899 |
900 | |
901 def _test_sync_point_walk_3(repo): | |
902 """a function for synchronisation during tests | |
903 | |
904 Triggered right before releasing the lock, but after computing what need | |
905 needed to compute under lock. | |
906 """ | |
907 | |
908 | |
909 def _test_sync_point_walk_4(repo): | |
910 """a function for synchronisation during tests | |
911 | |
912 Triggered right after releasing the lock. | |
913 """ | |
914 | |
915 | |
916 # not really a StoreEntry, but close enough | |
917 class CacheEntry(store.SimpleStoreEntry): | |
918 """Represent an entry for Cache files | |
919 | |
920 It has special logic to preserve cache file early and accept optional | |
921 presence. | |
922 | |
923 | |
924 (Yes... this is not really a StoreEntry, but close enough. We could have a | |
925 BaseEntry base class, bbut the store one would be identical) | |
926 """ | |
927 | |
928 def __init__(self, entry_path) -> None: | |
929 super().__init__( | |
930 entry_path, | |
931 # we will directly deal with that in `setup_cache_file` | |
932 is_volatile=True, | |
933 ) | |
934 | |
935 def preserve_volatiles(self, vfs, volatiles) -> None: | |
936 self._file_size = volatiles.try_keep(vfs.join(self._entry_path)) | |
937 if self._file_size is None: | |
938 self._files = [] | |
939 else: | |
940 assert self._is_volatile | |
941 self._files = [ | |
942 CacheFile( | |
943 unencoded_path=self._entry_path, | |
944 file_size=self._file_size, | |
945 is_volatile=self._is_volatile, | |
946 ) | |
947 ] | |
948 | |
949 def files(self) -> list[store.StoreFile]: | |
950 if self._files is None: | |
951 self._files = [ | |
952 CacheFile( | |
953 unencoded_path=self._entry_path, | |
954 is_volatile=self._is_volatile, | |
955 ) | |
956 ] | |
957 return super().files() | |
958 | |
959 | |
960 class CacheFile(store.StoreFile): | |
961 # inform the "copy/hardlink" version that this file might be missing | |
962 # without consequences. | |
963 optional: bool = True | |
964 | |
965 | |
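Cache files are not protected by the store lock, so they are preserved eagerly through try_keep and silently dropped if they vanished in the meantime; the optional flag then tells the copy/hardlink path that absence is acceptable. A tiny standalone sketch of that "snapshot if still present" pattern (hypothetical helper, not the class above):

    # Sketch: eagerly snapshot a cache file's size, tolerating its absence.
    import errno
    import os

    def try_snapshot(path):
        """return the file size, or None if the cache file disappeared"""
        try:
            with open(path, 'rb') as fp:
                fp.seek(0, os.SEEK_END)
                return fp.tell()
        except OSError as err:
            if err.errno not in (errno.ENOENT, errno.EPERM):
                raise
            return None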
966 def _entries_walk(repo, includes, excludes, includeobsmarkers: bool): | |
826 """emit a seris of files information useful to clone a repo | 967 """emit a seris of files information useful to clone a repo |
827 | 968 |
828 return (vfs-key, entry) iterator | 969 return (vfs-key, entry) iterator |
829 | 970 |
830 Where `entry` is StoreEntry. (used even for cache entries) | 971 Where `entry` is StoreEntry. (used even for cache entries) |
849 yield (_srcstore, entry) | 990 yield (_srcstore, entry) |
850 | 991 |
851 for name in cacheutil.cachetocopy(repo): | 992 for name in cacheutil.cachetocopy(repo): |
852 if repo.cachevfs.exists(name): | 993 if repo.cachevfs.exists(name): |
853 # not really a StoreEntry, but close enough | 994 # not really a StoreEntry, but close enough |
854 entry = store.SimpleStoreEntry( | 995 yield (_srccache, CacheEntry(entry_path=name)) |
855 entry_path=name, | 996 |
856 is_volatile=True, | 997 |
857 ) | 998 def generatev2(repo, includes, excludes, includeobsmarkers: bool): |
858 yield (_srccache, entry) | |
859 | |
860 | |
861 def generatev2(repo, includes, excludes, includeobsmarkers): | |
862 """Emit content for version 2 of a streaming clone. | 999 """Emit content for version 2 of a streaming clone. |
863 | 1000 |
864 the data stream consists the following entries: | 1001 the data stream consists the following entries: |
865 1) A char representing the file destination (eg: store or cache) | 1002 1) A char representing the file destination (eg: store or cache) |
866 2) A varint containing the length of the filename | 1003 2) A varint containing the length of the filename |
878 repo, | 1015 repo, |
879 includes=includes, | 1016 includes=includes, |
880 excludes=excludes, | 1017 excludes=excludes, |
881 includeobsmarkers=includeobsmarkers, | 1018 includeobsmarkers=includeobsmarkers, |
882 ) | 1019 ) |
883 | |
884 chunks = _emit2(repo, entries) | 1020 chunks = _emit2(repo, entries) |
885 first = next(chunks) | 1021 first = next(chunks) |
886 file_count, total_file_size = first | 1022 file_count, total_file_size = first |
887 _test_sync_point_walk_1(repo) | 1023 _test_sync_point_walk_3(repo) |
888 _test_sync_point_walk_2(repo) | 1024 _test_sync_point_walk_4(repo) |
889 | 1025 |
890 return file_count, total_file_size, chunks | 1026 return file_count, total_file_size, chunks |
891 | 1027 |
892 | 1028 |
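The v2 payload produced here is consumed record by record: one byte naming the destination vfs, two uvarints for the name and data lengths, then the filename and the file content. A hedged sketch of decoding a single record with helpers that appear elsewhere in this file (illustration only; the real parser streams large contents in chunks):

    # Sketch: decode one stream-v2 file record from a binary stream `fp`.
    from mercurial import util

    def read_v2_record(fp):
        src = util.readexactly(fp, 1)           # destination key, e.g. b's' or b'c'
        namelen = util.uvarintdecodestream(fp)  # length of the file name
        datalen = util.uvarintdecodestream(fp)  # length of the file content
        name = util.readexactly(fp, namelen)
        data = util.readexactly(fp, datalen)    # the real code streams this in chunks
        return src, name, data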
893 def generatev3(repo, includes, excludes, includeobsmarkers): | 1029 def generatev3( |
1030 repo, includes, excludes, includeobsmarkers: bool | |
1031 ) -> Iterator[bytes | None]: | |
894 """Emit content for version 3 of a streaming clone. | 1032 """Emit content for version 3 of a streaming clone. |
895 | 1033 |
896 the data stream consists the following: | 1034 the data stream consists the following: |
897 1) A varint E containing the number of entries (can be 0), then E entries follow | 1035 1) A varint E containing the number of entries (can be 0), then E entries follow |
898 2) For each entry: | 1036 2) For each entry: |
928 includeobsmarkers=includeobsmarkers, | 1066 includeobsmarkers=includeobsmarkers, |
929 ) | 1067 ) |
930 chunks = _emit3(repo, list(entries)) | 1068 chunks = _emit3(repo, list(entries)) |
931 first = next(chunks) | 1069 first = next(chunks) |
932 assert first is None | 1070 assert first is None |
933 _test_sync_point_walk_1(repo) | 1071 _test_sync_point_walk_3(repo) |
934 _test_sync_point_walk_2(repo) | 1072 _test_sync_point_walk_4(repo) |
935 | 1073 |
936 return chunks | 1074 return chunks |
937 | 1075 |
938 | 1076 |
939 @contextlib.contextmanager | 1077 @contextlib.contextmanager |
946 yield | 1084 yield |
947 else: | 1085 else: |
948 yield | 1086 yield |
949 | 1087 |
950 | 1088 |
951 def consumev2(repo, fp, filecount, filesize): | 1089 class V2Report: |
1090 """a small class to track the data we saw within the stream""" | |
1091 | |
1092 def __init__(self): | |
1093 self.byte_count = 0 | |
1094 | |
1095 | |
1096 def consumev2(repo, fp, filecount: int, filesize: int) -> None: | |
952 """Apply the contents from a version 2 streaming clone. | 1097 """Apply the contents from a version 2 streaming clone. |
953 | 1098 |
954 Data is read from an object that only needs to provide a ``read(size)`` | 1099 Data is read from an object that only needs to provide a ``read(size)`` |
955 method. | 1100 method. |
956 """ | 1101 """ |
957 with repo.lock(): | 1102 with repo.lock(): |
958 repo.ui.status( | 1103 repo.ui.status( |
959 _(b'%d files to transfer, %s of data\n') | 1104 _(b'%d files to transfer, %s of data\n') |
960 % (filecount, util.bytecount(filesize)) | 1105 % (filecount, util.bytecount(filesize)) |
961 ) | 1106 ) |
962 | 1107 progress = repo.ui.makeprogress( |
1108 _(b'clone'), | |
1109 total=filesize, | |
1110 unit=_(b'bytes'), | |
1111 ) | |
963 start = util.timer() | 1112 start = util.timer() |
964 progress = repo.ui.makeprogress( | 1113 report = V2Report() |
965 _(b'clone'), total=filesize, unit=_(b'bytes') | |
966 ) | |
967 progress.update(0) | |
968 | 1114 |
969 vfsmap = _makemap(repo) | 1115 vfsmap = _makemap(repo) |
970 # we keep repo.vfs out of the on purpose, ther are too many danger | 1116 # we keep repo.vfs out of the on purpose, ther are too many danger |
971 # there (eg: .hg/hgrc), | 1117 # there (eg: .hg/hgrc), |
972 # | 1118 # |
973 # this assert is duplicated (from _makemap) as author might think this | 1119 # this assert is duplicated (from _makemap) as author might think this |
974 # is fine, while this is really not fine. | 1120 # is fine, while this is really not fine. |
975 if repo.vfs in vfsmap.values(): | 1121 if repo.vfs in vfsmap.values(): |
976 raise error.ProgrammingError( | 1122 raise error.ProgrammingError( |
977 b'repo.vfs must not be added to vfsmap for security reasons' | 1123 'repo.vfs must not be added to vfsmap for security reasons' |
978 ) | 1124 ) |
979 | 1125 |
1126 cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu') | |
1127 mem_profile = scmutil.get_resource_profile(repo.ui, b'memory') | |
1128 threaded = repo.ui.configbool( | |
1129 b"worker", b"parallel-stream-bundle-processing" | |
1130 ) | |
1131 num_writer = repo.ui.configint( | |
1132 b"worker", | |
1133 b"parallel-stream-bundle-processing.num-writer", | |
1134 ) | |
1135 if num_writer <= 0: | |
1136 num_writer = DEFAULT_NUM_WRITER[cpu_profile] | |
1137 memory_target = repo.ui.configbytes( | |
1138 b"worker", | |
1139 b"parallel-stream-bundle-processing.memory-target", | |
1140 ) | |
1141 if memory_target < 0: | |
1142 memory_target = None | |
1143 elif memory_target == 0: | |
1144 memory_target = DEFAULT_MEMORY_TARGET[mem_profile] | |
980 with repo.transaction(b'clone'): | 1145 with repo.transaction(b'clone'): |
981 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) | 1146 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) |
982 with nested(*ctxs): | 1147 with nested(*ctxs): |
983 for i in range(filecount): | 1148 workers = [] |
984 src = util.readexactly(fp, 1) | 1149 info_queue = None |
985 vfs = vfsmap[src] | 1150 data_queue = None |
986 namelen = util.uvarintdecodestream(fp) | 1151 mark_used = None |
987 datalen = util.uvarintdecodestream(fp) | 1152 try: |
988 | 1153 if not threaded: |
989 name = util.readexactly(fp, namelen) | 1154 fc = _FileChunker |
990 | 1155 raw_data = fp |
991 if repo.ui.debugflag: | 1156 else: |
992 repo.ui.debug( | 1157 fc = _ThreadSafeFileChunker |
993 b'adding [%s] %s (%s)\n' | 1158 data_queue = _DataQueue(memory_target=memory_target) |
994 % (src, name, util.bytecount(datalen)) | 1159 if memory_target is not None: |
1160 mark_used = data_queue.mark_used | |
1161 raw_data = util.chunkbuffer(data_queue) | |
1162 | |
1163 w = threading.Thread( | |
1164 target=data_queue.fill_from, | |
1165 args=(fp,), | |
995 ) | 1166 ) |
996 | 1167 workers.append(w) |
997 with vfs(name, b'w') as ofp: | 1168 w.start() |
998 for chunk in util.filechunkiter(fp, limit=datalen): | 1169 files = _v2_parse_files( |
999 progress.increment(step=len(chunk)) | 1170 repo, |
1000 ofp.write(chunk) | 1171 raw_data, |
1172 vfsmap, | |
1173 filecount, | |
1174 progress, | |
1175 report, | |
1176 file_chunker=fc, | |
1177 mark_used=mark_used, | |
1178 ) | |
1179 if not threaded: | |
1180 _write_files(files) | |
1181 else: | |
1182 info_queue = _FileInfoQueue(files) | |
1183 | |
1184 for __ in range(num_writer): | |
1185 w = threading.Thread( | |
1186 target=_write_files, | |
1187 args=(info_queue,), | |
1188 ) | |
1189 workers.append(w) | |
1190 w.start() | |
1191 info_queue.fill() | |
1192 except: # re-raises | |
1193 if data_queue is not None: | |
1194 data_queue.abort() | |
1195 raise | |
1196 finally: | |
1197 # shut down all the workers | |
1198 if info_queue is not None: | |
1199 # this is strictly speaking one too many worker for | |
1200 # this queu, but closing too many is not a problem. | |
1201 info_queue.close(len(workers)) | |
1202 for w in workers: | |
1203 w.join() | |
1001 | 1204 |
1002 # force @filecache properties to be reloaded from | 1205 # force @filecache properties to be reloaded from |
1003 # streamclone-ed file at next access | 1206 # streamclone-ed file at next access |
1004 repo.invalidate(clearfilecache=True) | 1207 repo.invalidate(clearfilecache=True) |
1005 | 1208 |
1006 elapsed = util.timer() - start | 1209 progress.complete() |
1007 if elapsed <= 0: | 1210 # acknowledge the end of the bundle2 part, this help aligning |
1008 elapsed = 0.001 | 1211 # sequential and parallel behavior. |
1009 repo.ui.status( | 1212 remains = fp.read(1) |
1010 _(b'transferred %s in %.1f seconds (%s/sec)\n') | 1213 assert not remains |
1011 % ( | 1214 _report_transferred(repo, start, filecount, report.byte_count) |
1012 util.bytecount(progress.pos), | 1215 |
1013 elapsed, | 1216 |
1014 util.bytecount(progress.pos / elapsed), | 1217 # iterator of chunk of bytes that constitute a file content. |
1218 FileChunksT = Iterator[bytes] | |
1219 # Contains the information necessary to write stream file on disk | |
1220 FileInfoT = Tuple[ | |
1221 bytes, # real fs path | |
1222 Optional[int], # permission to give to chmod | |
1223 FileChunksT, # content | |
1224 ] | |
1225 | |
1226 | |
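In the threaded path of consumev2 above, one thread reads the bundle and feeds a data queue, the main thread parses file records out of it, and several writer threads drain a file-info queue. A skeletal sketch of that three-stage shape using only the standard library (hypothetical stand-ins, not the classes defined below):

    # Sketch: the reader -> parser -> writers pipeline shape used by the
    # threaded branch of consumev2.
    import queue
    import threading

    def run_pipeline(stream_chunks, parse_records, write_record, num_writers=4):
        data_q = queue.Queue()   # raw bundle chunks (the _DataQueue role)
        info_q = queue.Queue()   # parsed file records (the _FileInfoQueue role)

        def reader():
            for chunk in stream_chunks:
                data_q.put(chunk)
            data_q.put(None)                    # end-of-stream sentinel

        def writer():
            while (record := info_q.get()) is not None:
                write_record(record)

        threads = [threading.Thread(target=reader)]
        threads += [threading.Thread(target=writer) for _ in range(num_writers)]
        for t in threads:
            t.start()
        # the parser runs in the current thread and feeds the writers
        for record in parse_records(iter(data_q.get, None)):
            info_q.put(record)
        for _ in range(num_writers):
            info_q.put(None)                    # one sentinel per writer
        for t in threads:
            t.join()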
1227 class _Queue: | |
1228 """a reimplementation of queue.Queue which doesn't use thread.Condition""" | |
1229 | |
1230 def __init__(self): | |
1231 self._queue = collections.deque() | |
1232 | |
1233 # the "_lock" protect manipulation of the "_queue" deque | |
1234 # the "_wait" is used to have the "get" thread waits for the | |
1235 # "put" thread when the queue is empty. | |
1236 # | |
1237 # This is similar to the "threading.Condition", but without the absurd | |
1238 # slowness of the stdlib implementation. | |
1239 # | |
1240 # the "_wait" is always released while holding the "_lock". | |
1241 self._lock = threading.Lock() | |
1242 self._wait = threading.Lock() | |
1243 | |
1244 def put(self, item): | |
1245 with self._lock: | |
1246 self._queue.append(item) | |
1247 # if anyone is waiting on item, unblock it. | |
1248 if self._wait.locked(): | |
1249 self._wait.release() | |
1250 | |
1251 def get(self): | |
1252 with self._lock: | |
1253 while len(self._queue) == 0: | |
1254 # "arm" the waiting lock | |
1255 self._wait.acquire(blocking=False) | |
1256 # release the lock to let other touch the queue | |
1257 # (especially the put call we wait on) | |
1258 self._lock.release() | |
1259 # wait for for a `put` call to release the lock | |
1260 self._wait.acquire() | |
1261 # grab the lock to look at a possible available value | |
1262 self._lock.acquire() | |
1263 # disarm the lock if necessary. | |
1264 # | |
1265 # If the queue only constains one item, keep the _wait lock | |
1266 # armed, as there is no need to wake another waiter anyway. | |
1267 if self._wait.locked() and len(self._queue) > 1: | |
1268 self._wait.release() | |
1269 return self._queue.popleft() | |
1270 | |
1271 | |
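_Queue trades threading.Condition for a plain lock that the consumer "arms" when the deque is empty and the producer releases on put; a None sentinel marks the end of a queue throughout this file. A short usage sketch, assuming access to the _Queue class above:

    # Sketch: producer/consumer hand-off through _Queue with a None
    # sentinel, the convention the classes below rely on.
    import threading

    def produce(q, items):
        for item in items:
            q.put(item)
        q.put(None)              # tell the consumer we are done

    def consume(q, out):
        while (item := q.get()) is not None:
            out.append(item)

    q = _Queue()
    received = []
    t = threading.Thread(target=consume, args=(q, received))
    t.start()
    produce(q, [b'a', b'b', b'c'])
    t.join()
    assert received == [b'a', b'b', b'c']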
1272 class _DataQueue: | |
1273 """A queue passing data from the bundle stream to other thread | |
1274 | |
1275 It has a "memory_target" optional parameter to avoid buffering too much | |
1276 information. The implementation is not exact and the memory target might be | |
1277 exceed for a time in some situation. | |
1278 """ | |
1279 | |
1280 def __init__(self, memory_target=None): | |
1281 self._q = _Queue() | |
1282 self._abort = False | |
1283 self._memory_target = memory_target | |
1284 if self._memory_target is not None and self._memory_target <= 0: | |
1285 raise error.ProgrammingError("memory target should be > 0") | |
1286 | |
1287 # the "_lock" protect manipulation of the _current_used" variable | |
1288 # the "_wait" is used to have the "reading" thread waits for the | |
1289 # "using" thread when the buffer is full. | |
1290 # | |
1291 # This is similar to the "threading.Condition", but without the absurd | |
1292 # slowness of the stdlib implementation. | |
1293 # | |
1294 # the "_wait" is always released while holding the "_lock". | |
1295 self._lock = threading.Lock() | |
1296 self._wait = threading.Lock() | |
1297 # only the stream reader touch this, it is find to touch without the lock | |
1298 self._current_read = 0 | |
1299 # do not touch this without the lock | |
1300 self._current_used = 0 | |
1301 | |
1302 def _has_free_space(self): | |
1303 """True if more data can be read without further exceeding memory target | |
1304 | |
1305 Must be called under the lock. | |
1306 """ | |
1307 if self._memory_target is None: | |
1308 # Ideally we should not even get into the locking business in that | |
1309 # case, but we keep the implementation simple for now. | |
1310 return True | |
1311 return (self._current_read - self._current_used) < self._memory_target | |
1312 | |
1313 def mark_used(self, offset): | |
1314 """Notify we have used the buffer up to "offset" | |
1315 | |
1316 This is meant to be used from another thread than the one filler the queue. | |
1317 """ | |
1318 if self._memory_target is not None: | |
1319 with self._lock: | |
1320 if offset > self._current_used: | |
1321 self._current_used = offset | |
1322 # If the reader is waiting for room, unblock it. | |
1323 if self._wait.locked() and self._has_free_space(): | |
1324 self._wait.release() | |
1325 | |
1326 def fill_from(self, data): | |
1327 """fill the data queue from a bundle2 part object | |
1328 | |
1329 This is meant to be called by the data reading thread | |
1330 """ | |
1331 q = self._q | |
1332 try: | |
1333 for item in data: | |
1334 self._current_read += len(item) | |
1335 q.put(item) | |
1336 if self._abort: | |
1337 break | |
1338 if self._memory_target is not None: | |
1339 with self._lock: | |
1340 while not self._has_free_space(): | |
1341 # make sure the _wait lock is locked | |
1342 # this is done under lock, so there case be no race with the release logic | |
1343 self._wait.acquire(blocking=False) | |
1344 self._lock.release() | |
1345 # acquiring the lock will block until some other thread release it. | |
1346 self._wait.acquire() | |
1347 # lets dive into the locked section again | |
1348 self._lock.acquire() | |
1349 # make sure we release the lock we just grabed if | |
1350 # needed. | |
1351 if self._wait.locked(): | |
1352 self._wait.release() | |
1353 finally: | |
1354 q.put(None) | |
1355 | |
1356 def __iter__(self): | |
1357 """Iterate over the bundle chunkgs | |
1358 | |
1359 This is meant to be called by the data parsing thread.""" | |
1360 q = self._q | |
1361 while (i := q.get()) is not None: | |
1362 yield i | |
1363 if self._abort: | |
1364 break | |
1365 | |
1366 def abort(self): | |
1367 """stop the data-reading thread and interrupt the comsuming iteration | |
1368 | |
1369 This is meant to be called on errors. | |
1370 """ | |
1371 self._abort = True | |
1372 self._q.put(None) | |
1373 if self._memory_target is not None: | |
1374 with self._lock: | |
1375 # make sure we unstuck the reader thread. | |
1376 if self._wait.locked(): | |
1377 self._wait.release() | |
1378 | |
1379 | |
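The memory target is enforced cooperatively: the filling thread counts the bytes it has read, the consumer reports progress through mark_used, and the filler blocks once the gap exceeds the target. A hedged sketch of wiring a producer thread to a consumer with that contract, assuming access to the _DataQueue class above:

    # Sketch: one thread fills the _DataQueue from a chunk source while the
    # consumer iterates it and acknowledges progress through mark_used().
    import threading

    def consume_with_backpressure(chunk_source, handle_chunk, memory_target=8 * 2**20):
        dq = _DataQueue(memory_target=memory_target)
        filler = threading.Thread(target=dq.fill_from, args=(chunk_source,))
        filler.start()
        try:
            used = 0
            for chunk in dq:
                handle_chunk(chunk)
                used += len(chunk)
                dq.mark_used(used)  # unblocks the filler if it hit the target
        except:  # re-raises, mirroring consumev2: stop the filler on error
            dq.abort()
            raise
        finally:
            filler.join()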
1380 class _FileInfoQueue: | |
1381 """A thread-safe queue to passer parsed file information to the writers""" | |
1382 | |
1383 def __init__(self, info: Iterable[FileInfoT]): | |
1384 self._info = info | |
1385 self._q = _Queue() | |
1386 | |
1387 def fill(self): | |
1388 """iterate over the parsed information to file the queue | |
1389 | |
1390 This is meant to be call from the thread parsing the stream information. | |
1391 """ | |
1392 q = self._q | |
1393 for i in self._info: | |
1394 q.put(i) | |
1395 | |
1396 def close(self, number_worker): | |
1397 """signal all the workers that we no longer have any file info coming | |
1398 | |
1399 Called from the thread parsing the stream information (and/or the main | |
1400 thread if different). | |
1401 """ | |
1402 for __ in range(number_worker): | |
1403 self._q.put(None) | |
1404 | |
1405 def __iter__(self): | |
1406 """iterate over the available file info | |
1407 | |
1408 This is meant to be called from the writer threads. | |
1409 """ | |
1410 q = self._q | |
1411 while (i := q.get()) is not None: | |
1412 yield i | |
1413 | |
1414 | |
1415 class _FileChunker: | |
1416 """yield the chunk that constitute a file | |
1417 | |
1418 This class exists as the counterpart of the threaded version and | |
1419 would not be very useful on its own. | |
1420 """ | |
1421 | |
1422 def __init__( | |
1423 self, | |
1424 fp: bundle2mod.unbundlepart, | |
1425 data_len: int, | |
1426 progress: scmutil.progress, | |
1427 report: V2Report, | |
1428 mark_used: Optional[Callable[[int], None]] = None, | |
1429 ): | |
1430 self.report = report | |
1431 self.progress = progress | |
1432 self._chunks = util.filechunkiter(fp, limit=data_len) | |
1433 | |
1434 def fill(self) -> None: | |
1435 """Do nothing in non-threading context""" | |
1436 | |
1437 def __iter__(self) -> FileChunksT: | |
1438 for chunk in self._chunks: | |
1439 self.report.byte_count += len(chunk) | |
1440 self.progress.increment(step=len(chunk)) | |
1441 yield chunk | |
1442 | |
1443 | |
1444 class _ThreadSafeFileChunker(_FileChunker): | |
1445 """yield the chunk that constitute a file | |
1446 | |
1447 Make sure you call the "fill" function in the main thread to read the | |
1448 right data at the right time. | |
1449 """ | |
1450 | |
1451 def __init__( | |
1452 self, | |
1453 fp: bundle2mod.unbundlepart, | |
1454 data_len: int, | |
1455 progress: scmutil.progress, | |
1456 report: V2Report, | |
1457 mark_used: Optional[Callable[[int], None]] = None, | |
1458 ): | |
1459 super().__init__(fp, data_len, progress, report) | |
1460 self._fp = fp | |
1461 self._queue = _Queue() | |
1462 self._mark_used = mark_used | |
1463 | |
1464 def fill(self) -> None: | |
1465 """fill the file chunker queue with data read from the stream | |
1466 | |
1467 This is meant to be called from the thread parsing information (and | |
1468 consuming the stream data). | |
1469 """ | |
1470 try: | |
1471 for chunk in super().__iter__(): | |
1472 offset = self._fp.tell() | |
1473 self._queue.put((chunk, offset)) | |
1474 finally: | |
1475 self._queue.put(None) | |
1476 | |
1477 def __iter__(self) -> FileChunksT: | |
1478 """Iterate over all the file chunk | |
1479 | |
1480 This is meant to be called from the writer threads. | |
1481 """ | |
1482 while (info := self._queue.get()) is not None: | |
1483 chunk, offset = info | |
1484 if self._mark_used is not None: | |
1485 self._mark_used(offset) | |
1486 yield chunk | |
1487 | |
1488 | |
1489 def _trivial_file( | |
1490 chunk: bytes, | |
1491 mark_used: Optional[Callable[[int], None]], | |
1492 offset: int, | |
1493 ) -> FileChunksT: | |
1494 """used for single chunk file,""" | |
1495 if mark_used is not None: | |
1496 mark_used(offset) | |
1497 yield chunk | |
1498 | |
1499 | |
1500 def _v2_parse_files( | |
1501 repo, | |
1502 fp: bundle2mod.unbundlepart, | |
1503 vfs_map, | |
1504 file_count: int, | |
1505 progress: scmutil.progress, | |
1506 report: V2Report, | |
1507 file_chunker: Type[_FileChunker] = _FileChunker, | |
1508 mark_used: Optional[Callable[[int], None]] = None, | |
1509 ) -> Iterator[FileInfoT]: | |
1510 """do the "stream-parsing" part of stream v2 | |
1511 | |
1512 The parsed information are yield result for consumption by the "writer" | |
1513 """ | |
1514 known_dirs = set() # set of directory that we know to exists | |
1515 progress.update(0) | |
1516 for i in range(file_count): | |
1517 src = util.readexactly(fp, 1) | |
1518 namelen = util.uvarintdecodestream(fp) | |
1519 datalen = util.uvarintdecodestream(fp) | |
1520 | |
1521 name = util.readexactly(fp, namelen) | |
1522 | |
1523 if repo.ui.debugflag: | |
1524 repo.ui.debug( | |
1525 b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen)) | |
1015 ) | 1526 ) |
1016 ) | 1527 vfs = vfs_map[src] |
1017 progress.complete() | 1528 path, mode = vfs.prepare_streamed_file(name, known_dirs) |
1018 | 1529 if datalen <= util.DEFAULT_FILE_CHUNK: |
1019 | 1530 c = fp.read(datalen) |
1020 def consumev3(repo, fp): | 1531 offset = fp.tell() |
1532 report.byte_count += len(c) | |
1533 progress.increment(step=len(c)) | |
1534 chunks = _trivial_file(c, mark_used, offset) | |
1535 yield (path, mode, iter(chunks)) | |
1536 else: | |
1537 chunks = file_chunker( | |
1538 fp, | |
1539 datalen, | |
1540 progress, | |
1541 report, | |
1542 mark_used=mark_used, | |
1543 ) | |
1544 yield (path, mode, iter(chunks)) | |
1545 # make sure we read all the chunk before moving to the next file | |
1546 chunks.fill() | |
1547 | |
1548 | |
1549 def _write_files(info: Iterable[FileInfoT]): | |
1550 """write files from parsed data""" | |
1551 io_flags = os.O_WRONLY | os.O_CREAT | |
1552 if pycompat.iswindows: | |
1553 io_flags |= os.O_BINARY | |
1554 for path, mode, data in info: | |
1555 if mode is None: | |
1556 fd = os.open(path, io_flags) | |
1557 else: | |
1558 fd = os.open(path, io_flags, mode=mode) | |
1559 try: | |
1560 for chunk in data: | |
1561 written = os.write(fd, chunk) | |
1562 # write missing pieces if the write was interrupted | |
1563 while written < len(chunk): | |
1564 written += os.write(fd, chunk[written:]) |
1565 finally: | |
1566 os.close(fd) | |
1567 | |
1568 | |
1569 def consumev3(repo, fp) -> None: | |
1021 """Apply the contents from a version 3 streaming clone. | 1570 """Apply the contents from a version 3 streaming clone. |
1022 | 1571 |
1023 Data is read from an object that only needs to provide a ``read(size)`` | 1572 Data is read from an object that only needs to provide a ``read(size)`` |
1024 method. | 1573 method. |
1025 """ | 1574 """ |
1043 # | 1592 # |
1044 # this assert is duplicated (from _makemap) as authors might think this | 1593 # this assert is duplicated (from _makemap) as authors might think this |
1045 # is fine, while this is really not fine. | 1594 # is fine, while this is really not fine. |
1046 if repo.vfs in vfsmap.values(): | 1595 if repo.vfs in vfsmap.values(): |
1047 raise error.ProgrammingError( | 1596 raise error.ProgrammingError( |
1048 b'repo.vfs must not be added to vfsmap for security reasons' | 1597 'repo.vfs must not be added to vfsmap for security reasons' |
1049 ) | 1598 ) |
1050 | 1599 total_file_count = 0 |
1051 with repo.transaction(b'clone'): | 1600 with repo.transaction(b'clone'): |
1052 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) | 1601 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) |
1053 with nested(*ctxs): | 1602 with nested(*ctxs): |
1054 for i in range(entrycount): | 1603 for i in range(entrycount): |
1055 filecount = util.uvarintdecodestream(fp) | 1604 filecount = util.uvarintdecodestream(fp) |
1056 if filecount == 0: | 1605 if filecount == 0: |
1057 if repo.ui.debugflag: | 1606 if repo.ui.debugflag: |
1058 repo.ui.debug(b'entry with no files [%d]\n' % (i)) | 1607 repo.ui.debug(b'entry with no files [%d]\n' % (i)) |
1608 total_file_count += filecount | |
1059 for i in range(filecount): | 1609 for i in range(filecount): |
1060 src = util.readexactly(fp, 1) | 1610 src = util.readexactly(fp, 1) |
1061 vfs = vfsmap[src] | 1611 vfs = vfsmap[src] |
1062 namelen = util.uvarintdecodestream(fp) | 1612 namelen = util.uvarintdecodestream(fp) |
1063 datalen = util.uvarintdecodestream(fp) | 1613 datalen = util.uvarintdecodestream(fp) |
1077 | 1627 |
1078 # force @filecache properties to be reloaded from | 1628 # force @filecache properties to be reloaded from |
1079 # streamclone-ed file at next access | 1629 # streamclone-ed file at next access |
1080 repo.invalidate(clearfilecache=True) | 1630 repo.invalidate(clearfilecache=True) |
1081 | 1631 |
1082 elapsed = util.timer() - start | |
1083 if elapsed <= 0: | |
1084 elapsed = 0.001 | |
1085 msg = _(b'transferred %s in %.1f seconds (%s/sec)\n') | |
1086 byte_count = util.bytecount(bytes_transferred) | |
1087 bytes_sec = util.bytecount(bytes_transferred / elapsed) | |
1088 msg %= (byte_count, elapsed, bytes_sec) | |
1089 repo.ui.status(msg) | |
1090 progress.complete() | 1632 progress.complete() |
1091 | 1633 _report_transferred(repo, start, total_file_count, bytes_transferred) |
1092 | 1634 |
1093 def applybundlev2(repo, fp, filecount, filesize, requirements): | 1635 |
1636 def applybundlev2( | |
1637 repo, fp, filecount: int, filesize: int, requirements: Iterable[bytes] | |
1638 ) -> None: | |
1094 from . import localrepo | 1639 from . import localrepo |
1095 | 1640 |
1096 missingreqs = [r for r in requirements if r not in repo.supported] | 1641 missingreqs = [r for r in requirements if r not in repo.supported] |
1097 if missingreqs: | 1642 if missingreqs: |
1098 raise error.Abort( | 1643 raise error.Abort( |
1099 _(b'unable to apply stream clone: unsupported format: %s') | 1644 _(b'unable to apply stream clone: unsupported format: %s') |
1100 % b', '.join(sorted(missingreqs)) | 1645 % b', '.join(sorted(missingreqs)) |
1101 ) | 1646 ) |
1102 | 1647 |
1103 consumev2(repo, fp, filecount, filesize) | 1648 with util.nogc(): |
1649 consumev2(repo, fp, filecount, filesize) | |
1104 | 1650 |
1105 repo.requirements = new_stream_clone_requirements( | 1651 repo.requirements = new_stream_clone_requirements( |
1106 repo.requirements, | 1652 repo.requirements, |
1107 requirements, | 1653 requirements, |
1108 ) | 1654 ) |
1111 ) | 1657 ) |
1112 scmutil.writereporequirements(repo) | 1658 scmutil.writereporequirements(repo) |
1113 nodemap.post_stream_cleanup(repo) | 1659 nodemap.post_stream_cleanup(repo) |
1114 | 1660 |
1115 | 1661 |
1116 def applybundlev3(repo, fp, requirements): | 1662 def applybundlev3(repo, fp, requirements: Iterable[bytes]) -> None: |
1117 from . import localrepo | 1663 from . import localrepo |
1118 | 1664 |
1119 missingreqs = [r for r in requirements if r not in repo.supported] | 1665 missingreqs = [r for r in requirements if r not in repo.supported] |
1120 if missingreqs: | 1666 if missingreqs: |
1121 msg = _(b'unable to apply stream clone: unsupported format: %s') | 1667 msg = _(b'unable to apply stream clone: unsupported format: %s') |
1133 ) | 1679 ) |
1134 scmutil.writereporequirements(repo) | 1680 scmutil.writereporequirements(repo) |
1135 nodemap.post_stream_cleanup(repo) | 1681 nodemap.post_stream_cleanup(repo) |
1136 | 1682 |
1137 | 1683 |
1138 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress): | 1684 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress) -> bool: |
1139 hardlink = [True] | 1685 hardlink = [True] |
1140 | 1686 |
1141 def copy_used(): | 1687 def copy_used(): |
1142 hardlink[0] = False | 1688 hardlink[0] = False |
1143 progress.topic = _(b'copying') | 1689 progress.topic = _(b'copying') |
1144 | 1690 |
1145 for k, path in entries: | 1691 for k, path, optional in entries: |
1146 src_vfs = src_vfs_map[k] | 1692 src_vfs = src_vfs_map[k] |
1147 dst_vfs = dst_vfs_map[k] | 1693 dst_vfs = dst_vfs_map[k] |
1148 src_path = src_vfs.join(path) | 1694 src_path = src_vfs.join(path) |
1149 dst_path = dst_vfs.join(path) | 1695 dst_path = dst_vfs.join(path) |
1150 # We cannot use dirname and makedirs of dst_vfs here because the store | 1696 # We cannot use dirname and makedirs of dst_vfs here because the store |
1152 dirname = os.path.dirname(dst_path) | 1698 dirname = os.path.dirname(dst_path) |
1153 if not os.path.exists(dirname): | 1699 if not os.path.exists(dirname): |
1154 util.makedirs(dirname) | 1700 util.makedirs(dirname) |
1155 dst_vfs.register_file(path) | 1701 dst_vfs.register_file(path) |
1156 # XXX we could use the #nb_bytes argument. | 1702 # XXX we could use the #nb_bytes argument. |
1157 util.copyfile( | 1703 try: |
1158 src_path, | 1704 util.copyfile( |
1159 dst_path, | 1705 src_path, |
1160 hardlink=hardlink[0], | 1706 dst_path, |
1161 no_hardlink_cb=copy_used, | 1707 hardlink=hardlink[0], |
1162 check_fs_hardlink=False, | 1708 no_hardlink_cb=copy_used, |
1163 ) | 1709 check_fs_hardlink=False, |
1710 ) | |
1711 except FileNotFoundError: | |
1712 if not optional: | |
1713 raise | |
1164 progress.increment() | 1714 progress.increment() |
1165 return hardlink[0] | 1715 return hardlink[0] |
1166 | 1716 |
1167 | 1717 |
1168 def local_copy(src_repo, dest_repo): | 1718 def local_copy(src_repo, dest_repo) -> None: |
1169 """copy all content from one local repository to another | 1719 """copy all content from one local repository to another |
1170 | 1720 |
1171 This is useful for local clone""" | 1721 This is useful for local clone""" |
1172 src_store_requirements = { | 1722 src_store_requirements = { |
1173 r | 1723 r |
1213 # and the other one without the lock. However, in the linking case, | 1763 # and the other one without the lock. However, in the linking case, |
1214 # this would also requires checks that nobody is appending any data | 1764 # this would also requires checks that nobody is appending any data |
1215 # to the files while we do the clone, so this is not done yet. We | 1765 # to the files while we do the clone, so this is not done yet. We |
1216 # could do this blindly when copying files. | 1766 # could do this blindly when copying files. |
1217 files = [ | 1767 files = [ |
1218 (vfs_key, f.unencoded_path) | 1768 (vfs_key, f.unencoded_path, f.optional) |
1219 for vfs_key, e in entries | 1769 for vfs_key, e in entries |
1220 for f in e.files() | 1770 for f in e.files() |
1221 ] | 1771 ] |
1222 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress) | 1772 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress) |
1223 | 1773 |