mercurial/exchangev2.py
changeset 39631 b9e453d683a1
parent 39629 a86d21e70b2b
child 39633 ff2de4f2eb3c
equal deleted inserted replaced
39630:9c2c77c73f23 39631:b9e453d683a1
     5 # This software may be used and distributed according to the terms of the
     5 # This software may be used and distributed according to the terms of the
     6 # GNU General Public License version 2 or any later version.
     6 # GNU General Public License version 2 or any later version.
     7 
     7 
     8 from __future__ import absolute_import
     8 from __future__ import absolute_import
     9 
     9 
       
    10 import weakref
       
    11 
       
    12 from .i18n import _
    10 from .node import (
    13 from .node import (
    11     nullid,
    14     nullid,
       
    15     short,
    12 )
    16 )
    13 from . import (
    17 from . import (
       
    18     mdiff,
       
    19     pycompat,
    14     setdiscovery,
    20     setdiscovery,
    15 )
    21 )
    16 
    22 
    17 def pull(pullop):
    23 def pull(pullop):
    18     """Pull using wire protocol version 2."""
    24     """Pull using wire protocol version 2."""
    19     repo = pullop.repo
    25     repo = pullop.repo
    20     remote = pullop.remote
    26     remote = pullop.remote
       
    27     tr = pullop.trmanager.transaction()
    21 
    28 
    22     # Figure out what needs to be fetched.
    29     # Figure out what needs to be fetched.
    23     common, fetch, remoteheads = _pullchangesetdiscovery(
    30     common, fetch, remoteheads = _pullchangesetdiscovery(
    24         repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
    31         repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
       
    32 
       
    33     pullheads = pullop.heads or remoteheads
       
    34     _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
    25 
    35 
    26 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    36 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    27     """Determine which changesets need to be pulled."""
    37     """Determine which changesets need to be pulled."""
    28 
    38 
    29     if heads:
    39     if heads:
    51             fetch = []
    61             fetch = []
    52 
    62 
    53     common.discard(nullid)
    63     common.discard(nullid)
    54 
    64 
    55     return common, fetch, remoteheads
    65     return common, fetch, remoteheads
       
    66 
       
    67 def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
       
    68     if not fetch:
       
    69         return
       
    70 
       
    71     # TODO consider adding a step here where we obtain the DAG shape first
       
    72     # (or ask the server to slice changesets into chunks for us) so that
       
    73     # we can perform multiple fetches in batches. This will facilitate
       
    74     # resuming interrupted clones, higher server-side cache hit rates due
       
    75     # to smaller segments, etc.
       
    76     with remote.commandexecutor() as e:
       
    77         objs = e.callcommand(b'changesetdata', {
       
    78             b'noderange': [sorted(common), sorted(remoteheads)],
       
    79             b'fields': {b'parents', b'revision'},
       
    80         }).result()
       
    81 
       
    82         # The context manager waits on all response data when exiting. So
       
    83         # we need to remain in the context manager in order to stream data.
       
    84         return _processchangesetdata(repo, tr, objs)
       
    85 
       
    86 def _processchangesetdata(repo, tr, objs):
       
    87     repo.hook('prechangegroup', throw=True,
       
    88               **pycompat.strkwargs(tr.hookargs))
       
    89 
       
    90     urepo = repo.unfiltered()
       
    91     cl = urepo.changelog
       
    92 
       
    93     cl.delayupdate(tr)
       
    94 
       
    95     # The first emitted object is a header describing the data that
       
    96     # follows.
       
    97     meta = next(objs)
       
    98 
       
    99     progress = repo.ui.makeprogress(_('changesets'),
       
   100                                     unit=_('chunks'),
       
   101                                     total=meta.get(b'totalitems'))
       
   102 
       
   103     def linkrev(node):
       
   104         repo.ui.debug('add changeset %s\n' % short(node))
       
   105         # Linkrev for changelog is always self.
       
   106         return len(cl)
       
   107 
       
   108     def onchangeset(cl, node):
       
   109         progress.increment()
       
   110 
       
   111     # addgroup() expects a 7-tuple describing revisions. This normalizes
       
   112     # the wire data to that format.
       
   113     def iterrevisions():
       
   114         for cset in objs:
       
   115             assert b'revisionsize' in cset
       
   116             data = next(objs)
       
   117 
       
   118             yield (
       
   119                 cset[b'node'],
       
   120                 cset[b'parents'][0],
       
   121                 cset[b'parents'][1],
       
   122                 # Linknode is always itself for changesets.
       
   123                 cset[b'node'],
       
   124                 # We always send full revisions. So delta base is not set.
       
   125                 nullid,
       
   126                 mdiff.trivialdiffheader(len(data)) + data,
       
   127                 # Flags not yet supported.
       
   128                 0,
       
   129             )
       
   130 
       
   131     added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
       
   132                         addrevisioncb=onchangeset)
       
   133 
       
   134     progress.complete()
       
   135 
       
   136     return {
       
   137         'added': added,
       
   138     }