--- a/mercurial/exchangev2.py Wed Sep 12 10:01:16 2018 -0700
+++ b/mercurial/exchangev2.py Wed Sep 12 10:01:36 2018 -0700
@@ -7,10 +7,16 @@
from __future__ import absolute_import
+import weakref
+
+from .i18n import _
from .node import (
nullid,
+ short,
)
from . import (
+ mdiff,
+ pycompat,
setdiscovery,
)
@@ -18,11 +24,15 @@
"""Pull using wire protocol version 2."""
repo = pullop.repo
remote = pullop.remote
+ tr = pullop.trmanager.transaction()
# Figure out what needs to be fetched.
common, fetch, remoteheads = _pullchangesetdiscovery(
repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
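+    # Pull the heads the caller asked for, falling back to every head the
+    # remote advertised during discovery.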
+ pullheads = pullop.heads or remoteheads
+ _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
+
def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
"""Determine which changesets need to be pulled."""
@@ -53,3 +63,76 @@
common.discard(nullid)
return common, fetch, remoteheads
+
+def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
+ if not fetch:
+ return
+
+    # TODO consider adding a step here where we obtain the DAG shape first
+    # (or ask the server to slice changesets into chunks for us) so that
+    # we can perform multiple fetches in batches. This will facilitate
+    # resuming interrupted clones, improve server-side cache hit rates due
+    # to smaller segments, etc.
+ with remote.commandexecutor() as e:
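+        # Request the parents and full revision data of every changeset
+        # between the common set and the heads being pulled.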
+ objs = e.callcommand(b'changesetdata', {
+ b'noderange': [sorted(common), sorted(remoteheads)],
+ b'fields': {b'parents', b'revision'},
+ }).result()
+
+        # The context manager waits on all response data when exiting, so we
+        # need to remain inside it in order to stream data.
+ return _processchangesetdata(repo, tr, objs)
+
+def _processchangesetdata(repo, tr, objs):
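+    # Fire the standard prechangegroup hook so hooks observe changesets
+    # received over wire protocol version 2 as well.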
+ repo.hook('prechangegroup', throw=True,
+ **pycompat.strkwargs(tr.hookargs))
+
+ urepo = repo.unfiltered()
+ cl = urepo.changelog
+
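+    # Delay visibility of changelog updates to other readers until the
+    # transaction is finalized.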
+ cl.delayupdate(tr)
+
+ # The first emitted object is a header describing the data that
+ # follows.
+ meta = next(objs)
+
+ progress = repo.ui.makeprogress(_('changesets'),
+ unit=_('chunks'),
+ total=meta.get(b'totalitems'))
+
+ def linkrev(node):
+ repo.ui.debug('add changeset %s\n' % short(node))
+ # Linkrev for changelog is always self.
+ return len(cl)
+
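+    # Called by addgroup() for each revision it adds; used here only to
+    # advance the progress meter.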
+ def onchangeset(cl, node):
+ progress.increment()
+
+ # addgroup() expects a 7-tuple describing revisions. This normalizes
+ # the wire data to that format.
+ def iterrevisions():
+ for cset in objs:
+ assert b'revisionsize' in cset
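+            # The revision fulltext is emitted as a separate object
+            # immediately following the changeset descriptor.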
+ data = next(objs)
+
+ yield (
+ cset[b'node'],
+ cset[b'parents'][0],
+ cset[b'parents'][1],
+ # Linknode is always itself for changesets.
+ cset[b'node'],
+                # We always exchange full revisions, so the delta base is
+                # nullid.
+ nullid,
+ mdiff.trivialdiffheader(len(data)) + data,
+ # Flags not yet supported.
+ 0,
+ )
+
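+    # addgroup() applies the incoming revisions to the changelog and
+    # returns the nodes that were added.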
+ added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
+ addrevisioncb=onchangeset)
+
+ progress.complete()
+
+ return {
+ 'added': added,
+ }