changeset 52921:3ee343dd3abf
stream-clone-v2: extract the stream parsing code in a function
This will be useful to spread the work across different threads. However, before
being able to introduce threading, we need to finish refactoring the code.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Mon, 20 Jan 2025 12:16:54 +0100 |
parents | 0005eb86deac |
children | 70306aefa52b |
files | mercurial/streamclone.py |
diffstat | 1 files changed, 85 insertions(+), 25 deletions(-) |
--- a/mercurial/streamclone.py	Sun Feb 16 23:29:09 2025 -0500
+++ b/mercurial/streamclone.py	Mon Jan 20 12:16:54 2025 +0100
@@ -12,7 +12,13 @@
 import os
 import struct
 
-from typing import Iterable, Iterator, Optional, Set
+from typing import (
+    Iterable,
+    Iterator,
+    Optional,
+    Set,
+    Tuple,
+)
 
 from .i18n import _
 from .interfaces import repository
@@ -1057,6 +1063,13 @@
         yield
 
 
+class V2Report:
+    """a small class to track the data we saw within the stream"""
+
+    def __init__(self):
+        self.byte_count = 0
+
+
 def consumev2(repo, fp, filecount: int, filesize: int) -> None:
     """Apply the contents from a version 2 streaming clone.
 
@@ -1068,13 +1081,13 @@
         _(b'%d files to transfer, %s of data\n')
         % (filecount, util.bytecount(filesize))
     )
-
+    progress = repo.ui.makeprogress(
+        _(b'clone'),
+        total=filesize,
+        unit=_(b'bytes'),
+    )
     start = util.timer()
-    progress = repo.ui.makeprogress(
-        _(b'clone'), total=filesize, unit=_(b'bytes')
-    )
-    progress.update(0)
-    byte_count = 0
+    report = V2Report()
 
     vfsmap = _makemap(repo)
     # we keep repo.vfs out of the on purpose, ther are too many danger
@@ -1090,25 +1103,18 @@
     with repo.transaction(b'clone'):
         ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
         with nested(*ctxs):
-            for i in range(filecount):
-                src = util.readexactly(fp, 1)
+            files = _v2_parse_files(
+                repo,
+                fp,
+                filecount,
+                progress,
+                report,
+            )
+            for src, name, chunks in files:
                 vfs = vfsmap[src]
-                namelen = util.uvarintdecodestream(fp)
-                datalen = util.uvarintdecodestream(fp)
-
-                name = util.readexactly(fp, namelen)
-
-                if repo.ui.debugflag:
-                    repo.ui.debug(
-                        b'adding [%s] %s (%s)\n'
-                        % (src, name, util.bytecount(datalen))
-                    )
-
                 with vfs(name, b'w') as ofp:
-                    for chunk in util.filechunkiter(fp, limit=datalen):
-                        byte_count += len(chunk)
-                        progress.increment(step=len(chunk))
-                        ofp.write(chunk)
+                    for c in chunks:
+                        ofp.write(c)
 
         # force @filecache properties to be reloaded from
         # streamclone-ed file at next access
@@ -1119,7 +1125,61 @@
     # sequential and parallel behavior.
     remains = fp.read(1)
     assert not remains
-    _report_transferred(repo, start, filecount, byte_count)
+    _report_transferred(repo, start, filecount, report.byte_count)
+
+
+# iterator of chunk of bytes that constitute a file content.
+FileChunksT = Iterator[bytes]
+# Contains the information necessary to write stream file on disk
+FileInfoT = Tuple[
+    bytes,  # vfs key
+    bytes,  # file name (non-vfs-encoded)
+    FileChunksT,  # content
+]
+
+
+def _v2_file_chunk(
+    fp: bundle2mod.unbundlepart,
+    data_len: int,
+    progress: scmutil.progress,
+    report: V2Report,
+) -> FileChunksT:
+    """yield the chunk that constitute a file
+
+    This function exists as the counterpart of a future threaded version and
+    would not be very useful on its own.
+    """
+    for chunk in util.filechunkiter(fp, limit=data_len):
+        report.byte_count += len(chunk)
+        progress.increment(step=len(chunk))
+        yield chunk
+
+
+def _v2_parse_files(
+    repo,
+    fp: bundle2mod.unbundlepart,
+    file_count: int,
+    progress: scmutil.progress,
+    report: V2Report,
+) -> Iterator[FileInfoT]:
+    """do the "stream-parsing" part of stream v2
+
+    The parsed information are yield result for consumption by the "writer"
+    """
+    progress.update(0)
+    for i in range(file_count):
+        src = util.readexactly(fp, 1)
+        namelen = util.uvarintdecodestream(fp)
+        datalen = util.uvarintdecodestream(fp)
+
+        name = util.readexactly(fp, namelen)
+
+        if repo.ui.debugflag:
+            repo.ui.debug(
+                b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
+            )
+
+        yield (src, name, _v2_file_chunk(fp, datalen, progress, report))
 
 
 def consumev3(repo, fp) -> None: