diff mercurial/streamclone.py @ 53031:e705fec4a03f stable
branching: merging with 7.0 changes
Since 6.9.3 was made after 7.0rc0, we need to deal with more branching than
usual.
author    Pierre-Yves David <pierre-yves.david@octobus.net>
date      Wed, 05 Mar 2025 23:02:19 +0100
parents   89ab2459f62a e75ed9ae5fb9
children
--- a/mercurial/streamclone.py Wed Mar 05 22:33:11 2025 +0100 +++ b/mercurial/streamclone.py Wed Mar 05 23:02:19 2025 +0100 @@ -7,9 +7,22 @@ from __future__ import annotations +import collections import contextlib +import errno import os import struct +import threading + +from typing import ( + Callable, + Iterable, + Iterator, + Optional, + Set, + Tuple, + Type, +) from .i18n import _ from .interfaces import repository @@ -32,7 +45,29 @@ ) -def new_stream_clone_requirements(default_requirements, streamed_requirements): +# Number arbitrarily picked, feel free to change them (but the LOW one) +# +# update the configuration documentation if you touch this. +DEFAULT_NUM_WRITER = { + scmutil.RESOURCE_LOW: 1, + scmutil.RESOURCE_MEDIUM: 4, + scmutil.RESOURCE_HIGH: 8, +} + + +# Number arbitrarily picked, feel free to adjust them. Do update the +# documentation if you do so +DEFAULT_MEMORY_TARGET = { + scmutil.RESOURCE_LOW: 50 * (2**20), # 100 MB + scmutil.RESOURCE_MEDIUM: 500 * 2**20, # 500 MB + scmutil.RESOURCE_HIGH: 2 * 2**30, # 2 GB +} + + +def new_stream_clone_requirements( + default_requirements: Iterable[bytes], + streamed_requirements: Iterable[bytes], +) -> Set[bytes]: """determine the final set of requirement for a new stream clone this method combine the "default" requirements that a new repository would @@ -45,7 +80,7 @@ return requirements -def streamed_requirements(repo): +def streamed_requirements(repo) -> Set[bytes]: """the set of requirement the new clone will have to support This is used for advertising the stream options and to generate the actual @@ -56,7 +91,7 @@ return requiredformats -def canperformstreamclone(pullop, bundle2=False): +def canperformstreamclone(pullop, bundle2: bool = False): """Whether it is possible to perform a streaming clone as part of pull. ``bundle2`` will cause the function to consider stream clone through @@ -150,7 +185,7 @@ return True, requirements -def maybeperformlegacystreamclone(pullop): +def maybeperformlegacystreamclone(pullop) -> None: """Possibly perform a legacy stream clone operation. Legacy stream clones are performed as part of pull but before all other @@ -225,7 +260,7 @@ repo.invalidate() -def allowservergeneration(repo): +def allowservergeneration(repo) -> bool: """Whether streaming clones are allowed from the server.""" if repository.REPO_FEATURE_STREAM_CLONE not in repo.features: return False @@ -243,11 +278,30 @@ # This is it's own function so extensions can override it. -def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False): +def _walkstreamfiles( + repo, matcher=None, phase: bool = False, obsolescence: bool = False +): return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence) -def generatev1(repo): +def _report_transferred( + repo, start_time: float, file_count: int, byte_count: int +): + """common utility to report time it took to apply the stream bundle""" + elapsed = util.timer() - start_time + if elapsed <= 0: + elapsed = 0.001 + m = _(b'stream-cloned %d files / %s in %.1f seconds (%s/sec)\n') + m %= ( + file_count, + util.bytecount(byte_count), + elapsed, + util.bytecount(byte_count / elapsed), + ) + repo.ui.status(m) + + +def generatev1(repo) -> tuple[int, int, Iterator[bytes]]: """Emit content for version 1 of a streaming clone. This returns a 3-tuple of (file count, byte size, data iterator). @@ -271,14 +325,15 @@ # Get consistent snapshot of repo, lock during scan. 
with repo.lock(): repo.ui.debug(b'scanning\n') + _test_sync_point_walk_1_2(repo) for entry in _walkstreamfiles(repo): for f in entry.files(): file_size = f.file_size(repo.store.vfs) if file_size: entries.append((f.unencoded_path, file_size)) total_bytes += file_size - _test_sync_point_walk_1(repo) - _test_sync_point_walk_2(repo) + _test_sync_point_walk_3(repo) + _test_sync_point_walk_4(repo) repo.ui.debug( b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes) @@ -287,7 +342,7 @@ svfs = repo.svfs debugflag = repo.ui.debugflag - def emitrevlogdata(): + def emitrevlogdata() -> Iterator[bytes]: for name, size in entries: if debugflag: repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size)) @@ -299,13 +354,12 @@ if size <= 65536: yield fp.read(size) else: - for chunk in util.filechunkiter(fp, limit=size): - yield chunk + yield from util.filechunkiter(fp, limit=size) return len(entries), total_bytes, emitrevlogdata() -def generatev1wireproto(repo): +def generatev1wireproto(repo) -> Iterator[bytes]: """Emit content for version 1 of streaming clone suitable for the wire. This is the data output from ``generatev1()`` with 2 header lines. The @@ -329,11 +383,12 @@ # Indicates successful response. yield b'0\n' yield b'%d %d\n' % (filecount, bytecount) - for chunk in it: - yield chunk + yield from it -def generatebundlev1(repo, compression=b'UN'): +def generatebundlev1( + repo, compression: bytes = b'UN' +) -> tuple[Set[bytes], Iterator[bytes]]: """Emit content for version 1 of a stream clone bundle. The first 4 bytes of the output ("HGS1") denote this as stream clone @@ -356,12 +411,12 @@ Returns a tuple of (requirements, data generator). """ if compression != b'UN': - raise ValueError(b'we do not support the compression argument yet') + raise ValueError('we do not support the compression argument yet') requirements = streamed_requirements(repo) requires = b','.join(sorted(requirements)) - def gen(): + def gen() -> Iterator[bytes]: yield b'HGS1' yield compression @@ -391,7 +446,7 @@ return requirements, gen() -def consumev1(repo, fp, filecount, bytecount): +def consumev1(repo, fp, filecount: int, bytecount: int) -> None: """Apply the contents from version 1 of a streaming clone file handle. This takes the output from "stream_out" and applies it to the specified @@ -425,6 +480,7 @@ # nesting occurs also in ordinary case (e.g. enabling # clonebundles). 
+ total_file_count = 0 with repo.transaction(b'clone'): with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount): for i in range(filecount): @@ -453,6 +509,7 @@ # for backwards compat, name was partially encoded path = store.decodedir(name) with repo.svfs(path, b'w', backgroundclose=True) as ofp: + total_file_count += 1 for chunk in util.filechunkiter(fp, limit=size): progress.increment(step=len(chunk)) ofp.write(chunk) @@ -461,21 +518,11 @@ # streamclone-ed file at next access repo.invalidate(clearfilecache=True) - elapsed = util.timer() - start - if elapsed <= 0: - elapsed = 0.001 progress.complete() - repo.ui.status( - _(b'transferred %s in %.1f seconds (%s/sec)\n') - % ( - util.bytecount(bytecount), - elapsed, - util.bytecount(bytecount / elapsed), - ) - ) + _report_transferred(repo, start, total_file_count, bytecount) -def readbundle1header(fp): +def readbundle1header(fp) -> tuple[int, int, Set[bytes]]: compression = fp.read(2) if compression != b'UN': raise error.Abort( @@ -503,7 +550,7 @@ return filecount, bytecount, requirements -def applybundlev1(repo, fp): +def applybundlev1(repo, fp) -> None: """Apply the content from a stream clone bundle version 1. We assume the 4 byte header has been read and validated and the file handle @@ -533,10 +580,10 @@ readers to perform bundle type-specific functionality. """ - def __init__(self, fh): + def __init__(self, fh) -> None: self._fh = fh - def apply(self, repo): + def apply(self, repo) -> None: return applybundlev1(repo, self._fh) @@ -550,7 +597,7 @@ # This is it's own function so extensions can override it. -def _walkstreamfullstorefiles(repo): +def _walkstreamfullstorefiles(repo) -> list[bytes]: """list snapshot file from the store""" fnames = [] if not repo.publishing(): @@ -587,7 +634,7 @@ # for flushing to disk in __call__(). MAX_OPEN = 2 if pycompat.iswindows else 100 - def __init__(self): + def __init__(self) -> None: self._counter = 0 self._volatile_fps = None self._copies = None @@ -617,7 +664,7 @@ assert self._copies is None assert self._dst_dir is None - def _init_tmp_copies(self): + def _init_tmp_copies(self) -> None: """prepare a temporary directory to save volatile files This will be used as backup if we have too many files open""" @@ -627,7 +674,7 @@ self._copies = {} self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-') - def _flush_some_on_disk(self): + def _flush_some_on_disk(self) -> None: """move some of the open files to tempory files on disk""" if self._copies is None: self._init_tmp_copies() @@ -646,21 +693,39 @@ del self._volatile_fps[src] fp.close() - def _keep_one(self, src): + def _keep_one(self, src: bytes) -> int: """preserve an open file handle for a given path""" # store the file quickly to ensure we close it if any error happens _, fp = self._volatile_fps[src] = (None, open(src, 'rb')) fp.seek(0, os.SEEK_END) size = fp.tell() self._volatile_fps[src] = (size, fp) + return size - def __call__(self, src): + def __call__(self, src: bytes) -> None: """preserve the volatile file at src""" assert 0 < self._counter if len(self._volatile_fps) >= (self.MAX_OPEN - 1): self._flush_some_on_disk() self._keep_one(src) + def try_keep(self, src: bytes) -> Optional[int]: + """record a volatile file and returns it size + + return None if the file does not exists. + + Used for cache file that are not lock protected. 
+ """ + assert 0 < self._counter + if len(self._volatile_fps) >= (self.MAX_OPEN - 1): + self._flush_some_on_disk() + try: + return self._keep_one(src) + except OSError as err: + if err.errno not in (errno.ENOENT, errno.EPERM): + raise + return None + @contextlib.contextmanager def open(self, src): assert 0 < self._counter @@ -701,65 +766,64 @@ # fine, while this is really not fine. if repo.vfs in vfsmap.values(): raise error.ProgrammingError( - b'repo.vfs must not be added to vfsmap for security reasons' + 'repo.vfs must not be added to vfsmap for security reasons' ) # translate the vfs one entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] + _test_sync_point_walk_1_2(repo) max_linkrev = len(repo) file_count = totalfilesize = 0 - with util.nogc(): - # record the expected size of every file - for k, vfs, e in entries: - for f in e.files(): - file_count += 1 - totalfilesize += f.file_size(vfs) + with VolatileManager() as volatiles: + # make sure we preserve volatile files + with util.nogc(): + # record the expected size of every file + for k, vfs, e in entries: + e.preserve_volatiles(vfs, volatiles) + for f in e.files(): + file_count += 1 + totalfilesize += f.file_size(vfs) - progress = repo.ui.makeprogress( - _(b'bundle'), total=totalfilesize, unit=_(b'bytes') - ) - progress.update(0) - with VolatileManager() as volatiles, progress: - # make sure we preserve volatile files - for k, vfs, e in entries: - for f in e.files(): - if f.is_volatile: - volatiles(vfs.join(f.unencoded_path)) - # the first yield release the lock on the repository - yield file_count, totalfilesize - totalbytecount = 0 + progress = repo.ui.makeprogress( + _(b'bundle'), total=totalfilesize, unit=_(b'bytes') + ) + progress.update(0) + with progress: + # the first yield release the lock on the repository + yield file_count, totalfilesize + totalbytecount = 0 - for src, vfs, e in entries: - entry_streams = e.get_streams( - repo=repo, - vfs=vfs, - volatiles=volatiles, - max_changeset=max_linkrev, - preserve_file_count=True, - ) - for name, stream, size in entry_streams: - yield src - yield util.uvarintencode(len(name)) - yield util.uvarintencode(size) - yield name - bytecount = 0 - for chunk in stream: - bytecount += len(chunk) - totalbytecount += len(chunk) - progress.update(totalbytecount) - yield chunk - if bytecount != size: - # Would most likely be caused by a race due to `hg - # strip` or a revlog split - msg = _( - b'clone could only read %d bytes from %s, but ' - b'expected %d bytes' - ) - raise error.Abort(msg % (bytecount, name, size)) + for src, vfs, e in entries: + entry_streams = e.get_streams( + repo=repo, + vfs=vfs, + volatiles=volatiles, + max_changeset=max_linkrev, + preserve_file_count=True, + ) + for name, stream, size in entry_streams: + yield src + yield util.uvarintencode(len(name)) + yield util.uvarintencode(size) + yield name + bytecount = 0 + for chunk in stream: + bytecount += len(chunk) + totalbytecount += len(chunk) + progress.update(totalbytecount) + yield chunk + if bytecount != size: + # Would most likely be caused by a race due to `hg + # strip` or a revlog split + msg = _( + b'clone could only read %d bytes from %s, but ' + b'expected %d bytes' + ) + raise error.Abort(msg % (bytecount, name, size)) -def _emit3(repo, entries): +def _emit3(repo, entries) -> Iterator[bytes | None]: """actually emit the stream bundle (v3)""" vfsmap = _makemap(repo) # we keep repo.vfs out of the map on purpose, ther are too many dangers @@ -769,60 +833,137 @@ # fine, while this is really not 
fine. if repo.vfs in vfsmap.values(): raise error.ProgrammingError( - b'repo.vfs must not be added to vfsmap for security reasons' + 'repo.vfs must not be added to vfsmap for security reasons' ) # translate the vfs once - entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] - total_entry_count = len(entries) - - max_linkrev = len(repo) - progress = repo.ui.makeprogress( - _(b'bundle'), - total=total_entry_count, - unit=_(b'entry'), - ) - progress.update(0) - with VolatileManager() as volatiles, progress: + # we only turn this into a list for the `_test_sync`, this is not ideal + base_entries = list(entries) + _test_sync_point_walk_1_2(repo) + entries = [] + with VolatileManager() as volatiles: # make sure we preserve volatile files - for k, vfs, e in entries: + for vfs_key, e in base_entries: + vfs = vfsmap[vfs_key] + any_files = True if e.maybe_volatile: + any_files = False + e.preserve_volatiles(vfs, volatiles) for f in e.files(): if f.is_volatile: # record the expected size under lock f.file_size(vfs) - volatiles(vfs.join(f.unencoded_path)) + any_files = True + if any_files: + entries.append((vfs_key, vfsmap[vfs_key], e)) + + total_entry_count = len(entries) + + max_linkrev = len(repo) + progress = repo.ui.makeprogress( + _(b'bundle'), + total=total_entry_count, + unit=_(b'entry'), + ) + progress.update(0) # the first yield release the lock on the repository yield None + with progress: + yield util.uvarintencode(total_entry_count) - yield util.uvarintencode(total_entry_count) + for src, vfs, e in entries: + entry_streams = e.get_streams( + repo=repo, + vfs=vfs, + volatiles=volatiles, + max_changeset=max_linkrev, + ) + yield util.uvarintencode(len(entry_streams)) + for name, stream, size in entry_streams: + yield src + yield util.uvarintencode(len(name)) + yield util.uvarintencode(size) + yield name + yield from stream + progress.increment() + - for src, vfs, e in entries: - entry_streams = e.get_streams( - repo=repo, - vfs=vfs, - volatiles=volatiles, - max_changeset=max_linkrev, - ) - yield util.uvarintencode(len(entry_streams)) - for name, stream, size in entry_streams: - yield src - yield util.uvarintencode(len(name)) - yield util.uvarintencode(size) - yield name - yield from stream - progress.increment() +def _test_sync_point_walk_1_2(repo): + """a function for synchronisation during tests + + Triggered after gather entry, but before starting to process/preserve them + under lock. + + (on v1 is triggered before the actual walk start) + """ + + +def _test_sync_point_walk_3(repo): + """a function for synchronisation during tests + + Triggered right before releasing the lock, but after computing what need + needed to compute under lock. + """ + + +def _test_sync_point_walk_4(repo): + """a function for synchronisation during tests + + Triggered right after releasing the lock. + """ -def _test_sync_point_walk_1(repo): - """a function for synchronisation during tests""" +# not really a StoreEntry, but close enough +class CacheEntry(store.SimpleStoreEntry): + """Represent an entry for Cache files + + It has special logic to preserve cache file early and accept optional + presence. -def _test_sync_point_walk_2(repo): - """a function for synchronisation during tests""" + (Yes... this is not really a StoreEntry, but close enough. 
We could have a + BaseEntry base class, bbut the store one would be identical) + """ + + def __init__(self, entry_path) -> None: + super().__init__( + entry_path, + # we will directly deal with that in `setup_cache_file` + is_volatile=True, + ) + + def preserve_volatiles(self, vfs, volatiles) -> None: + self._file_size = volatiles.try_keep(vfs.join(self._entry_path)) + if self._file_size is None: + self._files = [] + else: + assert self._is_volatile + self._files = [ + CacheFile( + unencoded_path=self._entry_path, + file_size=self._file_size, + is_volatile=self._is_volatile, + ) + ] + + def files(self) -> list[store.StoreFile]: + if self._files is None: + self._files = [ + CacheFile( + unencoded_path=self._entry_path, + is_volatile=self._is_volatile, + ) + ] + return super().files() -def _entries_walk(repo, includes, excludes, includeobsmarkers): +class CacheFile(store.StoreFile): + # inform the "copy/hardlink" version that this file might be missing + # without consequences. + optional: bool = True + + +def _entries_walk(repo, includes, excludes, includeobsmarkers: bool): """emit a seris of files information useful to clone a repo return (vfs-key, entry) iterator @@ -851,14 +992,10 @@ for name in cacheutil.cachetocopy(repo): if repo.cachevfs.exists(name): # not really a StoreEntry, but close enough - entry = store.SimpleStoreEntry( - entry_path=name, - is_volatile=True, - ) - yield (_srccache, entry) + yield (_srccache, CacheEntry(entry_path=name)) -def generatev2(repo, includes, excludes, includeobsmarkers): +def generatev2(repo, includes, excludes, includeobsmarkers: bool): """Emit content for version 2 of a streaming clone. the data stream consists the following entries: @@ -880,17 +1017,18 @@ excludes=excludes, includeobsmarkers=includeobsmarkers, ) - chunks = _emit2(repo, entries) first = next(chunks) file_count, total_file_size = first - _test_sync_point_walk_1(repo) - _test_sync_point_walk_2(repo) + _test_sync_point_walk_3(repo) + _test_sync_point_walk_4(repo) return file_count, total_file_size, chunks -def generatev3(repo, includes, excludes, includeobsmarkers): +def generatev3( + repo, includes, excludes, includeobsmarkers: bool +) -> Iterator[bytes | None]: """Emit content for version 3 of a streaming clone. the data stream consists the following: @@ -930,8 +1068,8 @@ chunks = _emit3(repo, list(entries)) first = next(chunks) assert first is None - _test_sync_point_walk_1(repo) - _test_sync_point_walk_2(repo) + _test_sync_point_walk_3(repo) + _test_sync_point_walk_4(repo) return chunks @@ -948,7 +1086,14 @@ yield -def consumev2(repo, fp, filecount, filesize): +class V2Report: + """a small class to track the data we saw within the stream""" + + def __init__(self): + self.byte_count = 0 + + +def consumev2(repo, fp, filecount: int, filesize: int) -> None: """Apply the contents from a version 2 streaming clone. Data is read from an object that only needs to provide a ``read(size)`` @@ -959,12 +1104,13 @@ _(b'%d files to transfer, %s of data\n') % (filecount, util.bytecount(filesize)) ) - + progress = repo.ui.makeprogress( + _(b'clone'), + total=filesize, + unit=_(b'bytes'), + ) start = util.timer() - progress = repo.ui.makeprogress( - _(b'clone'), total=filesize, unit=_(b'bytes') - ) - progress.update(0) + report = V2Report() vfsmap = _makemap(repo) # we keep repo.vfs out of the on purpose, ther are too many danger @@ -974,50 +1120,453 @@ # is fine, while this is really not fine. 
if repo.vfs in vfsmap.values(): raise error.ProgrammingError( - b'repo.vfs must not be added to vfsmap for security reasons' + 'repo.vfs must not be added to vfsmap for security reasons' ) + cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu') + mem_profile = scmutil.get_resource_profile(repo.ui, b'memory') + threaded = repo.ui.configbool( + b"worker", b"parallel-stream-bundle-processing" + ) + num_writer = repo.ui.configint( + b"worker", + b"parallel-stream-bundle-processing.num-writer", + ) + if num_writer <= 0: + num_writer = DEFAULT_NUM_WRITER[cpu_profile] + memory_target = repo.ui.configbytes( + b"worker", + b"parallel-stream-bundle-processing.memory-target", + ) + if memory_target < 0: + memory_target = None + elif memory_target == 0: + memory_target = DEFAULT_MEMORY_TARGET[mem_profile] with repo.transaction(b'clone'): ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) with nested(*ctxs): - for i in range(filecount): - src = util.readexactly(fp, 1) - vfs = vfsmap[src] - namelen = util.uvarintdecodestream(fp) - datalen = util.uvarintdecodestream(fp) - - name = util.readexactly(fp, namelen) + workers = [] + info_queue = None + data_queue = None + mark_used = None + try: + if not threaded: + fc = _FileChunker + raw_data = fp + else: + fc = _ThreadSafeFileChunker + data_queue = _DataQueue(memory_target=memory_target) + if memory_target is not None: + mark_used = data_queue.mark_used + raw_data = util.chunkbuffer(data_queue) - if repo.ui.debugflag: - repo.ui.debug( - b'adding [%s] %s (%s)\n' - % (src, name, util.bytecount(datalen)) + w = threading.Thread( + target=data_queue.fill_from, + args=(fp,), ) + workers.append(w) + w.start() + files = _v2_parse_files( + repo, + raw_data, + vfsmap, + filecount, + progress, + report, + file_chunker=fc, + mark_used=mark_used, + ) + if not threaded: + _write_files(files) + else: + info_queue = _FileInfoQueue(files) - with vfs(name, b'w') as ofp: - for chunk in util.filechunkiter(fp, limit=datalen): - progress.increment(step=len(chunk)) - ofp.write(chunk) + for __ in range(num_writer): + w = threading.Thread( + target=_write_files, + args=(info_queue,), + ) + workers.append(w) + w.start() + info_queue.fill() + except: # re-raises + if data_queue is not None: + data_queue.abort() + raise + finally: + # shut down all the workers + if info_queue is not None: + # this is strictly speaking one too many worker for + # this queu, but closing too many is not a problem. + info_queue.close(len(workers)) + for w in workers: + w.join() # force @filecache properties to be reloaded from # streamclone-ed file at next access repo.invalidate(clearfilecache=True) - elapsed = util.timer() - start - if elapsed <= 0: - elapsed = 0.001 - repo.ui.status( - _(b'transferred %s in %.1f seconds (%s/sec)\n') - % ( - util.bytecount(progress.pos), - elapsed, - util.bytecount(progress.pos / elapsed), - ) - ) progress.complete() + # acknowledge the end of the bundle2 part, this help aligning + # sequential and parallel behavior. + remains = fp.read(1) + assert not remains + _report_transferred(repo, start, filecount, report.byte_count) + + +# iterator of chunk of bytes that constitute a file content. 
+FileChunksT = Iterator[bytes] +# Contains the information necessary to write stream file on disk +FileInfoT = Tuple[ + bytes, # real fs path + Optional[int], # permission to give to chmod + FileChunksT, # content +] + + +class _Queue: + """a reimplementation of queue.Queue which doesn't use thread.Condition""" + + def __init__(self): + self._queue = collections.deque() + + # the "_lock" protect manipulation of the "_queue" deque + # the "_wait" is used to have the "get" thread waits for the + # "put" thread when the queue is empty. + # + # This is similar to the "threading.Condition", but without the absurd + # slowness of the stdlib implementation. + # + # the "_wait" is always released while holding the "_lock". + self._lock = threading.Lock() + self._wait = threading.Lock() + + def put(self, item): + with self._lock: + self._queue.append(item) + # if anyone is waiting on item, unblock it. + if self._wait.locked(): + self._wait.release() + + def get(self): + with self._lock: + while len(self._queue) == 0: + # "arm" the waiting lock + self._wait.acquire(blocking=False) + # release the lock to let other touch the queue + # (especially the put call we wait on) + self._lock.release() + # wait for for a `put` call to release the lock + self._wait.acquire() + # grab the lock to look at a possible available value + self._lock.acquire() + # disarm the lock if necessary. + # + # If the queue only constains one item, keep the _wait lock + # armed, as there is no need to wake another waiter anyway. + if self._wait.locked() and len(self._queue) > 1: + self._wait.release() + return self._queue.popleft() + + +class _DataQueue: + """A queue passing data from the bundle stream to other thread + + It has a "memory_target" optional parameter to avoid buffering too much + information. The implementation is not exact and the memory target might be + exceed for a time in some situation. + """ + + def __init__(self, memory_target=None): + self._q = _Queue() + self._abort = False + self._memory_target = memory_target + if self._memory_target is not None and self._memory_target <= 0: + raise error.ProgrammingError("memory target should be > 0") + + # the "_lock" protect manipulation of the _current_used" variable + # the "_wait" is used to have the "reading" thread waits for the + # "using" thread when the buffer is full. + # + # This is similar to the "threading.Condition", but without the absurd + # slowness of the stdlib implementation. + # + # the "_wait" is always released while holding the "_lock". + self._lock = threading.Lock() + self._wait = threading.Lock() + # only the stream reader touch this, it is find to touch without the lock + self._current_read = 0 + # do not touch this without the lock + self._current_used = 0 + + def _has_free_space(self): + """True if more data can be read without further exceeding memory target + + Must be called under the lock. + """ + if self._memory_target is None: + # Ideally we should not even get into the locking business in that + # case, but we keep the implementation simple for now. + return True + return (self._current_read - self._current_used) < self._memory_target + + def mark_used(self, offset): + """Notify we have used the buffer up to "offset" + + This is meant to be used from another thread than the one filler the queue. + """ + if self._memory_target is not None: + with self._lock: + if offset > self._current_used: + self._current_used = offset + # If the reader is waiting for room, unblock it. 
+ if self._wait.locked() and self._has_free_space(): + self._wait.release() + + def fill_from(self, data): + """fill the data queue from a bundle2 part object + + This is meant to be called by the data reading thread + """ + q = self._q + try: + for item in data: + self._current_read += len(item) + q.put(item) + if self._abort: + break + if self._memory_target is not None: + with self._lock: + while not self._has_free_space(): + # make sure the _wait lock is locked + # this is done under lock, so there case be no race with the release logic + self._wait.acquire(blocking=False) + self._lock.release() + # acquiring the lock will block until some other thread release it. + self._wait.acquire() + # lets dive into the locked section again + self._lock.acquire() + # make sure we release the lock we just grabed if + # needed. + if self._wait.locked(): + self._wait.release() + finally: + q.put(None) + + def __iter__(self): + """Iterate over the bundle chunkgs + + This is meant to be called by the data parsing thread.""" + q = self._q + while (i := q.get()) is not None: + yield i + if self._abort: + break + + def abort(self): + """stop the data-reading thread and interrupt the comsuming iteration + + This is meant to be called on errors. + """ + self._abort = True + self._q.put(None) + if self._memory_target is not None: + with self._lock: + # make sure we unstuck the reader thread. + if self._wait.locked(): + self._wait.release() -def consumev3(repo, fp): +class _FileInfoQueue: + """A thread-safe queue to passer parsed file information to the writers""" + + def __init__(self, info: Iterable[FileInfoT]): + self._info = info + self._q = _Queue() + + def fill(self): + """iterate over the parsed information to file the queue + + This is meant to be call from the thread parsing the stream information. + """ + q = self._q + for i in self._info: + q.put(i) + + def close(self, number_worker): + """signal all the workers that we no longer have any file info coming + + Called from the thread parsing the stream information (and/or the main + thread if different). + """ + for __ in range(number_worker): + self._q.put(None) + + def __iter__(self): + """iterate over the available file info + + This is meant to be called from the writer threads. + """ + q = self._q + while (i := q.get()) is not None: + yield i + + +class _FileChunker: + """yield the chunk that constitute a file + + This class exists as the counterpart of the threaded version and + would not be very useful on its own. + """ + + def __init__( + self, + fp: bundle2mod.unbundlepart, + data_len: int, + progress: scmutil.progress, + report: V2Report, + mark_used: Optional[Callable[[int], None]] = None, + ): + self.report = report + self.progress = progress + self._chunks = util.filechunkiter(fp, limit=data_len) + + def fill(self) -> None: + """Do nothing in non-threading context""" + + def __iter__(self) -> FileChunksT: + for chunk in self._chunks: + self.report.byte_count += len(chunk) + self.progress.increment(step=len(chunk)) + yield chunk + + +class _ThreadSafeFileChunker(_FileChunker): + """yield the chunk that constitute a file + + Make sure you call the "fill" function in the main thread to read the + right data at the right time. 
+ """ + + def __init__( + self, + fp: bundle2mod.unbundlepart, + data_len: int, + progress: scmutil.progress, + report: V2Report, + mark_used: Optional[Callable[[int], None]] = None, + ): + super().__init__(fp, data_len, progress, report) + self._fp = fp + self._queue = _Queue() + self._mark_used = mark_used + + def fill(self) -> None: + """fill the file chunker queue with data read from the stream + + This is meant to be called from the thread parsing information (and + consuming the stream data). + """ + try: + for chunk in super().__iter__(): + offset = self._fp.tell() + self._queue.put((chunk, offset)) + finally: + self._queue.put(None) + + def __iter__(self) -> FileChunksT: + """Iterate over all the file chunk + + This is meant to be called from the writer threads. + """ + while (info := self._queue.get()) is not None: + chunk, offset = info + if self._mark_used is not None: + self._mark_used(offset) + yield chunk + + +def _trivial_file( + chunk: bytes, + mark_used: Optional[Callable[[int], None]], + offset: int, +) -> FileChunksT: + """used for single chunk file,""" + if mark_used is not None: + mark_used(offset) + yield chunk + + +def _v2_parse_files( + repo, + fp: bundle2mod.unbundlepart, + vfs_map, + file_count: int, + progress: scmutil.progress, + report: V2Report, + file_chunker: Type[_FileChunker] = _FileChunker, + mark_used: Optional[Callable[[int], None]] = None, +) -> Iterator[FileInfoT]: + """do the "stream-parsing" part of stream v2 + + The parsed information are yield result for consumption by the "writer" + """ + known_dirs = set() # set of directory that we know to exists + progress.update(0) + for i in range(file_count): + src = util.readexactly(fp, 1) + namelen = util.uvarintdecodestream(fp) + datalen = util.uvarintdecodestream(fp) + + name = util.readexactly(fp, namelen) + + if repo.ui.debugflag: + repo.ui.debug( + b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen)) + ) + vfs = vfs_map[src] + path, mode = vfs.prepare_streamed_file(name, known_dirs) + if datalen <= util.DEFAULT_FILE_CHUNK: + c = fp.read(datalen) + offset = fp.tell() + report.byte_count += len(c) + progress.increment(step=len(c)) + chunks = _trivial_file(c, mark_used, offset) + yield (path, mode, iter(chunks)) + else: + chunks = file_chunker( + fp, + datalen, + progress, + report, + mark_used=mark_used, + ) + yield (path, mode, iter(chunks)) + # make sure we read all the chunk before moving to the next file + chunks.fill() + + +def _write_files(info: Iterable[FileInfoT]): + """write files from parsed data""" + io_flags = os.O_WRONLY | os.O_CREAT + if pycompat.iswindows: + io_flags |= os.O_BINARY + for path, mode, data in info: + if mode is None: + fd = os.open(path, io_flags) + else: + fd = os.open(path, io_flags, mode=mode) + try: + for chunk in data: + written = os.write(fd, chunk) + # write missing pieces if the write was interrupted + while written < len(chunk): + written = os.write(fd, chunk[written:]) + finally: + os.close(fd) + + +def consumev3(repo, fp) -> None: """Apply the contents from a version 3 streaming clone. Data is read from an object that only needs to provide a ``read(size)`` @@ -1045,9 +1594,9 @@ # is fine, while this is really not fine. 
if repo.vfs in vfsmap.values(): raise error.ProgrammingError( - b'repo.vfs must not be added to vfsmap for security reasons' + 'repo.vfs must not be added to vfsmap for security reasons' ) - + total_file_count = 0 with repo.transaction(b'clone'): ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) with nested(*ctxs): @@ -1056,6 +1605,7 @@ if filecount == 0: if repo.ui.debugflag: repo.ui.debug(b'entry with no files [%d]\n' % (i)) + total_file_count += filecount for i in range(filecount): src = util.readexactly(fp, 1) vfs = vfsmap[src] @@ -1079,18 +1629,13 @@ # streamclone-ed file at next access repo.invalidate(clearfilecache=True) - elapsed = util.timer() - start - if elapsed <= 0: - elapsed = 0.001 - msg = _(b'transferred %s in %.1f seconds (%s/sec)\n') - byte_count = util.bytecount(bytes_transferred) - bytes_sec = util.bytecount(bytes_transferred / elapsed) - msg %= (byte_count, elapsed, bytes_sec) - repo.ui.status(msg) progress.complete() + _report_transferred(repo, start, total_file_count, bytes_transferred) -def applybundlev2(repo, fp, filecount, filesize, requirements): +def applybundlev2( + repo, fp, filecount: int, filesize: int, requirements: Iterable[bytes] +) -> None: from . import localrepo missingreqs = [r for r in requirements if r not in repo.supported] @@ -1100,7 +1645,8 @@ % b', '.join(sorted(missingreqs)) ) - consumev2(repo, fp, filecount, filesize) + with util.nogc(): + consumev2(repo, fp, filecount, filesize) repo.requirements = new_stream_clone_requirements( repo.requirements, @@ -1113,7 +1659,7 @@ nodemap.post_stream_cleanup(repo) -def applybundlev3(repo, fp, requirements): +def applybundlev3(repo, fp, requirements: Iterable[bytes]) -> None: from . import localrepo missingreqs = [r for r in requirements if r not in repo.supported] @@ -1135,14 +1681,14 @@ nodemap.post_stream_cleanup(repo) -def _copy_files(src_vfs_map, dst_vfs_map, entries, progress): +def _copy_files(src_vfs_map, dst_vfs_map, entries, progress) -> bool: hardlink = [True] def copy_used(): hardlink[0] = False progress.topic = _(b'copying') - for k, path in entries: + for k, path, optional in entries: src_vfs = src_vfs_map[k] dst_vfs = dst_vfs_map[k] src_path = src_vfs.join(path) @@ -1154,18 +1700,22 @@ util.makedirs(dirname) dst_vfs.register_file(path) # XXX we could use the #nb_bytes argument. - util.copyfile( - src_path, - dst_path, - hardlink=hardlink[0], - no_hardlink_cb=copy_used, - check_fs_hardlink=False, - ) + try: + util.copyfile( + src_path, + dst_path, + hardlink=hardlink[0], + no_hardlink_cb=copy_used, + check_fs_hardlink=False, + ) + except FileNotFoundError: + if not optional: + raise progress.increment() return hardlink[0] -def local_copy(src_repo, dest_repo): +def local_copy(src_repo, dest_repo) -> None: """copy all content from one local repository to another This is useful for local clone""" @@ -1215,7 +1765,7 @@ # to the files while we do the clone, so this is not done yet. We # could do this blindly when copying files. files = [ - (vfs_key, f.unencoded_path) + (vfs_key, f.unencoded_path, f.optional) for vfs_key, e in entries for f in e.files() ]
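The consumption side above gains three configuration knobs: worker.parallel-stream-bundle-processing to toggle the threaded path, worker.parallel-stream-bundle-processing.num-writer for the number of writer threads, and worker.parallel-stream-bundle-processing.memory-target to bound buffering, with unset or zero values falling back to the DEFAULT_NUM_WRITER / DEFAULT_MEMORY_TARGET tables keyed on the usage.resources profiles. The following condensed restatement of that resolution logic is a sketch only; the string profile keys and the resolve_stream_workers name are stand-ins for scmutil.RESOURCE_LOW/MEDIUM/HIGH and are not part of the hg API.

    # Sketch: how consumev2 turns the new config values into effective settings.
    # Profile keys are simplified strings; hg uses scmutil.RESOURCE_* constants.
    DEFAULT_NUM_WRITER = {'low': 1, 'medium': 4, 'high': 8}
    DEFAULT_MEMORY_TARGET = {
        'low': 50 * 2**20,      # 50 MB
        'medium': 500 * 2**20,  # 500 MB
        'high': 2 * 2**30,      # 2 GB
    }

    def resolve_stream_workers(num_writer, memory_target, cpu_profile, mem_profile):
        """Return (num_writer, memory_target-or-None) the way consumev2 does."""
        if num_writer <= 0:
            # unset or invalid value: pick a default from the CPU resource profile
            num_writer = DEFAULT_NUM_WRITER[cpu_profile]
        if memory_target < 0:
            # a negative target means "no limit on buffered data"
            memory_target = None
        elif memory_target == 0:
            # zero means "use the default for the memory resource profile"
            memory_target = DEFAULT_MEMORY_TARGET[mem_profile]
        return num_writer, memory_target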
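When the threaded path is enabled, consumev2 splits the work across a reader thread (_DataQueue.fill_from), the parsing loop (_v2_parse_files), and num_writer writer threads (_write_files), all joined by sentinel-terminated queues. The sketch below reproduces only that thread/queue topology with the stdlib queue.Queue and made-up reader/parser/writer helpers; it is not the hg code.

    import queue
    import threading

    def reader(source_chunks, data_q):
        # role of _DataQueue.fill_from: push raw stream data, then a sentinel
        for chunk in source_chunks:
            data_q.put(chunk)
        data_q.put(None)

    def parser(data_q, info_q, num_writers):
        # role of _v2_parse_files + _FileInfoQueue.fill/close: turn raw data
        # into (path, mode, chunks) work items, then wake every writer once
        while (chunk := data_q.get()) is not None:
            info_q.put(('some/path', 0o644, [chunk]))
        for _ in range(num_writers):
            info_q.put(None)

    def writer(info_q, results):
        # role of _write_files: drain work items until the sentinel arrives
        while (item := info_q.get()) is not None:
            path, mode, chunks = item
            results.append((path, b''.join(chunks)))

    data_q, info_q, results = queue.Queue(), queue.Queue(), []
    num_writers = 4
    threads = [threading.Thread(target=reader, args=([b'a', b'b'], data_q))]
    threads += [
        threading.Thread(target=writer, args=(info_q, results))
        for _ in range(num_writers)
    ]
    for t in threads:
        t.start()
    parser(data_q, info_q, num_writers)  # parsing stays in the main thread
    for t in threads:
        t.join()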
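Both queues above are built on _Queue, which replaces threading.Condition with a mutex guarding the deque plus a second "wait" lock that get() arms when it finds the queue empty and put() releases when it adds an item. The class below restates that handshake in isolation so it can be read outside the diff; like the original it assumes few waiters and relies on a sentinel value, not a timeout, to end consumption.

    import collections
    import threading

    class TwoLockQueue:
        """Restatement of the lock-pair handshake used by streamclone._Queue."""

        def __init__(self):
            self._items = collections.deque()
            self._lock = threading.Lock()  # protects the deque
            self._wait = threading.Lock()  # held ("armed") while a getter waits

        def put(self, item):
            with self._lock:
                self._items.append(item)
                if self._wait.locked():
                    # a getter is parked on _wait: let it through
                    self._wait.release()

        def get(self):
            with self._lock:
                while not self._items:
                    self._wait.acquire(blocking=False)  # arm the wait lock
                    self._lock.release()                # let put() make progress
                    self._wait.acquire()                # park until put() releases
                    self._lock.acquire()                # re-check under the mutex
                if self._wait.locked() and len(self._items) > 1:
                    # more items are ready, so no need to keep the lock armed
                    self._wait.release()
                return self._items.popleft()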
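The memory_target of _DataQueue keeps the reader thread from racing arbitrarily far ahead of the parser: the reader tracks how many bytes it has queued, the parser reports progress through mark_used(offset), and fill_from parks whenever the gap reaches the target. The sketch below keeps only that accounting and, for brevity, uses a stdlib Condition where the real class deliberately sticks to the bare lock pair; BoundedFeed and its method names are invented for the illustration.

    import threading

    class BoundedFeed:
        """Back-pressure accounting in the spirit of _DataQueue.memory_target."""

        def __init__(self, memory_target):
            self._target = memory_target
            self._read = 0   # bytes the reader thread has queued so far
            self._used = 0   # highest offset the consumer reported as processed
            self._cond = threading.Condition()

        def _has_free_space(self):
            return (self._read - self._used) < self._target

        def on_chunk_read(self, size):
            # called by the reader thread after queueing `size` bytes; blocks
            # while the consumer lags more than memory_target behind
            with self._cond:
                self._read += size
                while not self._has_free_space():
                    self._cond.wait()

        def mark_used(self, offset):
            # called by the consumer with the absolute offset it has processed
            with self._cond:
                if offset > self._used:
                    self._used = offset
                    self._cond.notify()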
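_write_files writes with low-level os.open/os.write and loops when os.write reports a short write. A defensive way to express that retry, accumulating the running count so no byte range is written twice, looks like the generic helper below; write_all is not part of the diff.

    import os

    def write_all(fd: int, chunk: bytes) -> None:
        """Write the whole chunk even if os.write() returns a short count."""
        written = 0
        while written < len(chunk):
            # slice from the running total so each byte is written exactly once
            written += os.write(fd, chunk[written:])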
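Cache files are now streamed through CacheEntry/CacheFile, whose optional flag and the VolatileManager.try_keep helper accept that a cache file may legitimately disappear (it is not protected by the lock), and _copy_files in local_copy skips a missing optional file instead of aborting. A condensed sketch of that tolerance, using an invented copy_entry helper and plain shutil rather than the hg vfs layer:

    import errno
    import shutil

    def copy_entry(src_path: str, dst_path: str, optional: bool) -> bool:
        """Copy one streamed file; a missing optional file is not an error.

        Returns False when an optional file (e.g. a cache file removed between
        the scan and the copy) is absent, True when something was copied.
        """
        try:
            shutil.copyfile(src_path, dst_path)
            return True
        except OSError as err:
            if optional and err.errno in (errno.ENOENT, errno.EPERM):
                return False
            raise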