mercurial/exchangev2.py
changeset 43076 2372284d9457
parent 42813 268662aac075
child 43077 687b865b95ad
--- a/mercurial/exchangev2.py	Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/exchangev2.py	Sun Oct 06 09:45:02 2019 -0400
@@ -24,9 +24,8 @@
     pycompat,
     setdiscovery,
 )
-from .interfaces import (
-    repository,
-)
+from .interfaces import repository
+
 
 def pull(pullop):
     """Pull using wire protocol version 2."""
@@ -48,11 +47,13 @@
 
     # We don't use the repo's narrow matcher here because the patterns passed
     # to exchange.pull() could be different.
-    narrowmatcher = narrowspec.match(repo.root,
-                                     # Empty maps to nevermatcher. So always
-                                     # set includes if missing.
-                                     pullop.includepats or {'path:.'},
-                                     pullop.excludepats)
+    narrowmatcher = narrowspec.match(
+        repo.root,
+        # Empty maps to nevermatcher. So always
+        # set includes if missing.
+        pullop.includepats or {'path:.'},
+        pullop.excludepats,
+    )
 
     if pullop.includepats or pullop.excludepats:
         pathfilter = {}
@@ -65,7 +66,8 @@
 
     # Figure out what needs to be fetched.
     common, fetch, remoteheads = _pullchangesetdiscovery(
-        repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
+        repo, remote, pullop.heads, abortwhenunrelated=pullop.force
+    )
 
     # And fetch the data.
     pullheads = pullop.heads or remoteheads
@@ -84,13 +86,22 @@
         if phase == b'secret' or not csetres['nodesbyphase'][phase]:
             continue
 
-        phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
-                               csetres['nodesbyphase'][phase])
+        phases.advanceboundary(
+            repo,
+            tr,
+            phases.phasenames.index(phase),
+            csetres['nodesbyphase'][phase],
+        )
 
     # Write bookmark updates.
-    bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
-                               remote.url(), pullop.gettransaction,
-                               explicit=pullop.explicitbookmarks)
+    bookmarks.updatefromremote(
+        repo.ui,
+        repo,
+        csetres['bookmarks'],
+        remote.url(),
+        pullop.gettransaction,
+        explicit=pullop.explicitbookmarks,
+    )
 
     manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
 
@@ -100,8 +111,9 @@
         relevantcsetnodes = set()
         clnode = repo.changelog.node
 
-        for rev in repo.revs(b'ancestors(%ln, %s)',
-                             pullheads, pullop.depth - 1):
+        for rev in repo.revs(
+            b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
+        ):
             relevantcsetnodes.add(clnode(rev))
 
         csetrelevantfilter = lambda n: n in relevantcsetnodes
@@ -137,8 +149,17 @@
     # Find all file nodes referenced by added manifests and fetch those
     # revisions.
     fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
-    _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetsforfiles,
-                         manifestlinkrevs, shallow=bool(pullop.depth))
+    _fetchfilesfromcsets(
+        repo,
+        tr,
+        remote,
+        pathfilter,
+        fnodes,
+        csetsforfiles,
+        manifestlinkrevs,
+        shallow=bool(pullop.depth),
+    )
+
 
 def _checkuserawstorefiledata(pullop):
     """Check whether we should use rawstorefiledata command to retrieve data."""
@@ -165,17 +186,19 @@
 
     return True
 
+
 def _fetchrawstorefiles(repo, remote):
     with remote.commandexecutor() as e:
-        objs = e.callcommand(b'rawstorefiledata', {
-            b'files': [b'changelog', b'manifestlog'],
-        }).result()
+        objs = e.callcommand(
+            b'rawstorefiledata', {b'files': [b'changelog', b'manifestlog'],}
+        ).result()
 
         # First object is a summary of files data that follows.
         overall = next(objs)
 
-        progress = repo.ui.makeprogress(_('clone'), total=overall[b'totalsize'],
-                                        unit=_('bytes'))
+        progress = repo.ui.makeprogress(
+            _('clone'), total=overall[b'totalsize'], unit=_('bytes')
+        )
         with progress:
             progress.update(0)
 
@@ -188,14 +211,17 @@
 
                 for k in (b'location', b'path', b'size'):
                     if k not in filemeta:
-                        raise error.Abort(_(b'remote file data missing key: %s')
-                                          % k)
+                        raise error.Abort(
+                            _(b'remote file data missing key: %s') % k
+                        )
 
                 if filemeta[b'location'] == b'store':
                     vfs = repo.svfs
                 else:
-                    raise error.Abort(_(b'invalid location for raw file data: '
-                                        b'%s') % filemeta[b'location'])
+                    raise error.Abort(
+                        _(b'invalid location for raw file data: ' b'%s')
+                        % filemeta[b'location']
+                    )
 
                 bytesremaining = filemeta[b'size']
 
@@ -209,10 +235,13 @@
                         bytesremaining -= len(chunk)
 
                         if bytesremaining < 0:
