mercurial/exchangev2.py
changeset 39631 b9e453d683a1
parent 39629 a86d21e70b2b
child 39633 ff2de4f2eb3c
--- a/mercurial/exchangev2.py	Wed Sep 12 10:01:16 2018 -0700
+++ b/mercurial/exchangev2.py	Wed Sep 12 10:01:36 2018 -0700
@@ -7,10 +7,16 @@
 
 from __future__ import absolute_import
 
+import weakref
+
+from .i18n import _
 from .node import (
     nullid,
+    short,
 )
 from . import (
+    mdiff,
+    pycompat,
     setdiscovery,
 )
 
@@ -18,11 +24,15 @@
     """Pull using wire protocol version 2."""
     repo = pullop.repo
     remote = pullop.remote
+    tr = pullop.trmanager.transaction()
 
     # Figure out what needs to be fetched.
     common, fetch, remoteheads = _pullchangesetdiscovery(
         repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
 
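+    # Then fetch those changesets. When no heads were requested explicitly,
+    # pull everything the remote advertises.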
+    pullheads = pullop.heads or remoteheads
+    _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
+
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
 
@@ -53,3 +63,76 @@
     common.discard(nullid)
 
     return common, fetch, remoteheads
+
+def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
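+    """Fetch the changeset revisions listed in ``fetch`` from the remote."""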
+    if not fetch:
+        return
+
+    # TODO consider adding a step here where we obtain the DAG shape first
+    # (or ask the server to slice changesets into chunks for us) so that
+    # we can perform multiple fetches in batches. This will facilitate
+    # resuming interrupted clones, yield higher server-side cache hit rates
+    # due to smaller segments, etc.
+    with remote.commandexecutor() as e:
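+        # noderange names the nodes we already have in common and the heads
+        # we want; limiting fields to parents and revision keeps the response
+        # to just what is needed to build the changelog locally.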
+        objs = e.callcommand(b'changesetdata', {
+            b'noderange': [sorted(common), sorted(remoteheads)],
+            b'fields': {b'parents', b'revision'},
+        }).result()
+
+        # The context manager waits on all response data when exiting, so
+        # we need to remain inside it in order to stream the data.
+        return _processchangesetdata(repo, tr, objs)
+
+def _processchangesetdata(repo, tr, objs):
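+    """Apply objects emitted by a changesetdata command to the changelog."""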
+    repo.hook('prechangegroup', throw=True,
+              **pycompat.strkwargs(tr.hookargs))
+
+    urepo = repo.unfiltered()
+    cl = urepo.changelog
+
+    cl.delayupdate(tr)
+
+    # The first emitted object is a header describing the data that
+    # follows.
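+    # At a minimum it advertises b'totalitems', which seeds the progress bar.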
+    meta = next(objs)
+
+    progress = repo.ui.makeprogress(_('changesets'),
+                                    unit=_('chunks'),
+                                    total=meta.get(b'totalitems'))
+
+    def linkrev(node):
+        repo.ui.debug('add changeset %s\n' % short(node))
+        # Linkrev for changelog is always self.
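+        # The revision being added will land at index len(cl), so that value
+        # is its own (future) revision number.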
+        return len(cl)
+
+    def onchangeset(cl, node):
+        progress.increment()
+
+    # addgroup() expects a 7-tuple describing revisions. This normalizes
+    # the wire data to that format.
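+    # The tuple is (node, p1, p2, linknode, deltabase, delta, flags).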
+    def iterrevisions():
+        for cset in objs:
+            assert b'revisionsize' in cset
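+            # The raw revision text follows its metadata as the next emitted
+            # object in the stream.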
+            data = next(objs)
+
+            yield (
+                cset[b'node'],
+                cset[b'parents'][0],
+                cset[b'parents'][1],
+                # Linknode is always itself for changesets.
+                cset[b'node'],
+                # We always send full revisions, so the delta base is nullid.
+                nullid,
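+                # trivialdiffheader() wraps the full text as a delta that
+                # replaces the (empty) null base in its entirety.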
+                mdiff.trivialdiffheader(len(data)) + data,
+                # Flags not yet supported.
+                0,
+            )
+
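+    # A weak proxy avoids handing the changelog a strong reference to the
+    # transaction, mirroring what the changegroup code does.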
+    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
+                        addrevisioncb=onchangeset)
+
+    progress.complete()
+
+    return {
+        'added': added,
+    }