Mercurial > public > mercurial-scm > hg-stable
diff mercurial/exchangev2.py @ 39647:b9e453d683a1
exchangev2: fetch changeset revisions
All Mercurial repository data is derived from changesets:
you can't do anything unless you have changesets. Therefore,
it makes sense for changesets to be the first piece of data
that we transfer as part of pull.
To do this, we call our new "changesetdata" command, requesting
parents and revision data. This gives us all the data that a
changegroup delta group would give us. We simply normalize
this data into what addgroup() expects and call that API on
the changelog to bulk insert revisions into the changelog.
Code in this commit is heavily borrowed from
changegroup.cg1unpacker.apply().
Differential Revision: https://phab.mercurial-scm.org/D4482
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 12 Sep 2018 10:01:36 -0700 |
parents | a86d21e70b2b |
children | ff2de4f2eb3c |
line wrap: on
line diff
--- a/mercurial/exchangev2.py Wed Sep 12 10:01:16 2018 -0700 +++ b/mercurial/exchangev2.py Wed Sep 12 10:01:36 2018 -0700 @@ -7,10 +7,16 @@ from __future__ import absolute_import +import weakref + +from .i18n import _ from .node import ( nullid, + short, ) from . import ( + mdiff, + pycompat, setdiscovery, ) @@ -18,11 +24,15 @@ """Pull using wire protocol version 2.""" repo = pullop.repo remote = pullop.remote + tr = pullop.trmanager.transaction() # Figure out what needs to be fetched. common, fetch, remoteheads = _pullchangesetdiscovery( repo, remote, pullop.heads, abortwhenunrelated=pullop.force) + pullheads = pullop.heads or remoteheads + _fetchchangesets(repo, tr, remote, common, fetch, pullheads) + def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): """Determine which changesets need to be pulled.""" @@ -53,3 +63,76 @@ common.discard(nullid) return common, fetch, remoteheads + +def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads): + if not fetch: + return + + # TODO consider adding a step here where we obtain the DAG shape first + # (or ask the server to slice changesets into chunks for us) so that + # we can perform multiple fetches in batches. This will facilitate + # resuming interrupted clones, higher server-side cache hit rates due + # to smaller segments, etc. + with remote.commandexecutor() as e: + objs = e.callcommand(b'changesetdata', { + b'noderange': [sorted(common), sorted(remoteheads)], + b'fields': {b'parents', b'revision'}, + }).result() + + # The context manager waits on all response data when exiting. So + # we need to remain in the context manager in order to stream data. + return _processchangesetdata(repo, tr, objs) + +def _processchangesetdata(repo, tr, objs): + repo.hook('prechangegroup', throw=True, + **pycompat.strkwargs(tr.hookargs)) + + urepo = repo.unfiltered() + cl = urepo.changelog + + cl.delayupdate(tr) + + # The first emitted object is a header describing the data that + # follows. + meta = next(objs) + + progress = repo.ui.makeprogress(_('changesets'), + unit=_('chunks'), + total=meta.get(b'totalitems')) + + def linkrev(node): + repo.ui.debug('add changeset %s\n' % short(node)) + # Linkrev for changelog is always self. + return len(cl) + + def onchangeset(cl, node): + progress.increment() + + # addgroup() expects a 7-tuple describing revisions. This normalizes + # the wire data to that format. + def iterrevisions(): + for cset in objs: + assert b'revisionsize' in cset + data = next(objs) + + yield ( + cset[b'node'], + cset[b'parents'][0], + cset[b'parents'][1], + # Linknode is always itself for changesets. + cset[b'node'], + # We always send full revisions. So delta base is not set. + nullid, + mdiff.trivialdiffheader(len(data)) + data, + # Flags not yet supported. + 0, + ) + + added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr), + addrevisioncb=onchangeset) + + progress.complete() + + return { + 'added': added, + }