--- a/mercurial/streamclone.py Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/streamclone.py Sun Oct 06 09:45:02 2019 -0400
@@ -12,9 +12,7 @@
import struct
from .i18n import _
-from .interfaces import (
- repository,
-)
+from .interfaces import repository
from . import (
cacheutil,
error,
@@ -25,6 +23,7 @@
util,
)
+
def canperformstreamclone(pullop, bundle2=False):
"""Whether it is possible to perform a streaming clone as part of pull.
@@ -44,8 +43,8 @@
if 'v2' in pullop.remotebundle2caps.get('stream', []):
bundle2supported = True
# else
- # Server doesn't support bundle2 stream clone or doesn't support
- # the versions we support. Fall back and possibly allow legacy.
+ # Server doesn't support bundle2 stream clone or doesn't support
+ # the versions we support. Fall back and possibly allow legacy.
# Ensures legacy code path uses available bundle2.
if bundle2supported and not bundle2:
@@ -87,26 +86,37 @@
streamreqs = remote.capable('streamreqs')
# This is weird and shouldn't happen with modern servers.
if not streamreqs:
- pullop.repo.ui.warn(_(
- 'warning: stream clone requested but server has them '
- 'disabled\n'))
+ pullop.repo.ui.warn(
+ _(
+ 'warning: stream clone requested but server has them '
+ 'disabled\n'
+ )
+ )
return False, None
streamreqs = set(streamreqs.split(','))
# Server requires something we don't support. Bail.
missingreqs = streamreqs - repo.supportedformats
if missingreqs:
- pullop.repo.ui.warn(_(
- 'warning: stream clone requested but client is missing '
- 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
pullop.repo.ui.warn(
- _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
- 'for more information)\n'))
+ _(
+ 'warning: stream clone requested but client is missing '
+ 'requirements: %s\n'
+ )
+ % ', '.join(sorted(missingreqs))
+ )
+ pullop.repo.ui.warn(
+ _(
+ '(see https://www.mercurial-scm.org/wiki/MissingRequirement '
+ 'for more information)\n'
+ )
+ )
return False, None
requirements = streamreqs
return True, requirements
+
def maybeperformlegacystreamclone(pullop):
"""Possibly perform a legacy stream clone operation.
@@ -147,7 +157,8 @@
resp = int(l)
except ValueError:
raise error.ResponseError(
- _('unexpected response from remote server:'), l)
+ _('unexpected response from remote server:'), l
+ )
if resp == 1:
raise error.Abort(_('operation forbidden by server'))
elif resp == 2:
@@ -160,7 +171,8 @@
filecount, bytecount = map(int, l.split(' ', 1))
except (ValueError, TypeError):
raise error.ResponseError(
- _('unexpected response from remote server:'), l)
+ _('unexpected response from remote server:'), l
+ )
with repo.lock():
consumev1(repo, fp, filecount, bytecount)
@@ -169,9 +181,11 @@
# new format-related remote requirements
# requirements from the streamed-in repository
repo.requirements = requirements | (
- repo.requirements - repo.supportedformats)
+ repo.requirements - repo.supportedformats
+ )
repo.svfs.options = localrepo.resolvestorevfsoptions(
- repo.ui, repo.requirements, repo.features)
+ repo.ui, repo.requirements, repo.features
+ )
repo._writerequirements()
if rbranchmap:
@@ -179,6 +193,7 @@
repo.invalidate()
+
def allowservergeneration(repo):
"""Whether streaming clones are allowed from the server."""
if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
@@ -195,10 +210,12 @@
return True
+
# This is it's own function so extensions can override it.
def _walkstreamfiles(repo, matcher=None):
return repo.store.walk(matcher)
+
def generatev1(repo):
"""Emit content for version 1 of a streaming clone.
@@ -228,8 +245,9 @@
entries.append((name, size))
total_bytes += size
- repo.ui.debug('%d files, %d bytes to transfer\n' %
- (len(entries), total_bytes))
+ repo.ui.debug(
+ '%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
+ )
svfs = repo.svfs
debugflag = repo.ui.debugflag
@@ -251,6 +269,7 @@
return len(entries), total_bytes, emitrevlogdata()
+
def generatev1wireproto(repo):
"""Emit content for version 1 of streaming clone suitable for the wire.
@@ -278,6 +297,7 @@
for chunk in it:
yield chunk
+
def generatebundlev1(repo, compression='UN'):
"""Emit content for version 1 of a stream clone bundle.
@@ -311,8 +331,9 @@
yield compression
filecount, bytecount, it = generatev1(repo)
- repo.ui.status(_('writing %d bytes for %d files\n') %
- (bytecount, filecount))
+ repo.ui.status(
+ _('writing %d bytes for %d files\n') % (bytecount, filecount)
+ )
yield struct.pack('>QQ', filecount, bytecount)
yield struct.pack('>H', len(requires) + 1)
@@ -321,8 +342,9 @@
# This is where we'll add compression in the future.
assert compression == 'UN'
- progress = repo.ui.makeprogress(_('bundle'), total=bytecount,
- unit=_('bytes'))
+ progress = repo.ui.makeprogress(
+ _('bundle'), total=bytecount, unit=_('bytes')
+ )
progress.update(0)
for chunk in it:
@@ -333,6 +355,7 @@
return requirements, gen()
+
def consumev1(repo, fp, filecount, bytecount):
"""Apply the contents from version 1 of a streaming clone file handle.
@@ -343,10 +366,13 @@
handled by this function.
"""
with repo.lock():
- repo.ui.status(_('%d files to transfer, %s of data\n') %
- (filecount, util.bytecount(bytecount)))
- progress = repo.ui.makeprogress(_('clone'), total=bytecount,
- unit=_('bytes'))
+ repo.ui.status(
+ _('%d files to transfer, %s of data\n')
+ % (filecount, util.bytecount(bytecount))
+ )
+ progress = repo.ui.makeprogress(
+ _('clone'), total=bytecount, unit=_('bytes')
+ )
progress.update(0)
start = util.timer()
@@ -374,10 +400,12 @@
size = int(size)
except (ValueError, TypeError):
raise error.ResponseError(
- _('unexpected response from remote server:'), l)
+ _('unexpected response from remote server:'), l
+ )
if repo.ui.debugflag:
- repo.ui.debug('adding %s (%s)\n' %
- (name, util.bytecount(size)))
+ repo.ui.debug(
+ 'adding %s (%s)\n' % (name, util.bytecount(size))
+ )
# for backwards compat, name was partially encoded
path = store.decodedir(name)
with repo.svfs(path, 'w', backgroundclose=True) as ofp:
@@ -393,28 +421,41 @@
if elapsed <= 0:
elapsed = 0.001
progress.complete()
- repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
- (util.bytecount(bytecount), elapsed,
- util.bytecount(bytecount / elapsed)))
+ repo.ui.status(
+ _('transferred %s in %.1f seconds (%s/sec)\n')
+ % (
+ util.bytecount(bytecount),
+ elapsed,
+ util.bytecount(bytecount / elapsed),
+ )
+ )
+
def readbundle1header(fp):
compression = fp.read(2)
if compression != 'UN':
- raise error.Abort(_('only uncompressed stream clone bundles are '
- 'supported; got %s') % compression)
+ raise error.Abort(
+ _('only uncompressed stream clone bundles are ' 'supported; got %s')
+ % compression
+ )
filecount, bytecount = struct.unpack('>QQ', fp.read(16))
requireslen = struct.unpack('>H', fp.read(2))[0]
requires = fp.read(requireslen)
if not requires.endswith('\0'):
- raise error.Abort(_('malformed stream clone bundle: '
- 'requirements not properly encoded'))
+ raise error.Abort(
+ _(
+ 'malformed stream clone bundle: '
+ 'requirements not properly encoded'
+ )
+ )
requirements = set(requires.rstrip('\0').split(','))
return filecount, bytecount, requirements
+
def applybundlev1(repo, fp):
"""Apply the content from a stream clone bundle version 1.
@@ -422,37 +463,42 @@
is at the 2 byte compression identifier.
"""
if len(repo):
- raise error.Abort(_('cannot apply stream clone bundle on non-empty '
- 'repo'))
+ raise error.Abort(
+ _('cannot apply stream clone bundle on non-empty ' 'repo')
+ )
filecount, bytecount, requirements = readbundle1header(fp)
missingreqs = requirements - repo.supportedformats
if missingreqs:
- raise error.Abort(_('unable to apply stream clone: '
- 'unsupported format: %s') %
- ', '.join(sorted(missingreqs)))
+ raise error.Abort(
+ _('unable to apply stream clone: ' 'unsupported format: %s')
+ % ', '.join(sorted(missingreqs))
+ )
consumev1(repo, fp, filecount, bytecount)
+
class streamcloneapplier(object):
"""Class to manage applying streaming clone bundles.
We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
readers to perform bundle type-specific functionality.
"""
+
def __init__(self, fh):
self._fh = fh
def apply(self, repo):
return applybundlev1(repo, self._fh)
+
# type of file to stream
-_fileappend = 0 # append only file
-_filefull = 1 # full snapshot file
+_fileappend = 0 # append only file
+_filefull = 1 # full snapshot file
# Source of the file
-_srcstore = 's' # store (svfs)
-_srccache = 'c' # cache (cache)
+_srcstore = 's' # store (svfs)
+_srccache = 'c' # cache (cache)
# This is it's own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
@@ -462,6 +508,7 @@
fnames.append('phaseroots')
return fnames
+
def _filterfull(entry, copy, vfsmap):
"""actually copy the snapshot files"""
src, name, ftype, data = entry
@@ -469,22 +516,26 @@
return entry
return (src, name, ftype, copy(vfsmap[src].join(name)))
+
@contextlib.contextmanager
def maketempcopies():
"""return a function to temporary copy file"""
files = []
try:
+
def copy(src):
fd, dst = pycompat.mkstemp()
os.close(fd)
files.append(dst)
util.copyfiles(src, dst, hardlink=True)
return dst
+
yield copy
finally:
for tmp in files:
util.tryunlink(tmp)
+
def _makemap(repo):
"""make a (src -> vfs) map for the repo"""
vfsmap = {
@@ -497,16 +548,18 @@
return vfsmap
+
def _emit2(repo, entries, totalfilesize):
"""actually emit the stream bundle"""
vfsmap = _makemap(repo)
- progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize,
- unit=_('bytes'))
+ progress = repo.ui.makeprogress(
+ _('bundle'), total=totalfilesize, unit=_('bytes')
+ )
progress.update(0)
with maketempcopies() as copy, progress:
# copy is delayed until we are in the try
entries = [_filterfull(e, copy, vfsmap) for e in entries]
- yield None # this release the lock on the repository
+ yield None # this release the lock on the repository
seen = 0
for src, name, ftype, data in entries:
@@ -533,6 +586,7 @@
finally:
fp.close()
+
def generatev2(repo, includes, excludes, includeobsmarkers):
"""Emit content for version 2 of a streaming clone.
@@ -578,6 +632,7 @@
return len(entries), totalfilesize, chunks
+
@contextlib.contextmanager
def nested(*ctxs):
this = ctxs[0]
@@ -589,6 +644,7 @@
else:
yield
+
def consumev2(repo, fp, filecount, filesize):
"""Apply the contents from a version 2 streaming clone.
@@ -596,19 +652,21 @@
method.
"""
with repo.lock():
- repo.ui.status(_('%d files to transfer, %s of data\n') %
- (filecount, util.bytecount(filesize)))
+ repo.ui.status(
+ _('%d files to transfer, %s of data\n')
+ % (filecount, util.bytecount(filesize))
+ )
start = util.timer()
- progress = repo.ui.makeprogress(_('clone'), total=filesize,
- unit=_('bytes'))
+ progress = repo.ui.makeprogress(
+ _('clone'), total=filesize, unit=_('bytes')
+ )
progress.update(0)
vfsmap = _makemap(repo)
with repo.transaction('clone'):
- ctxs = (vfs.backgroundclosing(repo.ui)
- for vfs in vfsmap.values())
+ ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
with nested(*ctxs):
for i in range(filecount):
src = util.readexactly(fp, 1)
@@ -619,8 +677,10 @@
name = util.readexactly(fp, namelen)
if repo.ui.debugflag:
- repo.ui.debug('adding [%s] %s (%s)\n' %
- (src, name, util.bytecount(datalen)))
+ repo.ui.debug(
+ 'adding [%s] %s (%s)\n'
+ % (src, name, util.bytecount(datalen))
+ )
with vfs(name, 'w') as ofp:
for chunk in util.filechunkiter(fp, limit=datalen):
@@ -634,19 +694,26 @@
elapsed = util.timer() - start
if elapsed <= 0:
elapsed = 0.001
- repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
- (util.bytecount(progress.pos), elapsed,
- util.bytecount(progress.pos / elapsed)))
+ repo.ui.status(
+ _('transferred %s in %.1f seconds (%s/sec)\n')
+ % (
+ util.bytecount(progress.pos),
+ elapsed,
+ util.bytecount(progress.pos / elapsed),
+ )
+ )
progress.complete()
+
def applybundlev2(repo, fp, filecount, filesize, requirements):
from . import localrepo
missingreqs = [r for r in requirements if r not in repo.supported]
if missingreqs:
- raise error.Abort(_('unable to apply stream clone: '
- 'unsupported format: %s') %
- ', '.join(sorted(missingreqs)))
+ raise error.Abort(
+ _('unable to apply stream clone: ' 'unsupported format: %s')
+ % ', '.join(sorted(missingreqs))
+ )
consumev2(repo, fp, filecount, filesize)
@@ -654,7 +721,9 @@
# new format-related remote requirements
# requirements from the streamed-in repository
repo.requirements = set(requirements) | (
- repo.requirements - repo.supportedformats)
+ repo.requirements - repo.supportedformats
+ )
repo.svfs.options = localrepo.resolvestorevfsoptions(
- repo.ui, repo.requirements, repo.features)
+ repo.ui, repo.requirements, repo.features
+ )
repo._writerequirements()