mercurial/bundlerepo.py
changeset 52844 dabec69bd6fc
parent 52643 5cc8deb96b48
child 52845 b25208655467
equal deleted inserted replaced
52843:189491cea922 52844:dabec69bd6fc
   615 
   615 
   616     def release(self):
   616     def release(self):
   617         raise NotImplementedError
   617         raise NotImplementedError
   618 
   618 
   619 
   619 
       
   620 class getremotechanges_state_tracker:
       
   621     def __init__(self, peer, incoming, common, rheads):
       
   622         # bundle file to be deleted
       
   623         self.bundle = None
       
   624         # bundle repo to be closed
       
   625         self.bundlerepo = None
       
   626         # remote peer connection to be closed
       
   627         self.peer = peer
       
   628         # if peer is remote, `localrepo` will be equal to
       
   629         # `bundlerepo` when bundle is created.
       
   630         self.localrepo = peer.local()
       
   631 
       
   632         # `incoming` operation parameters:
       
   633         # (these get mutated by _create_bundle)
       
   634         self.incoming = incoming
       
   635         self.common = common
       
   636         self.rheads = rheads
       
   637 
       
   638     def cleanup(self):
       
   639         try:
       
   640             if self.bundlerepo:
       
   641                 self.bundlerepo.close()
       
   642         finally:
       
   643             try:
       
   644                 if self.bundle:
       
   645                     os.unlink(self.bundle)
       
   646             finally:
       
   647                 self.peer.close()
       
   648 
       
   649 
   620 def getremotechanges(
   650 def getremotechanges(
   621     ui, repo, peer, onlyheads=None, bundlename=None, force=False
   651     ui, repo, peer, onlyheads=None, bundlename=None, force=False
   622 ):
   652 ):
   623     """obtains a bundle of changes incoming from peer
   653     """obtains a bundle of changes incoming from peer
   624 
   654 
   650         return repo, [], peer.close
   680         return repo, [], peer.close
   651 
   681 
   652     commonset = set(common)
   682     commonset = set(common)
   653     rheads = [x for x in rheads if x not in commonset]
   683     rheads = [x for x in rheads if x not in commonset]
   654 
   684 
   655     bundle = None
   685     state = getremotechanges_state_tracker(peer, incoming, common, rheads)
   656     bundlerepo = None
   686     csets = _getremotechanges_slowpath(
   657     localrepo = peer.local()
   687         state, ui, repo, bundlename=bundlename, onlyheads=onlyheads
   658     if bundlename or not localrepo:
   688     )
       
   689 
       
   690     return (state.localrepo, csets, state.cleanup)
       
   691 
       
   692 
       
   693 def _create_bundle(state, ui, repo, bundlename, onlyheads):
       
   694     if True:
   659         # create a bundle (uncompressed if peer repo is not local)
   695         # create a bundle (uncompressed if peer repo is not local)
   660 
   696 
   661         # developer config: devel.legacy.exchange
   697         # developer config: devel.legacy.exchange
   662         legexc = ui.configlist(b'devel', b'legacy.exchange')
   698         legexc = ui.configlist(b'devel', b'legacy.exchange')
   663         forcebundle1 = b'bundle2' not in legexc and b'bundle1' in legexc
   699         forcebundle1 = b'bundle2' not in legexc and b'bundle1' in legexc
   664         canbundle2 = (
   700         canbundle2 = (
   665             not forcebundle1
   701             not forcebundle1
   666             and peer.capable(b'getbundle')
   702             and state.peer.capable(b'getbundle')
   667             and peer.capable(b'bundle2')
   703             and state.peer.capable(b'bundle2')
   668         )
   704         )
   669         if canbundle2:
   705         if canbundle2:
   670             with peer.commandexecutor() as e:
   706             with state.peer.commandexecutor() as e:
   671                 b2 = e.callcommand(
   707                 b2 = e.callcommand(
   672                     b'getbundle',
   708                     b'getbundle',
   673                     {
   709                     {
   674                         b'source': b'incoming',
   710                         b'source': b'incoming',
   675                         b'common': common,
   711                         b'common': state.common,
   676                         b'heads': rheads,
   712                         b'heads': state.rheads,
   677                         b'bundlecaps': exchange.caps20to10(
   713                         b'bundlecaps': exchange.caps20to10(
   678                             repo, role=b'client'
   714                             repo, role=b'client'
   679                         ),
   715                         ),
   680                         b'cg': True,
   716                         b'cg': True,
   681                     },
   717                     },
   682                 ).result()
   718                 ).result()
   683 
   719 
   684                 fname = bundle = changegroup.writechunks(
   720                 fname = state.bundle = changegroup.writechunks(
   685                     ui, b2._forwardchunks(), bundlename
   721                     ui, b2._forwardchunks(), bundlename
   686                 )
   722                 )
   687         else:
   723         else:
   688             if peer.capable(b'getbundle'):
   724             if state.peer.capable(b'getbundle'):
   689                 with peer.commandexecutor() as e:
   725                 with state.peer.commandexecutor() as e:
   690                     cg = e.callcommand(
   726                     cg = e.callcommand(
   691                         b'getbundle',
   727                         b'getbundle',
   692                         {
   728                         {
   693                             b'source': b'incoming',
   729                             b'source': b'incoming',
   694                             b'common': common,
   730                             b'common': state.common,
   695                             b'heads': rheads,
   731                             b'heads': state.rheads,
   696                         },
   732                         },
   697                     ).result()
   733                     ).result()
   698             elif onlyheads is None and not peer.capable(b'changegroupsubset'):
   734             elif onlyheads is None and not state.peer.capable(
       
   735                 b'changegroupsubset'
       
   736             ):
   699                 # compat with older servers when pulling all remote heads
   737                 # compat with older servers when pulling all remote heads
   700 
   738 
   701                 with peer.commandexecutor() as e:
   739                 with state.peer.commandexecutor() as e:
   702                     cg = e.callcommand(
   740                     cg = e.callcommand(
   703                         b'changegroup',
   741                         b'changegroup',
   704                         {
   742                         {
   705                             b'nodes': incoming,
   743                             b'nodes': state.incoming,
   706                             b'source': b'incoming',
   744                             b'source': b'incoming',
   707                         },
   745                         },
   708                     ).result()
   746                     ).result()
   709 
   747 
   710                 rheads = None
   748                 state.rheads = None
   711             else:
   749             else:
   712                 with peer.commandexecutor() as e:
   750                 with state.peer.commandexecutor() as e:
   713                     cg = e.callcommand(
   751                     cg = e.callcommand(
   714                         b'changegroupsubset',
   752                         b'changegroupsubset',
   715                         {
   753                         {
   716                             b'bases': incoming,
   754                             b'bases': state.incoming,
   717                             b'heads': rheads,
   755                             b'heads': state.rheads,
   718                             b'source': b'incoming',
   756                             b'source': b'incoming',
   719                         },
   757                         },
   720                     ).result()
   758                     ).result()
   721 
   759 
   722             if localrepo:
   760             if state.localrepo:
   723                 bundletype = b"HG10BZ"
   761                 bundletype = b"HG10BZ"
   724             else:
   762             else:
   725                 bundletype = b"HG10UN"
   763                 bundletype = b"HG10UN"
   726             fname = bundle = bundle2.writebundle(ui, cg, bundlename, bundletype)
   764             fname = state.bundle = bundle2.writebundle(
       
   765                 ui, cg, bundlename, bundletype
       
   766             )
   727         # keep written bundle?
   767         # keep written bundle?
   728         if bundlename:
   768         if bundlename:
   729             bundle = None
   769             state.bundle = None
   730         if not localrepo:
   770 
       
   771         return fname
       
   772 
       
   773 
       
   774 def _getremotechanges_slowpath(
       
   775     state, ui, repo, bundlename=None, onlyheads=None
       
   776 ):
       
   777     if bundlename or not state.localrepo:
       
   778         fname = _create_bundle(
       
   779             state,
       
   780             ui,
       
   781             repo,
       
   782             bundlename=bundlename,
       
   783             onlyheads=onlyheads,
       
   784         )
       
   785         if not state.localrepo:
   731             # use the created uncompressed bundlerepo
   786             # use the created uncompressed bundlerepo
   732             localrepo = bundlerepo = makebundlerepository(
   787             state.localrepo = state.bundlerepo = makebundlerepository(
   733                 repo.baseui, repo.root, fname
   788                 repo.baseui, repo.root, fname
   734             )
   789             )
   735 
   790 
   736             # this repo contains local and peer now, so filter out local again
   791             # this repo contains local and peer now, so filter out local again
   737             common = repo.heads()
   792             state.common = repo.heads()
   738     if localrepo:
   793 
       
   794     if state.localrepo:
   739         # Part of common may be remotely filtered
   795         # Part of common may be remotely filtered
   740         # So use an unfiltered version
   796         # So use an unfiltered version
   741         # The discovery process probably need cleanup to avoid that
   797         # The discovery process probably need cleanup to avoid that
   742         localrepo = localrepo.unfiltered()
   798         state.localrepo = state.localrepo.unfiltered()
   743 
   799 
   744     csets = localrepo.changelog.findmissing(common, rheads)
   800     csets = state.localrepo.changelog.findmissing(state.common, state.rheads)
   745 
   801 
   746     if bundlerepo:
   802     if state.bundlerepo:
       
   803         bundlerepo = state.bundlerepo
   747         reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev :]]
   804         reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev :]]
   748 
   805 
   749         with peer.commandexecutor() as e:
   806         with state.peer.commandexecutor() as e:
   750             remotephases = e.callcommand(
   807             remotephases = e.callcommand(
   751                 b'listkeys',
   808                 b'listkeys',
   752                 {
   809                 {
   753                     b'namespace': b'phases',
   810                     b'namespace': b'phases',
   754                 },
   811                 },
   755             ).result()
   812             ).result()
   756 
   813 
   757         pullop = exchange.pulloperation(
   814         pullop = exchange.pulloperation(
   758             bundlerepo, peer, path=None, heads=reponodes
   815             bundlerepo, state.peer, path=None, heads=reponodes
   759         )
   816         )
   760         pullop.trmanager = bundletransactionmanager()
   817         pullop.trmanager = bundletransactionmanager()
   761         exchange._pullapplyphases(pullop, remotephases)
   818         exchange._pullapplyphases(pullop, remotephases)
   762 
   819 
   763     def cleanup():
   820     return csets
   764         if bundlerepo:
       
   765             bundlerepo.close()
       
   766         if bundle:
       
   767             os.unlink(bundle)
       
   768         peer.close()
       
   769 
       
   770     return (localrepo, csets, cleanup)