comparison mercurial/bundlerepo.py @ 52844:dabec69bd6fc

refactor: split `bundlerepo.getremotechanges` into three pieces. Split the function into three and introduce an explicit class that tracks state. The goal here is to improve error handling, in particular so it's easy to wrap things in a try/with without having to grow an already-significant indentation level. The code can probably be cleaned up further, but I don't want to bite off a piece too large to chew.
author Arseniy Alekseyev <aalekseyev@janestreet.com>
date Mon, 03 Feb 2025 18:26:26 +0000
parents 5cc8deb96b48
children b25208655467
comparison
equal deleted inserted replaced
52843:189491cea922 52844:dabec69bd6fc
615 615
616 def release(self): 616 def release(self):
617 raise NotImplementedError 617 raise NotImplementedError
618 618
619 619
class getremotechanges_state_tracker:
    """Mutable state shared by the pieces of `getremotechanges`.

    Holds the resources that must be released once the caller is done
    with the incoming changes (bundle file, bundle repository, peer),
    together with the `incoming` operation parameters that are updated
    while the bundle is being created.
    """

    def __init__(self, peer, incoming, common, rheads):
        # bundle file to be deleted
        self.bundle = None
        # bundle repo to be closed
        self.bundlerepo = None
        # remote peer connection to be closed
        self.peer = peer
        # if peer is remote, `localrepo` will be equal to
        # `bundlerepo` when bundle is created.
        self.localrepo = peer.local()

        # `incoming` operation parameters:
        # (these get mutated by _create_bundle)
        self.incoming = incoming
        self.common = common
        self.rheads = rheads

    def cleanup(self):
        """Close the bundle repo, delete the bundle file and close the peer.

        Every step is attempted even if an earlier one raises; the
        exception still propagates once the remaining steps have run.

        The bundle repo and bundle file are detached from the tracker
        before being released, so calling `cleanup` again does not close
        the repo a second time or try to unlink a file that is already
        gone.  The peer is closed on every call.
        """
        # Detach first: whatever happens below, a later call must not see
        # resources that this call already released (or tried to).
        bundlerepo, self.bundlerepo = self.bundlerepo, None
        bundle, self.bundle = self.bundle, None
        try:
            if bundlerepo:
                bundlerepo.close()
        finally:
            try:
                if bundle:
                    os.unlink(bundle)
            finally:
                self.peer.close()
