diff mercurial/streamclone.py @ 50706:0452af304808
stream-clone: add a v3 version of the protocol
This new version is less rigid regarding the exact number of files and number
of bytes to be actually transferred; it also lays the groundwork for other
improvements.
The format stays experimental, but this is an interesting base to build upon.
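
The framing below relies on unsigned variable-length integers
(util.uvarintencode / util.uvarintdecodestream). As a point of reference, here
is a minimal standalone sketch of that encoding, assuming the common
LEB128/protobuf-style varint (7 data bits per byte, least-significant group
first, high bit set on every byte except the last); uvarint_encode is an
illustrative name, not a Mercurial API:

    def uvarint_encode(value: int) -> bytes:
        # emit 7 bits at a time; the high bit marks "more bytes follow"
        out = bytearray()
        while True:
            byte = value & 0x7F
            value >>= 7
            if value:
                out.append(byte | 0x80)
            else:
                out.append(byte)
                return bytes(out)

    assert uvarint_encode(0) == b'\x00'
    assert uvarint_encode(300) == b'\xac\x02'  # 300 = 0b10_0101100

Because the counts and sizes are varint-framed per entry, the producer never
has to commit to an exact total file or byte count up front, which is what
makes v3 "less rigid" than v2.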
author    Arseniy Alekseyev <aalekseyev@janestreet.com>
date      Thu, 01 Jun 2023 17:39:22 +0100
parents   60f9602b413e
children  d718eddf01d9
--- a/mercurial/streamclone.py	Thu Jun 01 18:20:28 2023 +0100
+++ b/mercurial/streamclone.py	Thu Jun 01 17:39:22 2023 +0100
@@ -668,7 +668,11 @@
         for src, vfs, e in entries:
             entry_streams = e.get_streams(
-                repo=repo, vfs=vfs, copies=copy, max_changeset=max_linkrev
+                repo=repo,
+                vfs=vfs,
+                copies=copy,
+                max_changeset=max_linkrev,
+                preserve_file_count=True,
             )
             for name, stream, size in entry_streams:
                 yield src
@@ -691,6 +695,59 @@
                     raise error.Abort(msg % (bytecount, name, size))


+def _emit3(repo, entries):
+    """actually emit the stream bundle (v3)"""
+    vfsmap = _makemap(repo)
+    # we keep repo.vfs out of the map on purpose, there are too many dangers
+    # there (eg: .hg/hgrc),
+    #
+    # this assert is duplicated (from _makemap) as authors might think this is
+    # fine, while this is really not fine.
+    if repo.vfs in vfsmap.values():
+        raise error.ProgrammingError(
+            b'repo.vfs must not be added to vfsmap for security reasons'
+        )
+
+    # translate the vfs once
+    entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+    total_entry_count = len(entries)
+
+    max_linkrev = len(repo)
+    progress = repo.ui.makeprogress(
+        _(b'bundle'),
+        total=total_entry_count,
+        unit=_(b'entry'),
+    )
+    progress.update(0)
+    with TempCopyManager() as copy, progress:
+        # create a copy of volatile files
+        for k, vfs, e in entries:
+            for f in e.files():
+                f.file_size(vfs)  # record the expected size under lock
+                if f.is_volatile:
+                    copy(vfs.join(f.unencoded_path))
+        # the first yield releases the lock on the repository
+        yield None
+
+        yield util.uvarintencode(total_entry_count)
+
+        for src, vfs, e in entries:
+            entry_streams = e.get_streams(
+                repo=repo,
+                vfs=vfs,
+                copies=copy,
+                max_changeset=max_linkrev,
+            )
+            yield util.uvarintencode(len(entry_streams))
+            for name, stream, size in entry_streams:
+                yield src
+                yield util.uvarintencode(len(name))
+                yield util.uvarintencode(size)
+                yield name
+                yield from stream
+            progress.increment()
+
+
 def _test_sync_point_walk_1(repo):
     """a function for synchronisation during tests"""
@@ -766,7 +823,47 @@


 def generatev3(repo, includes, excludes, includeobsmarkers):
-    return generatev2(repo, includes, excludes, includeobsmarkers)
+    """Emit content for version 3 of a streaming clone.
+
+    The data stream consists of the following:
+
+    1) A varint E containing the number of entries (can be 0), then E entries
+       follow
+    2) For each entry:
+       2.1) The number of files in this entry (can be 0, but typically 1 or 2)
+       2.2) For each file:
+          2.2.1) A char representing the file destination (eg: store or cache)
+          2.2.2) A varint N containing the length of the filename
+          2.2.3) A varint M containing the length of file data
+          2.2.4) N bytes containing the filename (the internal, store-agnostic
+                 form)
+          2.2.5) M bytes containing the file data
+
+    Returns the data iterator.
+
+    XXX This format is experimental and subject to change. Here is a
+    XXX non-exhaustive list of things this format could do or change:
+
+    - making it easier to write files in parallel
+    - holding the lock for a shorter time
+    - improving progress information
+    - ways to adjust the number of expected entries/files ?
+    """
+
+    with repo.lock():
+
+        repo.ui.debug(b'scanning\n')
+
+        entries = _entries_walk(
+            repo,
+            includes=includes,
+            excludes=excludes,
+            includeobsmarkers=includeobsmarkers,
+        )
+        chunks = _emit3(repo, list(entries))
+        first = next(chunks)
+        assert first is None
+        _test_sync_point_walk_1(repo)
+        _test_sync_point_walk_2(repo)
+
+    return chunks


 @contextlib.contextmanager
@@ -850,6 +947,80 @@
     progress.complete()


+def consumev3(repo, fp):
+    """Apply the contents from a version 3 streaming clone.
+
+    Data is read from an object that only needs to provide a ``read(size)``
+    method.
+    """
+    with repo.lock():
+        start = util.timer()
+
+        entrycount = util.uvarintdecodestream(fp)
+        repo.ui.status(_(b'%d entries to transfer\n') % (entrycount))
+
+        progress = repo.ui.makeprogress(
+            _(b'clone'),
+            total=entrycount,
+            unit=_(b'entries'),
+        )
+        progress.update(0)
+        bytes_transferred = 0
+
+        vfsmap = _makemap(repo)
+        # we keep repo.vfs out of the map on purpose, there are too many
+        # dangers there (eg: .hg/hgrc),
+        #
+        # this assert is duplicated (from _makemap) as authors might think
+        # this is fine, while this is really not fine.
+        if repo.vfs in vfsmap.values():
+            raise error.ProgrammingError(
+                b'repo.vfs must not be added to vfsmap for security reasons'
+            )
+
+        with repo.transaction(b'clone'):
+            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
+            with nested(*ctxs):
+
+                for i in range(entrycount):
+                    filecount = util.uvarintdecodestream(fp)
+                    if filecount == 0:
+                        if repo.ui.debugflag:
+                            repo.ui.debug(b'entry with no files [%d]\n' % (i))
+                    for i in range(filecount):
+                        src = util.readexactly(fp, 1)
+                        vfs = vfsmap[src]
+                        namelen = util.uvarintdecodestream(fp)
+                        datalen = util.uvarintdecodestream(fp)
+
+                        name = util.readexactly(fp, namelen)
+
+                        if repo.ui.debugflag:
+                            msg = b'adding [%s] %s (%s)\n'
+                            msg %= (src, name, util.bytecount(datalen))
+                            repo.ui.debug(msg)
+                        bytes_transferred += datalen
+
+                        with vfs(name, b'w') as ofp:
+                            for chunk in util.filechunkiter(fp, limit=datalen):
+                                ofp.write(chunk)
+                    progress.increment(step=1)
+
+            # force @filecache properties to be reloaded from
+            # streamclone-ed file at next access
+            repo.invalidate(clearfilecache=True)
+
+        elapsed = util.timer() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
+        byte_count = util.bytecount(bytes_transferred)
+        bytes_sec = util.bytecount(bytes_transferred / elapsed)
+        msg %= (byte_count, elapsed, bytes_sec)
+        repo.ui.status(msg)
+        progress.complete()
+
+
 def applybundlev2(repo, fp, filecount, filesize, requirements):
     from . import localrepo

@@ -873,6 +1044,28 @@
     nodemap.post_stream_cleanup(repo)


+def applybundlev3(repo, fp, requirements):
+    from . import localrepo
+
+    missingreqs = [r for r in requirements if r not in repo.supported]
+    if missingreqs:
+        msg = _(b'unable to apply stream clone: unsupported format: %s')
+        msg %= b', '.join(sorted(missingreqs))
+        raise error.Abort(msg)
+
+    consumev3(repo, fp)
+
+    repo.requirements = new_stream_clone_requirements(
+        repo.requirements,
+        requirements,
+    )
+    repo.svfs.options = localrepo.resolvestorevfsoptions(
+        repo.ui, repo.requirements, repo.features
+    )
+    scmutil.writereporequirements(repo)
+    nodemap.post_stream_cleanup(repo)
+
+
 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
     hardlink = [True]
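
To make the framing documented in generatev3's docstring concrete, here is a
minimal consumer-side sketch. It is not Mercurial's implementation (consumev3
above is the real one, with locking, vfs routing, and chunked writes);
read_exactly, read_uvarint, and read_v3_frames are illustrative helpers, and
the varint decoding assumes the LEB128-style encoding implemented by
util.uvarintdecodestream:

    import io


    def read_exactly(fp, n):
        # illustrative stand-in for mercurial.util.readexactly
        data = fp.read(n)
        if len(data) != n:
            raise EOFError('stream ended after %d of %d bytes' % (len(data), n))
        return data


    def read_uvarint(fp):
        # decode one LEB128-style unsigned varint: 7 bits per byte,
        # least-significant group first, high bit = continuation
        result = shift = 0
        while True:
            byte = read_exactly(fp, 1)[0]
            result |= (byte & 0x7F) << shift
            if not byte & 0x80:
                return result
            shift += 7


    def read_v3_frames(fp):
        """Yield (src, name, data) for every file in a v3 payload."""
        for _entry in range(read_uvarint(fp)):       # 1)     entry count E
            for _file in range(read_uvarint(fp)):    # 2.1)   files in entry
                src = read_exactly(fp, 1)             # 2.2.1) destination char
                name_len = read_uvarint(fp)           # 2.2.2) varint N
                data_len = read_uvarint(fp)           # 2.2.3) varint M
                name = read_exactly(fp, name_len)     # 2.2.4) N-byte filename
                data = read_exactly(fp, data_len)     # 2.2.5) M-byte file data
                yield src, name, data


    # round-trip a hand-built payload: one entry holding one 3-byte file
    payload = io.BytesIO(b'\x01' b'\x01' b's' b'\x04' b'\x03' b'data' b'xyz')
    assert list(read_v3_frames(payload)) == [(b's', b'data', b'xyz')]

Unlike v2, where the file count and total byte count are announced (and
enforced) up front, each v3 frame carries its own lengths, so the totals can
be adjusted while the stream is being produced.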