Mercurial > public > mercurial-scm > hg
comparison mercurial/bundlerepo.py @ 52844:dabec69bd6fc
refactor: split `bundlerepo.getremotechanges` into three pieces
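Split the function into three pieces (`getremotechanges`, `_getremotechanges_slowpath` and `_create_bundle`) and introduce an explicit class, `getremotechanges_state_tracker`, that tracks state.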
The goal here is to improve error handling, in particular so it's easy to
wrap things in a try/with without having to grow an already-significant
indentation level.
The code can probably be cleaned up further, but I don't want to bite off
a piece too large to chew.
author | Arseniy Alekseyev <aalekseyev@janestreet.com> |
---|---|
date | Mon, 03 Feb 2025 18:26:26 +0000 |
parents | 5cc8deb96b48 |
children | b25208655467 |
comparison
equal
deleted
inserted
replaced
52843:189491cea922 | 52844:dabec69bd6fc |
---|---|
615 | 615 |
616 def release(self): | 616 def release(self): |
617 raise NotImplementedError | 617 raise NotImplementedError |
618 | 618 |
619 | 619 |
620 class getremotechanges_state_tracker: | |
621 def __init__(self, peer, incoming, common, rheads): | |
622 # bundle file to be deleted | |
623 self.bundle = None | |
624 # bundle repo to be closed | |
625 self.bundlerepo = None | |
626 # remote peer connection to be closed | |
627 self.peer = peer | |
628 # if peer is remote, `localrepo` will be equal to | |
629 # `bundlerepo` when bundle is created. | |
630 self.localrepo = peer.local() | |
631 | |
632 # `incoming` operation parameters: | |
633 # (these get mutated by _create_bundle) | |
634 self.incoming = incoming | |
635 self.common = common | |
636 self.rheads = rheads | |
637 | |
638 def cleanup(self): | |
639 try: | |
640 if self.bundlerepo: | |
641 self.bundlerepo.close() | |
642 finally: | |
643 try: | |
644 if self.bundle: | |
645 os.unlink(self.bundle) | |
646 finally: | |
647 self.peer.close() | |
648 | |
649 | |
620 def getremotechanges( | 650 def getremotechanges( |
621 ui, repo, peer, onlyheads=None, bundlename=None, force=False | 651 ui, repo, peer, onlyheads=None, bundlename=None, force=False |
622 ): | 652 ): |
623 """obtains a bundle of changes incoming from peer | 653 """obtains a bundle of changes incoming from peer |
624 | 654 |
650 return repo, [], peer.close | 680 return repo, [], peer.close |
651 | 681 |
652 commonset = set(common) | 682 commonset = set(common) |
653 rheads = [x for x in rheads if x not in commonset] | 683 rheads = [x for x in rheads if x not in commonset] |
654 | 684 |
655 bundle = None | 685 state = getremotechanges_state_tracker(peer, incoming, common, rheads) |
656 bundlerepo = None | 686 csets = _getremotechanges_slowpath( |
657 localrepo = peer.local() | 687 state, ui, repo, bundlename=bundlename, onlyheads=onlyheads |
658 if bundlename or not localrepo: | 688 ) |
689 | |
690 return (state.localrepo, csets, state.cleanup) | |
691 | |
692 | |
693 def _create_bundle(state, ui, repo, bundlename, onlyheads): | |
694 if True: | |
659 # create a bundle (uncompressed if peer repo is not local) | 695 # create a bundle (uncompressed if peer repo is not local) |
660 | 696 |
661 # developer config: devel.legacy.exchange | 697 # developer config: devel.legacy.exchange |
662 legexc = ui.configlist(b'devel', b'legacy.exchange') | 698 legexc = ui.configlist(b'devel', b'legacy.exchange') |
663 forcebundle1 = b'bundle2' not in legexc and b'bundle1' in legexc | 699 forcebundle1 = b'bundle2' not in legexc and b'bundle1' in legexc |
664 canbundle2 = ( | 700 canbundle2 = ( |
665 not forcebundle1 | 701 not forcebundle1 |
666 and peer.capable(b'getbundle') | 702 and state.peer.capable(b'getbundle') |
667 and peer.capable(b'bundle2') | 703 and state.peer.capable(b'bundle2') |
668 ) | 704 ) |
669 if canbundle2: | 705 if canbundle2: |
670 with peer.commandexecutor() as e: | 706 with state.peer.commandexecutor() as e: |
671 b2 = e.callcommand( | 707 b2 = e.callcommand( |
672 b'getbundle', | 708 b'getbundle', |
673 { | 709 { |
674 b'source': b'incoming', | 710 b'source': b'incoming', |
675 b'common': common, | 711 b'common': state.common, |
676 b'heads': rheads, | 712 b'heads': state.rheads, |
677 b'bundlecaps': exchange.caps20to10( | 713 b'bundlecaps': exchange.caps20to10( |
678 repo, role=b'client' | 714 repo, role=b'client' |
679 ), | 715 ), |
680 b'cg': True, | 716 b'cg': True, |
681 }, | 717 }, |
682 ).result() | 718 ).result() |
683 | 719 |
684 fname = bundle = changegroup.writechunks( | 720 fname = state.bundle = changegroup.writechunks( |
685 ui, b2._forwardchunks(), bundlename | 721 ui, b2._forwardchunks(), bundlename |
686 ) | 722 ) |
687 else: | 723 else: |
688 if peer.capable(b'getbundle'): | 724 if state.peer.capable(b'getbundle'): |
689 with peer.commandexecutor() as e: | 725 with state.peer.commandexecutor() as e: |
690 cg = e.callcommand( | 726 cg = e.callcommand( |
691 b'getbundle', | 727 b'getbundle', |
692 { | 728 { |
693 b'source': b'incoming', | 729 b'source': b'incoming', |
694 b'common': common, | 730 b'common': state.common, |
695 b'heads': rheads, | 731 b'heads': state.rheads, |
696 }, | 732 }, |
697 ).result() | 733 ).result() |
698 elif onlyheads is None and not peer.capable(b'changegroupsubset'): | 734 elif onlyheads is None and not state.peer.capable( |
735 b'changegroupsubset' | |
736 ): | |
699 # compat with older servers when pulling all remote heads | 737 # compat with older servers when pulling all remote heads |
700 | 738 |
701 with peer.commandexecutor() as e: | 739 with state.peer.commandexecutor() as e: |
702 cg = e.callcommand( | 740 cg = e.callcommand( |
703 b'changegroup', | 741 b'changegroup', |
704 { | 742 { |
705 b'nodes': incoming, | 743 b'nodes': state.incoming, |
706 b'source': b'incoming', | 744 b'source': b'incoming', |
707 }, | 745 }, |
708 ).result() | 746 ).result() |
709 | 747 |
710 rheads = None | 748 state.rheads = None |
711 else: | 749 else: |
712 with peer.commandexecutor() as e: | 750 with state.peer.commandexecutor() as e: |
713 cg = e.callcommand( | 751 cg = e.callcommand( |
714 b'changegroupsubset', | 752 b'changegroupsubset', |
715 { | 753 { |
716 b'bases': incoming, | 754 b'bases': state.incoming, |
717 b'heads': rheads, | 755 b'heads': state.rheads, |
718 b'source': b'incoming', | 756 b'source': b'incoming', |
719 }, | 757 }, |
720 ).result() | 758 ).result() |
721 | 759 |
722 if localrepo: | 760 if state.localrepo: |
723 bundletype = b"HG10BZ" | 761 bundletype = b"HG10BZ" |
724 else: | 762 else: |
725 bundletype = b"HG10UN" | 763 bundletype = b"HG10UN" |
726 fname = bundle = bundle2.writebundle(ui, cg, bundlename, bundletype) | 764 fname = state.bundle = bundle2.writebundle( |
765 ui, cg, bundlename, bundletype | |
766 ) | |
727 # keep written bundle? | 767 # keep written bundle? |
728 if bundlename: | 768 if bundlename: |
729 bundle = None | 769 state.bundle = None |
730 if not localrepo: | 770 |
771 return fname | |
772 | |
773 | |
774 def _getremotechanges_slowpath( | |
775 state, ui, repo, bundlename=None, onlyheads=None | |
776 ): | |
777 if bundlename or not state.localrepo: | |
778 fname = _create_bundle( | |
779 state, | |
780 ui, | |
781 repo, | |
782 bundlename=bundlename, | |
783 onlyheads=onlyheads, | |
784 ) | |
785 if not state.localrepo: | |
731 # use the created uncompressed bundlerepo | 786 # use the created uncompressed bundlerepo |
732 localrepo = bundlerepo = makebundlerepository( | 787 state.localrepo = state.bundlerepo = makebundlerepository( |
733 repo.baseui, repo.root, fname | 788 repo.baseui, repo.root, fname |
734 ) | 789 ) |
735 | 790 |
736 # this repo contains local and peer now, so filter out local again | 791 # this repo contains local and peer now, so filter out local again |
737 common = repo.heads() | 792 state.common = repo.heads() |
738 if localrepo: | 793 |
794 if state.localrepo: | |
739 # Part of common may be remotely filtered | 795 # Part of common may be remotely filtered |
740 # So use an unfiltered version | 796 # So use an unfiltered version |
741 # The discovery process probably needs cleanup to avoid that | 797 # The discovery process probably needs cleanup to avoid that |
742 localrepo = localrepo.unfiltered() | 798 state.localrepo = state.localrepo.unfiltered() |
743 | 799 |
744 csets = localrepo.changelog.findmissing(common, rheads) | 800 csets = state.localrepo.changelog.findmissing(state.common, state.rheads) |
745 | 801 |
746 if bundlerepo: | 802 if state.bundlerepo: |
803 bundlerepo = state.bundlerepo | |
747 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev :]] | 804 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev :]] |
748 | 805 |
749 with peer.commandexecutor() as e: | 806 with state.peer.commandexecutor() as e: |
750 remotephases = e.callcommand( | 807 remotephases = e.callcommand( |
751 b'listkeys', | 808 b'listkeys', |
752 { | 809 { |
753 b'namespace': b'phases', | 810 b'namespace': b'phases', |
754 }, | 811 }, |
755 ).result() | 812 ).result() |
756 | 813 |
757 pullop = exchange.pulloperation( | 814 pullop = exchange.pulloperation( |
758 bundlerepo, peer, path=None, heads=reponodes | 815 bundlerepo, state.peer, path=None, heads=reponodes |
759 ) | 816 ) |
760 pullop.trmanager = bundletransactionmanager() | 817 pullop.trmanager = bundletransactionmanager() |
761 exchange._pullapplyphases(pullop, remotephases) | 818 exchange._pullapplyphases(pullop, remotephases) |
762 | 819 |
763 def cleanup(): | 820 return csets |
764 if bundlerepo: | |
765 bundlerepo.close() | |
766 if bundle: | |
767 os.unlink(bundle) | |
768 peer.close() | |
769 | |
770 return (localrepo, csets, cleanup) |