--- a/mercurial/exchangev2.py Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/exchangev2.py Sun Oct 06 09:45:02 2019 -0400
@@ -24,9 +24,8 @@
pycompat,
setdiscovery,
)
-from .interfaces import (
- repository,
-)
+from .interfaces import repository
+
def pull(pullop):
"""Pull using wire protocol version 2."""
@@ -48,11 +47,13 @@
# We don't use the repo's narrow matcher here because the patterns passed
# to exchange.pull() could be different.
- narrowmatcher = narrowspec.match(repo.root,
- # Empty maps to nevermatcher. So always
- # set includes if missing.
- pullop.includepats or {'path:.'},
- pullop.excludepats)
+ narrowmatcher = narrowspec.match(
+ repo.root,
+        # Empty includes map to nevermatcher, so always
+        # set includes if they are missing.
+ pullop.includepats or {'path:.'},
+ pullop.excludepats,
+ )
if pullop.includepats or pullop.excludepats:
pathfilter = {}
@@ -65,7 +66,8 @@
# Figure out what needs to be fetched.
common, fetch, remoteheads = _pullchangesetdiscovery(
- repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
+ repo, remote, pullop.heads, abortwhenunrelated=pullop.force
+ )
# And fetch the data.
pullheads = pullop.heads or remoteheads
@@ -84,13 +86,22 @@
if phase == b'secret' or not csetres['nodesbyphase'][phase]:
continue
- phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
- csetres['nodesbyphase'][phase])
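+    # Advance the local phase boundary for the nodes in this phase.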
+ phases.advanceboundary(
+ repo,
+ tr,
+ phases.phasenames.index(phase),
+ csetres['nodesbyphase'][phase],
+ )
# Write bookmark updates.
- bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
- remote.url(), pullop.gettransaction,
- explicit=pullop.explicitbookmarks)
+ bookmarks.updatefromremote(
+ repo.ui,
+ repo,
+ csetres['bookmarks'],
+ remote.url(),
+ pullop.gettransaction,
+ explicit=pullop.explicitbookmarks,
+ )
manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
@@ -100,8 +111,9 @@
relevantcsetnodes = set()
clnode = repo.changelog.node
- for rev in repo.revs(b'ancestors(%ln, %s)',
- pullheads, pullop.depth - 1):
+ for rev in repo.revs(
+ b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
+ ):
relevantcsetnodes.add(clnode(rev))
csetrelevantfilter = lambda n: n in relevantcsetnodes
@@ -137,8 +149,17 @@
# Find all file nodes referenced by added manifests and fetch those
# revisions.
fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
- _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetsforfiles,
- manifestlinkrevs, shallow=bool(pullop.depth))
+ _fetchfilesfromcsets(
+ repo,
+ tr,
+ remote,
+ pathfilter,
+ fnodes,
+ csetsforfiles,
+ manifestlinkrevs,
+ shallow=bool(pullop.depth),
+ )
+
def _checkuserawstorefiledata(pullop):
"""Check whether we should use rawstorefiledata command to retrieve data."""
@@ -165,17 +186,19 @@
return True
+
def _fetchrawstorefiles(repo, remote):
with remote.commandexecutor() as e:
- objs = e.callcommand(b'rawstorefiledata', {
- b'files': [b'changelog', b'manifestlog'],
- }).result()
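+        # Stream raw changelog and manifestlog files from the server.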
+ objs = e.callcommand(
+            b'rawstorefiledata', {b'files': [b'changelog', b'manifestlog']}
+ ).result()
# First object is a summary of files data that follows.
overall = next(objs)
- progress = repo.ui.makeprogress(_('clone'), total=overall[b'totalsize'],
- unit=_('bytes'))
+ progress = repo.ui.makeprogress(
+ _('clone'), total=overall[b'totalsize'], unit=_('bytes')
+ )
with progress:
progress.update(0)
@@ -188,14 +211,17 @@
for k in (b'location', b'path', b'size'):
if k not in filemeta:
- raise error.Abort(_(b'remote file data missing key: %s')
- % k)
+ raise error.Abort(
+ _(b'remote file data missing key: %s') % k
+ )
if filemeta[b'location'] == b'store':
vfs = repo.svfs
else:
- raise error.Abort(_(b'invalid location for raw file data: '
- b'%s') % filemeta[b'location'])
+ raise error.Abort(
+                    _(b'invalid location for raw file data: %s')
+ % filemeta[b'location']
+ )
bytesremaining = filemeta[b'size']
@@ -209,10 +235,13 @@
bytesremaining -= len(chunk)
if bytesremaining < 0:
- raise error.Abort(_(
- b'received invalid number of bytes for file '
- b'data; expected %d, got extra') %
- filemeta[b'size'])
+ raise error.Abort(
+ _(
+ b'received invalid number of bytes for file '
+ b'data; expected %d, got extra'
+ )
+ % filemeta[b'size']
+ )
progress.increment(step=len(chunk))
fh.write(chunk)
@@ -221,15 +250,25 @@
if chunk.islast:
break
except AttributeError:
- raise error.Abort(_(
- b'did not receive indefinite length bytestring '
- b'for file data'))
+ raise error.Abort(
+ _(
+ b'did not receive indefinite length bytestring '
+ b'for file data'
+ )
+ )
if bytesremaining:
- raise error.Abort(_(b'received invalid number of bytes for'
- b'file data; expected %d got %d') %
- (filemeta[b'size'],
- filemeta[b'size'] - bytesremaining))
+ raise error.Abort(
+ _(
+                    b'received invalid number of bytes for '
+                    b'file data; expected %d, got %d'
+ )
+ % (
+ filemeta[b'size'],
+ filemeta[b'size'] - bytesremaining,
+ )
+ )
+
def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
"""Determine which changesets need to be pulled."""
@@ -242,7 +281,8 @@
# TODO wire protocol version 2 is capable of more efficient discovery
# than setdiscovery. Consider implementing something better.
common, fetch, remoteheads = setdiscovery.findcommonheads(
- repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)
+ repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
+ )
common = set(common)
remoteheads = set(remoteheads)
@@ -262,6 +302,7 @@
return common, fetch, remoteheads
+
def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
# TODO consider adding a step here where we obtain the DAG shape first
# (or ask the server to slice changesets into chunks for us) so that
@@ -269,22 +310,27 @@
# resuming interrupted clones, higher server-side cache hit rates due
# to smaller segments, etc.
with remote.commandexecutor() as e:
- objs = e.callcommand(b'changesetdata', {
- b'revisions': [{
- b'type': b'changesetdagrange',
- b'roots': sorted(common),
- b'heads': sorted(remoteheads),
- }],
- b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
- }).result()
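+        # Fetch changeset data between the common set and the remote heads.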
+ objs = e.callcommand(
+ b'changesetdata',
+ {
+ b'revisions': [
+ {
+ b'type': b'changesetdagrange',
+ b'roots': sorted(common),
+ b'heads': sorted(remoteheads),
+ }
+ ],
+ b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
+ },
+ ).result()
# The context manager waits on all response data when exiting. So
# we need to remain in the context manager in order to stream data.
return _processchangesetdata(repo, tr, objs)
+
def _processchangesetdata(repo, tr, objs):
- repo.hook('prechangegroup', throw=True,
- **pycompat.strkwargs(tr.hookargs))
+ repo.hook('prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))
urepo = repo.unfiltered()
cl = urepo.changelog
@@ -295,9 +341,9 @@
# follows.
meta = next(objs)
- progress = repo.ui.makeprogress(_('changesets'),
- unit=_('chunks'),
- total=meta.get(b'totalitems'))
+ progress = repo.ui.makeprogress(
+ _('changesets'), unit=_('chunks'), total=meta.get(b'totalitems')
+ )
manifestnodes = {}
@@ -360,8 +406,9 @@
0,
)
- added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
- addrevisioncb=onchangeset)
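+    # Apply the received changeset revisions to the local changelog.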
+ added = cl.addgroup(
+ iterrevisions(), linkrev, weakref.proxy(tr), addrevisioncb=onchangeset
+ )
progress.complete()
@@ -372,6 +419,7 @@
'manifestnodes': manifestnodes,
}
+
def _fetchmanifests(repo, tr, remote, manifestnodes):
rootmanifest = repo.manifestlog.getstorage(b'')
@@ -429,13 +477,14 @@
basenode,
delta,
# Flags not yet supported.
- 0
+ 0,
)
progress.increment()
- progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
- total=len(fetchnodes))
+ progress = repo.ui.makeprogress(
+ _('manifests'), unit=_('chunks'), total=len(fetchnodes)
+ )
commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
@@ -452,25 +501,31 @@
added = []
for i in pycompat.xrange(0, len(fetchnodes), batchsize):
- batch = [node for node in fetchnodes[i:i + batchsize]]
+        batch = fetchnodes[i : i + batchsize]
if not batch:
continue
with remote.commandexecutor() as e:
- objs = e.callcommand(b'manifestdata', {
- b'tree': b'',
- b'nodes': batch,
- b'fields': {b'parents', b'revision'},
- b'haveparents': True,
- }).result()
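+            # Request parents and revision data for this batch of nodes.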
+ objs = e.callcommand(
+ b'manifestdata',
+ {
+ b'tree': b'',
+ b'nodes': batch,
+ b'fields': {b'parents', b'revision'},
+ b'haveparents': True,
+ },
+ ).result()
# Chomp off header object.
next(objs)
- added.extend(rootmanifest.addgroup(
- iterrevisions(objs, progress),
- linkrevs.__getitem__,
- weakref.proxy(tr)))
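+            # Apply the received manifest revisions to the local store.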
+ added.extend(
+ rootmanifest.addgroup(
+ iterrevisions(objs, progress),
+ linkrevs.__getitem__,
+ weakref.proxy(tr),
+ )
+ )
progress.complete()
@@ -479,6 +534,7 @@
'linkrevs': linkrevs,
}
+
def _derivefilesfrommanifests(repo, matcher, manifestnodes):
"""Determine what file nodes are relevant given a set of manifest nodes.
@@ -489,7 +545,8 @@
fnodes = collections.defaultdict(dict)
progress = repo.ui.makeprogress(
- _('scanning manifests'), total=len(manifestnodes))
+ _('scanning manifests'), total=len(manifestnodes)
+ )
with progress:
for manifestnode in manifestnodes:
@@ -511,8 +568,10 @@
return fnodes
+
def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
"""Fetch file data from explicit file revisions."""
+
def iterrevisions(objs, progress):
for filerevision in objs:
node = filerevision[b'node']
@@ -546,15 +605,17 @@
progress.increment()
progress = repo.ui.makeprogress(
- _('files'), unit=_('chunks'),
- total=sum(len(v) for v in fnodes.itervalues()))
+ _('files'),
+ unit=_('chunks'),
+ total=sum(len(v) for v in fnodes.itervalues()),
+ )
# TODO make batch size configurable
batchsize = 10000
fnodeslist = [x for x in sorted(fnodes.items())]
for i in pycompat.xrange(0, len(fnodeslist), batchsize):
- batch = [x for x in fnodeslist[i:i + batchsize]]
+        batch = fnodeslist[i : i + batchsize]
if not batch:
continue
@@ -563,16 +624,25 @@
locallinkrevs = {}
for path, nodes in batch:
- fs.append((path, e.callcommand(b'filedata', {
- b'path': path,
- b'nodes': sorted(nodes),
- b'fields': {b'parents', b'revision'},
- b'haveparents': True,
- })))
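+                # Issue a filedata command per path; results are futures.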
+ fs.append(
+ (
+ path,
+ e.callcommand(
+ b'filedata',
+ {
+ b'path': path,
+ b'nodes': sorted(nodes),
+ b'fields': {b'parents', b'revision'},
+ b'haveparents': True,
+ },
+ ),
+ )
+ )
locallinkrevs[path] = {
node: linkrevs[manifestnode]
- for node, manifestnode in nodes.iteritems()}
+ for node, manifestnode in nodes.iteritems()
+ }
for path, f in fs:
objs = f.result()
@@ -584,10 +654,13 @@
store.addgroup(
iterrevisions(objs, progress),
locallinkrevs[path].__getitem__,
- weakref.proxy(tr))
+ weakref.proxy(tr),
+ )
+
-def _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csets,
- manlinkrevs, shallow=False):
+def _fetchfilesfromcsets(
+ repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
+):
"""Fetch file data from explicit changeset revisions."""
def iterrevisions(objs, remaining, progress):
@@ -631,8 +704,10 @@
remaining -= 1
progress = repo.ui.makeprogress(
- _('files'), unit=_('chunks'),
- total=sum(len(v) for v in fnodes.itervalues()))
+ _('files'),
+ unit=_('chunks'),
+ total=sum(len(v) for v in fnodes.itervalues()),
+ )
commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
batchsize = commandmeta.get(b'recommendedbatchsize', 50000)
@@ -653,16 +728,15 @@
fields.add(b'linknode')
for i in pycompat.xrange(0, len(csets), batchsize):
- batch = [x for x in csets[i:i + batchsize]]
+        batch = csets[i : i + batchsize]
if not batch:
continue
with remote.commandexecutor() as e:
args = {
- b'revisions': [{
- b'type': b'changesetexplicit',
- b'nodes': batch,
- }],
+ b'revisions': [
+                    {b'type': b'changesetexplicit', b'nodes': batch}
+ ],
b'fields': fields,
b'haveparents': haveparents,
}
@@ -684,7 +758,8 @@
linkrevs = {
fnode: manlinkrevs[mnode]
- for fnode, mnode in fnodes[path].iteritems()}
+ for fnode, mnode in fnodes[path].iteritems()
+ }
def getlinkrev(node):
if node in linkrevs:
@@ -692,8 +767,9 @@
else:
return clrev(node)
- store.addgroup(iterrevisions(objs, header[b'totalitems'],
- progress),
- getlinkrev,
- weakref.proxy(tr),
- maybemissingparents=shallow)
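+            # Store file revisions, allowing missing parents when shallow.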
+ store.addgroup(
+ iterrevisions(objs, header[b'totalitems'], progress),
+ getlinkrev,
+ weakref.proxy(tr),
+ maybemissingparents=shallow,
+ )