diff -r 9c2c77c73f23 -r b9e453d683a1 mercurial/exchangev2.py
--- a/mercurial/exchangev2.py	Wed Sep 12 10:01:16 2018 -0700
+++ b/mercurial/exchangev2.py	Wed Sep 12 10:01:36 2018 -0700
@@ -7,10 +7,16 @@
 
 from __future__ import absolute_import
 
+import weakref
+
+from .i18n import _
 from .node import (
     nullid,
+    short,
 )
 from . import (
+    mdiff,
+    pycompat,
     setdiscovery,
 )
 
@@ -18,11 +24,15 @@
     """Pull using wire protocol version 2."""
     repo = pullop.repo
     remote = pullop.remote
+    tr = pullop.trmanager.transaction()
 
     # Figure out what needs to be fetched.
     common, fetch, remoteheads = _pullchangesetdiscovery(
         repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
 
+    pullheads = pullop.heads or remoteheads
+    _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
+
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
 
@@ -53,3 +63,76 @@
     common.discard(nullid)
 
     return common, fetch, remoteheads
+
+def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
+    if not fetch:
+        return
+
+    # TODO consider adding a step here where we obtain the DAG shape first
+    # (or ask the server to slice changesets into chunks for us) so that
+    # we can perform multiple fetches in batches. This will facilitate
+    # resuming interrupted clones, higher server-side cache hit rates due
+    # to smaller segments, etc.
+    with remote.commandexecutor() as e:
+        objs = e.callcommand(b'changesetdata', {
+            b'noderange': [sorted(common), sorted(remoteheads)],
+            b'fields': {b'parents', b'revision'},
+        }).result()
+
+        # The context manager waits on all response data when exiting. So
+        # we need to remain in the context manager in order to stream data.
+        return _processchangesetdata(repo, tr, objs)
+
+def _processchangesetdata(repo, tr, objs):
+    repo.hook('prechangegroup', throw=True,
+              **pycompat.strkwargs(tr.hookargs))
+
+    urepo = repo.unfiltered()
+    cl = urepo.changelog
+
+    cl.delayupdate(tr)
+
+    # The first emitted object is a header describing the data that
+    # follows.
+    meta = next(objs)
+
+    progress = repo.ui.makeprogress(_('changesets'),
+                                    unit=_('chunks'),
+                                    total=meta.get(b'totalitems'))
+
+    def linkrev(node):
+        repo.ui.debug('add changeset %s\n' % short(node))
+        # Linkrev for changelog is always self.
+        return len(cl)
+
+    def onchangeset(cl, node):
+        progress.increment()
+
+    # addgroup() expects a 7-tuple describing revisions. This normalizes
+    # the wire data to that format.
+    def iterrevisions():
+        for cset in objs:
+            assert b'revisionsize' in cset
+            data = next(objs)
+
+            yield (
+                cset[b'node'],
+                cset[b'parents'][0],
+                cset[b'parents'][1],
+                # Linknode is always itself for changesets.
+                cset[b'node'],
+                # We always send full revisions. So delta base is not set.
+                nullid,
+                mdiff.trivialdiffheader(len(data)) + data,
+                # Flags not yet supported.
+                0,
+            )
+
+    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
+                        addrevisioncb=onchangeset)
+
+    progress.complete()
+
+    return {
+        'added': added,
+    }
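
For reference, the standalone sketch below (not part of the patch) illustrates the stream normalization that iterrevisions() performs: the changesetdata response interleaves a metadata dict with the raw revision bytes for each changeset, and each pair is turned into the 7-tuple that changelog.addgroup() expects. The node values, parent lists, and payload bytes here are hypothetical placeholders, and the example deliberately skips the mdiff.trivialdiffheader() framing and Mercurial-internal types so it can run on its own.

    # Minimal sketch of normalizing an interleaved (metadata, revision bytes)
    # stream into (node, p1, p2, linknode, deltabase, delta, flags) tuples.
    # All sample values are made up for illustration.

    NULLID = b'\0' * 20

    def iterrevisions(objs):
        """Yield 7-tuples from an iterator that alternates metadata dicts
        and raw revision byte strings, mirroring the wire stream."""
        for cset in objs:
            # The metadata dict is immediately followed by the revision data.
            data = next(objs)
            yield (
                cset[b'node'],
                cset[b'parents'][0],
                cset[b'parents'][1],
                cset[b'node'],   # linknode: a changeset links to itself
                NULLID,          # full revisions only, so no delta base
                data,            # real code prepends a trivial diff header here
                0,               # flags not yet supported
            )

    if __name__ == '__main__':
        # Hypothetical two-object stream: one metadata dict, then its payload.
        stream = iter([
            {b'node': b'\x01' * 20,
             b'parents': [NULLID, NULLID],
             b'revisionsize': 11},
            b'hello world',
        ])
        for rev in iterrevisions(stream):
            print(rev)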