-                            raise error.Abort(_(
-                                b'received invalid number of bytes for file '
-                                b'data; expected %d, got extra') %
-                                              filemeta[b'size'])
+                            raise error.Abort(
+                                _(
+                                    b'received invalid number of bytes for file '
+                                    b'data; expected %d, got extra'
+                                )
+                                % filemeta[b'size']
+                            )
 
                         progress.increment(step=len(chunk))
                         fh.write(chunk)
@@ -221,15 +250,25 @@
                             if chunk.islast:
                                 break
                         except AttributeError:
-                            raise error.Abort(_(
-                                b'did not receive indefinite length bytestring '
-                                b'for file data'))
+                            raise error.Abort(
+                                _(
+                                    b'did not receive indefinite length bytestring '
+                                    b'for file data'
+                                )
+                            )
 
                 if bytesremaining:
-                    raise error.Abort(_(b'received invalid number of bytes for'
-                                        b'file data; expected %d got %d') %
-                                      (filemeta[b'size'],
-                                       filemeta[b'size'] - bytesremaining))
+                    raise error.Abort(
+                        _(
+                            b'received invalid number of bytes for'
+                            b'file data; expected %d got %d'
+                        )
+                        % (
+                            filemeta[b'size'],
+                            filemeta[b'size'] - bytesremaining,
+                        )
+                    )
+
 
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
@@ -242,7 +281,8 @@
     # TODO wire protocol version 2 is capable of more efficient discovery
     # than setdiscovery. Consider implementing something better.
     common, fetch, remoteheads = setdiscovery.findcommonheads(
-        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)
+        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
+    )
 
     common = set(common)
     remoteheads = set(remoteheads)
@@ -262,6 +302,7 @@
 
     return common, fetch, remoteheads
 
+
 def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
     # TODO consider adding a step here where we obtain the DAG shape first
     # (or ask the server to slice changesets into chunks for us) so that
@@ -269,22 +310,27 @@
     # resuming interrupted clones, higher server-side cache hit rates due
     # to smaller segments, etc.
     with remote.commandexecutor() as e:
-        objs = e.callcommand(b'changesetdata', {
-            b'revisions': [{
-                b'type': b'changesetdagrange',
-                b'roots': sorted(common),
-                b'heads': sorted(remoteheads),
-            }],
-            b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
-        }).result()
+        objs = e.callcommand(
+            b'changesetdata',
+            {
+                b'revisions': [
+                    {
+                        b'type': b'changesetdagrange',
+                        b'roots': sorted(common),
+                        b'heads': sorted(remoteheads),
+                    }
+                ],
+                b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
+            },
+        ).result()
 
         # The context manager waits on all response data when exiting. So
         # we need to remain in the context manager in order to stream data.
         return _processchangesetdata(repo, tr, objs)
 
+
 def _processchangesetdata(repo, tr, objs):
-    repo.hook('prechangegroup', throw=True,
-              **pycompat.strkwargs(tr.hookargs))
+    repo.hook('prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))
 
     urepo = repo.unfiltered()
     cl = urepo.changelog
@@ -295,9 +341,9 @@
     # follows.
     meta = next(objs)
 
-    progress = repo.ui.makeprogress(_('changesets'),
-                                    unit=_('chunks'),
-                                    total=meta.get(b'totalitems'))
+    progress = repo.ui.makeprogress(
+        _('changesets'), unit=_('chunks'), total=meta.get(b'totalitems')
+    )
 
     manifestnodes = {}
 
@@ -360,8 +406,9 @@
                 0,
             )
 
-    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
-                        addrevisioncb=onchangeset)
+    added = cl.addgroup(
+        iterrevisions(), linkrev, weakref.proxy(tr), addrevisioncb=onchangeset
+    )
 
     progress.complete()
 
@@ -372,6 +419,7 @@
         'manifestnodes': manifestnodes,
     }
 
+
 def _fetchmanifests(repo, tr, remote, manifestnodes):
     rootmanifest = repo.manifestlog.getstorage(b'')
 
@@ -429,13 +477,14 @@
                 basenode,
                 delta,
                 # Flags not yet supported.
-                0
+                0,
             )
 
             progress.increment()
 
-    progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
-                                    total=len(fetchnodes))
+    progress = repo.ui.makeprogress(
+        _('manifests'), unit=_('chunks'), total=len(fetchnodes)
+    )
 
     commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
     batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
@@ -452,25 +501,31 @@
     added = []
 
     for i in pycompat.xrange(0, len(fetchnodes), batchsize):
-        batch = [node for node in fetchnodes[i:i + batchsize]]
+        batch = [node for node in fetchnodes[i : i + batchsize]]
         if not batch:
             continue
 
         with remote.commandexecutor() as e:
-            objs = e.callcommand(b'manifestdata', {
-                b'tree': b'',
-                b'nodes': batch,
-                b'fields': {b'parents', b'revision'},
-                b'haveparents': True,
-            }).result()
+            objs = e.callcommand(
+                b'manifestdata',
+                {
+                    b'tree': b'',
+                    b'nodes': batch,
+                    b'fields': {b'parents', b'revision'},
+                    b'haveparents': True,
+                },
+            ).result()
 
             # Chomp off header object.
             next(objs)
 
