Mercurial > public > mercurial-scm > hg-stable
diff mercurial/streamclone.py @ 43076:2372284d9457
formatting: blacken the codebase
This is using my patch to black
(https://github.com/psf/black/pull/826) so we don't un-wrap collection
literals.
Done with:
hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S
# skip-blame mass-reformatting only
# no-check-commit reformats foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D6971
author | Augie Fackler <augie@google.com> |
---|---|
date | Sun, 06 Oct 2019 09:45:02 -0400 |
parents | 268662aac075 |
children | 687b865b95ad |
line wrap: on
line diff
--- a/mercurial/streamclone.py Sat Oct 05 10:29:34 2019 -0400 +++ b/mercurial/streamclone.py Sun Oct 06 09:45:02 2019 -0400 @@ -12,9 +12,7 @@ import struct from .i18n import _ -from .interfaces import ( - repository, -) +from .interfaces import repository from . import ( cacheutil, error, @@ -25,6 +23,7 @@ util, ) + def canperformstreamclone(pullop, bundle2=False): """Whether it is possible to perform a streaming clone as part of pull. @@ -44,8 +43,8 @@ if 'v2' in pullop.remotebundle2caps.get('stream', []): bundle2supported = True # else - # Server doesn't support bundle2 stream clone or doesn't support - # the versions we support. Fall back and possibly allow legacy. + # Server doesn't support bundle2 stream clone or doesn't support + # the versions we support. Fall back and possibly allow legacy. # Ensures legacy code path uses available bundle2. if bundle2supported and not bundle2: @@ -87,26 +86,37 @@ streamreqs = remote.capable('streamreqs') # This is weird and shouldn't happen with modern servers. if not streamreqs: - pullop.repo.ui.warn(_( - 'warning: stream clone requested but server has them ' - 'disabled\n')) + pullop.repo.ui.warn( + _( + 'warning: stream clone requested but server has them ' + 'disabled\n' + ) + ) return False, None streamreqs = set(streamreqs.split(',')) # Server requires something we don't support. Bail. missingreqs = streamreqs - repo.supportedformats if missingreqs: - pullop.repo.ui.warn(_( - 'warning: stream clone requested but client is missing ' - 'requirements: %s\n') % ', '.join(sorted(missingreqs))) pullop.repo.ui.warn( - _('(see https://www.mercurial-scm.org/wiki/MissingRequirement ' - 'for more information)\n')) + _( + 'warning: stream clone requested but client is missing ' + 'requirements: %s\n' + ) + % ', '.join(sorted(missingreqs)) + ) + pullop.repo.ui.warn( + _( + '(see https://www.mercurial-scm.org/wiki/MissingRequirement ' + 'for more information)\n' + ) + ) return False, None requirements = streamreqs return True, requirements + def maybeperformlegacystreamclone(pullop): """Possibly perform a legacy stream clone operation. @@ -147,7 +157,8 @@ resp = int(l) except ValueError: raise error.ResponseError( - _('unexpected response from remote server:'), l) + _('unexpected response from remote server:'), l + ) if resp == 1: raise error.Abort(_('operation forbidden by server')) elif resp == 2: @@ -160,7 +171,8 @@ filecount, bytecount = map(int, l.split(' ', 1)) except (ValueError, TypeError): raise error.ResponseError( - _('unexpected response from remote server:'), l) + _('unexpected response from remote server:'), l + ) with repo.lock(): consumev1(repo, fp, filecount, bytecount) @@ -169,9 +181,11 @@ # new format-related remote requirements # requirements from the streamed-in repository repo.requirements = requirements | ( - repo.requirements - repo.supportedformats) + repo.requirements - repo.supportedformats + ) repo.svfs.options = localrepo.resolvestorevfsoptions( - repo.ui, repo.requirements, repo.features) + repo.ui, repo.requirements, repo.features + ) repo._writerequirements() if rbranchmap: @@ -179,6 +193,7 @@ repo.invalidate() + def allowservergeneration(repo): """Whether streaming clones are allowed from the server.""" if repository.REPO_FEATURE_STREAM_CLONE not in repo.features: @@ -195,10 +210,12 @@ return True + # This is it's own function so extensions can override it. def _walkstreamfiles(repo, matcher=None): return repo.store.walk(matcher) + def generatev1(repo): """Emit content for version 1 of a streaming clone. @@ -228,8 +245,9 @@ entries.append((name, size)) total_bytes += size - repo.ui.debug('%d files, %d bytes to transfer\n' % - (len(entries), total_bytes)) + repo.ui.debug( + '%d files, %d bytes to transfer\n' % (len(entries), total_bytes) + ) svfs = repo.svfs debugflag = repo.ui.debugflag @@ -251,6 +269,7 @@ return len(entries), total_bytes, emitrevlogdata() + def generatev1wireproto(repo): """Emit content for version 1 of streaming clone suitable for the wire. @@ -278,6 +297,7 @@ for chunk in it: yield chunk + def generatebundlev1(repo, compression='UN'): """Emit content for version 1 of a stream clone bundle. @@ -311,8 +331,9 @@ yield compression filecount, bytecount, it = generatev1(repo) - repo.ui.status(_('writing %d bytes for %d files\n') % - (bytecount, filecount)) + repo.ui.status( + _('writing %d bytes for %d files\n') % (bytecount, filecount) + ) yield struct.pack('>QQ', filecount, bytecount) yield struct.pack('>H', len(requires) + 1) @@ -321,8 +342,9 @@ # This is where we'll add compression in the future. assert compression == 'UN' - progress = repo.ui.makeprogress(_('bundle'), total=bytecount, - unit=_('bytes')) + progress = repo.ui.makeprogress( + _('bundle'), total=bytecount, unit=_('bytes') + ) progress.update(0) for chunk in it: @@ -333,6 +355,7 @@ return requirements, gen() + def consumev1(repo, fp, filecount, bytecount): """Apply the contents from version 1 of a streaming clone file handle. @@ -343,10 +366,13 @@ handled by this function. """ with repo.lock(): - repo.ui.status(_('%d files to transfer, %s of data\n') % - (filecount, util.bytecount(bytecount))) - progress = repo.ui.makeprogress(_('clone'), total=bytecount, - unit=_('bytes')) + repo.ui.status( + _('%d files to transfer, %s of data\n') + % (filecount, util.bytecount(bytecount)) + ) + progress = repo.ui.makeprogress( + _('clone'), total=bytecount, unit=_('bytes') + ) progress.update(0) start = util.timer() @@ -374,10 +400,12 @@ size = int(size) except (ValueError, TypeError): raise error.ResponseError( - _('unexpected response from remote server:'), l) + _('unexpected response from remote server:'), l + ) if repo.ui.debugflag: - repo.ui.debug('adding %s (%s)\n' % - (name, util.bytecount(size))) + repo.ui.debug( + 'adding %s (%s)\n' % (name, util.bytecount(size)) + ) # for backwards compat, name was partially encoded path = store.decodedir(name) with repo.svfs(path, 'w', backgroundclose=True) as ofp: @@ -393,28 +421,41 @@ if elapsed <= 0: elapsed = 0.001 progress.complete() - repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % - (util.bytecount(bytecount), elapsed, - util.bytecount(bytecount / elapsed))) + repo.ui.status( + _('transferred %s in %.1f seconds (%s/sec)\n') + % ( + util.bytecount(bytecount), + elapsed, + util.bytecount(bytecount / elapsed), + ) + ) + def readbundle1header(fp): compression = fp.read(2) if compression != 'UN': - raise error.Abort(_('only uncompressed stream clone bundles are ' - 'supported; got %s') % compression) + raise error.Abort( + _('only uncompressed stream clone bundles are ' 'supported; got %s') + % compression + ) filecount, bytecount = struct.unpack('>QQ', fp.read(16)) requireslen = struct.unpack('>H', fp.read(2))[0] requires = fp.read(requireslen) if not requires.endswith('\0'): - raise error.Abort(_('malformed stream clone bundle: ' - 'requirements not properly encoded')) + raise error.Abort( + _( + 'malformed stream clone bundle: ' + 'requirements not properly encoded' + ) + ) requirements = set(requires.rstrip('\0').split(',')) return filecount, bytecount, requirements + def applybundlev1(repo, fp): """Apply the content from a stream clone bundle version 1. @@ -422,37 +463,42 @@ is at the 2 byte compression identifier. """ if len(repo): - raise error.Abort(_('cannot apply stream clone bundle on non-empty ' - 'repo')) + raise error.Abort( + _('cannot apply stream clone bundle on non-empty ' 'repo') + ) filecount, bytecount, requirements = readbundle1header(fp) missingreqs = requirements - repo.supportedformats if missingreqs: - raise error.Abort(_('unable to apply stream clone: ' - 'unsupported format: %s') % - ', '.join(sorted(missingreqs))) + raise error.Abort( + _('unable to apply stream clone: ' 'unsupported format: %s') + % ', '.join(sorted(missingreqs)) + ) consumev1(repo, fp, filecount, bytecount) + class streamcloneapplier(object): """Class to manage applying streaming clone bundles. We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle readers to perform bundle type-specific functionality. """ + def __init__(self, fh): self._fh = fh def apply(self, repo): return applybundlev1(repo, self._fh) + # type of file to stream -_fileappend = 0 # append only file -_filefull = 1 # full snapshot file +_fileappend = 0 # append only file +_filefull = 1 # full snapshot file # Source of the file -_srcstore = 's' # store (svfs) -_srccache = 'c' # cache (cache) +_srcstore = 's' # store (svfs) +_srccache = 'c' # cache (cache) # This is it's own function so extensions can override it. def _walkstreamfullstorefiles(repo): @@ -462,6 +508,7 @@ fnames.append('phaseroots') return fnames + def _filterfull(entry, copy, vfsmap): """actually copy the snapshot files""" src, name, ftype, data = entry @@ -469,22 +516,26 @@ return entry return (src, name, ftype, copy(vfsmap[src].join(name))) + @contextlib.contextmanager def maketempcopies(): """return a function to temporary copy file""" files = [] try: + def copy(src): fd, dst = pycompat.mkstemp() os.close(fd) files.append(dst) util.copyfiles(src, dst, hardlink=True) return dst + yield copy finally: for tmp in files: util.tryunlink(tmp) + def _makemap(repo): """make a (src -> vfs) map for the repo""" vfsmap = { @@ -497,16 +548,18 @@ return vfsmap + def _emit2(repo, entries, totalfilesize): """actually emit the stream bundle""" vfsmap = _makemap(repo) - progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize, - unit=_('bytes')) + progress = repo.ui.makeprogress( + _('bundle'), total=totalfilesize, unit=_('bytes') + ) progress.update(0) with maketempcopies() as copy, progress: # copy is delayed until we are in the try entries = [_filterfull(e, copy, vfsmap) for e in entries] - yield None # this release the lock on the repository + yield None # this release the lock on the repository seen = 0 for src, name, ftype, data in entries: @@ -533,6 +586,7 @@ finally: fp.close() + def generatev2(repo, includes, excludes, includeobsmarkers): """Emit content for version 2 of a streaming clone. @@ -578,6 +632,7 @@ return len(entries), totalfilesize, chunks + @contextlib.contextmanager def nested(*ctxs): this = ctxs[0] @@ -589,6 +644,7 @@ else: yield + def consumev2(repo, fp, filecount, filesize): """Apply the contents from a version 2 streaming clone. @@ -596,19 +652,21 @@ method. """ with repo.lock(): - repo.ui.status(_('%d files to transfer, %s of data\n') % - (filecount, util.bytecount(filesize))) + repo.ui.status( + _('%d files to transfer, %s of data\n') + % (filecount, util.bytecount(filesize)) + ) start = util.timer() - progress = repo.ui.makeprogress(_('clone'), total=filesize, - unit=_('bytes')) + progress = repo.ui.makeprogress( + _('clone'), total=filesize, unit=_('bytes') + ) progress.update(0) vfsmap = _makemap(repo) with repo.transaction('clone'): - ctxs = (vfs.backgroundclosing(repo.ui) - for vfs in vfsmap.values()) + ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) with nested(*ctxs): for i in range(filecount): src = util.readexactly(fp, 1) @@ -619,8 +677,10 @@ name = util.readexactly(fp, namelen) if repo.ui.debugflag: - repo.ui.debug('adding [%s] %s (%s)\n' % - (src, name, util.bytecount(datalen))) + repo.ui.debug( + 'adding [%s] %s (%s)\n' + % (src, name, util.bytecount(datalen)) + ) with vfs(name, 'w') as ofp: for chunk in util.filechunkiter(fp, limit=datalen): @@ -634,19 +694,26 @@ elapsed = util.timer() - start if elapsed <= 0: elapsed = 0.001 - repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % - (util.bytecount(progress.pos), elapsed, - util.bytecount(progress.pos / elapsed))) + repo.ui.status( + _('transferred %s in %.1f seconds (%s/sec)\n') + % ( + util.bytecount(progress.pos), + elapsed, + util.bytecount(progress.pos / elapsed), + ) + ) progress.complete() + def applybundlev2(repo, fp, filecount, filesize, requirements): from . import localrepo missingreqs = [r for r in requirements if r not in repo.supported] if missingreqs: - raise error.Abort(_('unable to apply stream clone: ' - 'unsupported format: %s') % - ', '.join(sorted(missingreqs))) + raise error.Abort( + _('unable to apply stream clone: ' 'unsupported format: %s') + % ', '.join(sorted(missingreqs)) + ) consumev2(repo, fp, filecount, filesize) @@ -654,7 +721,9 @@ # new format-related remote requirements # requirements from the streamed-in repository repo.requirements = set(requirements) | ( - repo.requirements - repo.supportedformats) + repo.requirements - repo.supportedformats + ) repo.svfs.options = localrepo.resolvestorevfsoptions( - repo.ui, repo.requirements, repo.features) + repo.ui, repo.requirements, repo.features + ) repo._writerequirements()