648
649
620 def getremotechanges( 650 def getremotechanges(
621 ui, repo, peer, onlyheads=None, bundlename=None, force=False 651 ui, repo, peer, onlyheads=None, bundlename=None, force=False
622 ): 652 ):
623 """obtains a bundle of changes incoming from peer 653 """obtains a bundle of changes incoming from peer
624 654
650 return repo, [], peer.close 680 return repo, [], peer.close
651 681
652 commonset = set(common) 682 commonset = set(common)
653 rheads = [x for x in rheads if x not in commonset] 683 rheads = [x for x in rheads if x not in commonset]
654 684
655 bundle = None 685 state = getremotechanges_state_tracker(peer, incoming, common, rheads)
656 bundlerepo = None 686 csets = _getremotechanges_slowpath(
657 localrepo = peer.local() 687 state, ui, repo, bundlename=bundlename, onlyheads=onlyheads
658 if bundlename or not localrepo: 688 )
689
690 return (state.localrepo, csets, state.cleanup)
691
692
693 def _create_bundle(state, ui, repo, bundlename, onlyheads):
694 if True:
659 # create a bundle (uncompressed if peer repo is not local) 695 # create a bundle (uncompressed if peer repo is not local)
660 696
661 # developer config: devel.legacy.exchange 697 # developer config: devel.legacy.exchange
662 legexc = ui.configlist(b'devel', b'legacy.exchange') 698 legexc = ui.configlist(b'devel', b'legacy.exchange')
663 forcebundle1 = b'bundle2' not in legexc and b'bundle1' in legexc 699 forcebundle1 = b'bundle2' not in legexc and b'bundle1' in legexc
664 canbundle2 = ( 700 canbundle2 = (
665 not forcebundle1 701 not forcebundle1
666 and peer.capable(b'getbundle') 702 and state.peer.capable(b'getbundle')
667 and peer.capable(b'bundle2') 703 and state.peer.capable(b'bundle2')
668 ) 704 )
669 if canbundle2: 705 if canbundle2:
670 with peer.commandexecutor() as e: 706 with state.peer.commandexecutor() as e:
671 b2 = e.callcommand( 707 b2 = e.callcommand(
672 b'getbundle', 708 b'getbundle',
673 { 709 {
674 b'source': b'incoming', 710 b'source': b'incoming',
675 b'common': common, 711 b'common': state.common,
676 b'heads': rheads, 712 b'heads': state.rheads,
677 b'bundlecaps': exchange.caps20to10( 713 b'bundlecaps': exchange.caps20to10(
678 repo, role=b'client' 714 repo, role=b'client'
679 ), 715 ),
680 b'cg': True, 716 b'cg': True,
681 }, 717 },
682 ).result() 718 ).result()
683 719
684 fname = bundle = changegroup.writechunks( 720 fname = state.bundle = changegroup.writechunks(
685 ui, b2._forwardchunks(), bundlename 721 ui, b2._forwardchunks(), bundlename
686 ) 722 )
687 else: 723 else:
688 if peer.capable(b'getbundle'): 724 if state.peer.capable(b'getbundle'):
689 with peer.commandexecutor() as e: 725 with state.peer.commandexecutor() as e:
690 cg = e.callcommand( 726 cg = e.callcommand(
691 b'getbundle', 727 b'getbundle',
692 { 728 {
693 b'source': b'incoming', 729 b'source': b'incoming',
694 b'common': common, 730 b'common': state.common,
695 b'heads': rheads, 731 b'heads': state.rheads,
696 }, 732 },
697 ).result() 733 ).result()
698 elif onlyheads is None and not peer.capable(b'changegroupsubset'): 734 elif onlyheads is None and not state.peer.capable(
735 b'changegroupsubset'
736 ):
699 # compat with older servers when pulling all remote heads 737 # compat with older servers when pulling all remote heads
700 738
701 with peer.commandexecutor() as e: 739 with state.peer.commandexecutor() as e:
702 cg = e.callcommand( 740 cg = e.callcommand(
703 b'changegroup', 741 b'changegroup',
704 { 742 {
705 b'nodes': incoming, 743 b'nodes': state.incoming,
706 b'source': b'incoming', 744 b'source': b'incoming',
707 }, 745 },
708 ).result() 746 ).result()
709 747
710 rheads = None 748 state.rheads = None
711 else: 749 else:
712 with peer.commandexecutor() as e: 750 with state.peer.commandexecutor() as e:
713 cg = e.callcommand( 751 cg = e.callcommand(
714 b'changegroupsubset', 752 b'changegroupsubset',
715 { 753 {
716 b'bases': incoming, 754 b'bases': state.incoming,
717 b'heads': rheads, 755 b'heads': state.rheads,
718 b'source': b'incoming', 756 b'source': b'incoming',
719 }, 757 },
720 ).result() 758 ).result()
721 759
722 if localrepo: 760 if state.localrepo:
723 bundletype = b"HG10BZ" 761 bundletype = b"HG10BZ"
724 else: 762 else:
725 bundletype = b"HG10UN" 763 bundletype = b"HG10UN"
726 fname = bundle = bundle2.writebundle(ui, cg, bundlename, bundletype) 764 fname = state.bundle = bundle2.writebundle(
765 ui, cg, bundlename, bundletype
766 )
727 # keep written bundle? 767 # keep written bundle?
728 if bundlename: 768 if bundlename:
729 bundle = None 769 state.bundle = None
730 if not localrepo: 770
771 return fname
772
773
774 def _getremotechanges_slowpath(
775 state, ui, repo, bundlename=None, onlyheads=None
776 ):
777 if bundlename or not state.localrepo:
778 fname = _create_bundle(
779 state,
780 ui,
781 repo,
782 bundlename=bundlename,
783 onlyheads=onlyheads,
784 )
785 if not state.localrepo:
731 # use the created uncompressed bundlerepo 786 # use the created uncompressed bundlerepo
732 localrepo = bundlerepo = makebundlerepository( 787 state.localrepo = state.bundlerepo = makebundlerepository(
733 repo.baseui, repo.root, fname 788 repo.baseui, repo.root, fname
734 ) 789 )
735 790
736 # this repo contains local and peer now, so filter out local again 791 # this repo contains local and peer now, so filter out local again
737 common = repo.heads() 792 state.common = repo.heads()
738 if localrepo: 793
794 if state.localrepo:
739 # Part of common may be remotely filtered 795 # Part of common may be remotely filtered
740 # So use an unfiltered version 796 # So use an unfiltered version
741 # The discovery process probably need cleanup to avoid that 797 # The discovery process probably need cleanup to avoid that
742 localrepo = localrepo.unfiltered() 798 state.localrepo = state.localrepo.unfiltered()
743 799
744 csets = localrepo.changelog.findmissing(common, rheads) 800 csets = state.localrepo.changelog.findmissing(state.common, state.rheads)
745 801
746 if bundlerepo: 802 if state.bundlerepo:
803 bundlerepo = state.bundlerepo
747 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev :]] 804 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev :]]
748 805
749 with peer.commandexecutor() as e: 806 with state.peer.commandexecutor() as e:
750 remotephases = e.callcommand( 807 remotephases = e.callcommand(
751 b'listkeys', 808 b'listkeys',
752 { 809 {
753 b'namespace': b'phases', 810 b'namespace': b'phases',
754 }, 811 },
755 ).result() 812 ).result()
756 813
757 pullop = exchange.pulloperation( 814 pullop = exchange.pulloperation(
758 bundlerepo, peer, path=None, heads=reponodes 815 bundlerepo, state.peer, path=None, heads=reponodes
759 ) 816 )
760 pullop.trmanager = bundletransactionmanager() 817 pullop.trmanager = bundletransactionmanager()
761 exchange._pullapplyphases(pullop, remotephases) 818 exchange._pullapplyphases(pullop, remotephases)
762 819
763 def cleanup(): 820 return csets
764 if bundlerepo:
765 bundlerepo.close()
766 if bundle:
767 os.unlink(bundle)
768 peer.close()
769
770 return (localrepo, csets, cleanup)