-            added.extend(rootmanifest.addgroup(
-                iterrevisions(objs, progress),
-                linkrevs.__getitem__,
-                weakref.proxy(tr)))
+            added.extend(
+                rootmanifest.addgroup(
+                    iterrevisions(objs, progress),
+                    linkrevs.__getitem__,
+                    weakref.proxy(tr),
+                )
+            )
 
     progress.complete()
 
@@ -479,6 +534,7 @@
         'linkrevs': linkrevs,
     }
 
+
 def _derivefilesfrommanifests(repo, matcher, manifestnodes):
     """Determine what file nodes are relevant given a set of manifest nodes.
 
@@ -489,7 +545,8 @@
     fnodes = collections.defaultdict(dict)
 
     progress = repo.ui.makeprogress(
-        _('scanning manifests'), total=len(manifestnodes))
+        _('scanning manifests'), total=len(manifestnodes)
+    )
 
     with progress:
         for manifestnode in manifestnodes:
@@ -511,8 +568,10 @@
 
     return fnodes
 
+
 def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
     """Fetch file data from explicit file revisions."""
+
     def iterrevisions(objs, progress):
         for filerevision in objs:
             node = filerevision[b'node']
@@ -546,15 +605,17 @@
             progress.increment()
 
     progress = repo.ui.makeprogress(
-        _('files'), unit=_('chunks'),
-         total=sum(len(v) for v in fnodes.itervalues()))
+        _('files'),
+        unit=_('chunks'),
+        total=sum(len(v) for v in fnodes.itervalues()),
+    )
 
     # TODO make batch size configurable
     batchsize = 10000
     fnodeslist = [x for x in sorted(fnodes.items())]
 
     for i in pycompat.xrange(0, len(fnodeslist), batchsize):
-        batch = [x for x in fnodeslist[i:i + batchsize]]
+        batch = [x for x in fnodeslist[i : i + batchsize]]
         if not batch:
             continue
 
@@ -563,16 +624,25 @@
             locallinkrevs = {}
 
             for path, nodes in batch:
-                fs.append((path, e.callcommand(b'filedata', {
-                    b'path': path,
-                    b'nodes': sorted(nodes),
-                    b'fields': {b'parents', b'revision'},
-                    b'haveparents': True,
-                })))
+                fs.append(
+                    (
+                        path,
+                        e.callcommand(
+                            b'filedata',
+                            {
+                                b'path': path,
+                                b'nodes': sorted(nodes),
+                                b'fields': {b'parents', b'revision'},
+                                b'haveparents': True,
+                            },
+                        ),
+                    )
+                )
 
                 locallinkrevs[path] = {
                     node: linkrevs[manifestnode]
-                    for node, manifestnode in nodes.iteritems()}
+                    for node, manifestnode in nodes.iteritems()
+                }
 
             for path, f in fs:
                 objs = f.result()
@@ -584,10 +654,13 @@
                 store.addgroup(
                     iterrevisions(objs, progress),
                     locallinkrevs[path].__getitem__,
-                    weakref.proxy(tr))
+                    weakref.proxy(tr),
+                )
+
 
-def _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csets,
-                         manlinkrevs, shallow=False):
+def _fetchfilesfromcsets(
+    repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
+):
     """Fetch file data from explicit changeset revisions."""
 
     def iterrevisions(objs, remaining, progress):
@@ -631,8 +704,10 @@
             remaining -= 1
 
     progress = repo.ui.makeprogress(
-        _('files'), unit=_('chunks'),
-        total=sum(len(v) for v in fnodes.itervalues()))
+        _('files'),
+        unit=_('chunks'),
+        total=sum(len(v) for v in fnodes.itervalues()),
+    )
 
     commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
     batchsize = commandmeta.get(b'recommendedbatchsize', 50000)
@@ -653,16 +728,15 @@
         fields.add(b'linknode')
 
     for i in pycompat.xrange(0, len(csets), batchsize):
-        batch = [x for x in csets[i:i + batchsize]]
+        batch = [x for x in csets[i : i + batchsize]]
         if not batch:
             continue
 
         with remote.commandexecutor() as e:
             args = {
-                b'revisions': [{
-                    b'type': b'changesetexplicit',
-                    b'nodes': batch,
-                }],
+                b'revisions': [
+                    {b'type': b'changesetexplicit', b'nodes': batch,}
+                ],
                 b'fields': fields,
                 b'haveparents': haveparents,
             }
@@ -684,7 +758,8 @@
 
                 linkrevs = {
                     fnode: manlinkrevs[mnode]
-                    for fnode, mnode in fnodes[path].iteritems()}
+                    for fnode, mnode in fnodes[path].iteritems()
+                }
 
                 def getlinkrev(node):
                     if node in linkrevs:
@@ -692,8 +767,9 @@
                     else:
                         return clrev(node)
 
-                store.addgroup(iterrevisions(objs, header[b'totalitems'],
-                                             progress),
-                               getlinkrev,
-                               weakref.proxy(tr),
-                               maybemissingparents=shallow)
+                store.addgroup(
+                    iterrevisions(objs, header[b'totalitems'], progress),
+                    getlinkrev,
+                    weakref.proxy(tr),
+                    maybemissingparents=shallow,
+                )