--- a/mercurial/streamclone.py Wed Mar 05 22:33:11 2025 +0100
+++ b/mercurial/streamclone.py Wed Mar 05 23:02:19 2025 +0100
@@ -7,9 +7,22 @@
from __future__ import annotations
+import collections
import contextlib
+import errno
import os
import struct
+import threading
+
+from typing import (
+ Callable,
+ Iterable,
+ Iterator,
+ Optional,
+ Set,
+ Tuple,
+ Type,
+)
from .i18n import _
from .interfaces import repository
@@ -32,7 +45,29 @@
)
-def new_stream_clone_requirements(default_requirements, streamed_requirements):
+# Numbers arbitrarily picked, feel free to change them (except the LOW one).
+#
+# Update the configuration documentation if you touch this.
+DEFAULT_NUM_WRITER = {
+ scmutil.RESOURCE_LOW: 1,
+ scmutil.RESOURCE_MEDIUM: 4,
+ scmutil.RESOURCE_HIGH: 8,
+}
+
+
+# Numbers arbitrarily picked, feel free to adjust them. Do update the
+# documentation if you do so.
+DEFAULT_MEMORY_TARGET = {
+ scmutil.RESOURCE_LOW: 50 * (2**20), # 50 MB
+ scmutil.RESOURCE_MEDIUM: 500 * 2**20, # 500 MB
+ scmutil.RESOURCE_HIGH: 2 * 2**30, # 2 GB
+}
+
+
+def new_stream_clone_requirements(
+ default_requirements: Iterable[bytes],
+ streamed_requirements: Iterable[bytes],
+) -> Set[bytes]:
"""determine the final set of requirement for a new stream clone
this method combine the "default" requirements that a new repository would
@@ -45,7 +80,7 @@
return requirements
-def streamed_requirements(repo):
+def streamed_requirements(repo) -> Set[bytes]:
"""the set of requirement the new clone will have to support
This is used for advertising the stream options and to generate the actual
@@ -56,7 +91,7 @@
return requiredformats
-def canperformstreamclone(pullop, bundle2=False):
+def canperformstreamclone(pullop, bundle2: bool = False):
"""Whether it is possible to perform a streaming clone as part of pull.
``bundle2`` will cause the function to consider stream clone through
@@ -150,7 +185,7 @@
return True, requirements
-def maybeperformlegacystreamclone(pullop):
+def maybeperformlegacystreamclone(pullop) -> None:
"""Possibly perform a legacy stream clone operation.
Legacy stream clones are performed as part of pull but before all other
@@ -225,7 +260,7 @@
repo.invalidate()
-def allowservergeneration(repo):
+def allowservergeneration(repo) -> bool:
"""Whether streaming clones are allowed from the server."""
if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
return False
@@ -243,11 +278,30 @@
# This is it's own function so extensions can override it.
-def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
+def _walkstreamfiles(
+ repo, matcher=None, phase: bool = False, obsolescence: bool = False
+):
return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
-def generatev1(repo):
+def _report_transferred(
+ repo, start_time: float, file_count: int, byte_count: int
+):
+ """common utility to report time it took to apply the stream bundle"""
+ elapsed = util.timer() - start_time
+ if elapsed <= 0:
+ elapsed = 0.001
+ m = _(b'stream-cloned %d files / %s in %.1f seconds (%s/sec)\n')
+ m %= (
+ file_count,
+ util.bytecount(byte_count),
+ elapsed,
+ util.bytecount(byte_count / elapsed),
+ )
+ repo.ui.status(m)
+
+
+def generatev1(repo) -> tuple[int, int, Iterator[bytes]]:
"""Emit content for version 1 of a streaming clone.
This returns a 3-tuple of (file count, byte size, data iterator).
@@ -271,14 +325,15 @@
# Get consistent snapshot of repo, lock during scan.
with repo.lock():
repo.ui.debug(b'scanning\n')
+ _test_sync_point_walk_1_2(repo)
for entry in _walkstreamfiles(repo):
for f in entry.files():
file_size = f.file_size(repo.store.vfs)
if file_size:
entries.append((f.unencoded_path, file_size))
total_bytes += file_size
- _test_sync_point_walk_1(repo)
- _test_sync_point_walk_2(repo)
+ _test_sync_point_walk_3(repo)
+ _test_sync_point_walk_4(repo)
repo.ui.debug(
b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
@@ -287,7 +342,7 @@
svfs = repo.svfs
debugflag = repo.ui.debugflag
- def emitrevlogdata():
+ def emitrevlogdata() -> Iterator[bytes]:
for name, size in entries:
if debugflag:
repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
@@ -299,13 +354,12 @@
if size <= 65536:
yield fp.read(size)
else:
- for chunk in util.filechunkiter(fp, limit=size):
- yield chunk
+ yield from util.filechunkiter(fp, limit=size)
return len(entries), total_bytes, emitrevlogdata()
-def generatev1wireproto(repo):
+def generatev1wireproto(repo) -> Iterator[bytes]:
"""Emit content for version 1 of streaming clone suitable for the wire.
This is the data output from ``generatev1()`` with 2 header lines. The
@@ -329,11 +383,12 @@
# Indicates successful response.
yield b'0\n'
yield b'%d %d\n' % (filecount, bytecount)
- for chunk in it:
- yield chunk
+ yield from it
-def generatebundlev1(repo, compression=b'UN'):
+def generatebundlev1(
+ repo, compression: bytes = b'UN'
+) -> tuple[Set[bytes], Iterator[bytes]]:
"""Emit content for version 1 of a stream clone bundle.
The first 4 bytes of the output ("HGS1") denote this as stream clone
@@ -356,12 +411,12 @@
Returns a tuple of (requirements, data generator).
"""
if compression != b'UN':
- raise ValueError(b'we do not support the compression argument yet')
+ raise ValueError('we do not support the compression argument yet')
requirements = streamed_requirements(repo)
requires = b','.join(sorted(requirements))
- def gen():
+ def gen() -> Iterator[bytes]:
yield b'HGS1'
yield compression
@@ -391,7 +446,7 @@
return requirements, gen()
-def consumev1(repo, fp, filecount, bytecount):
+def consumev1(repo, fp, filecount: int, bytecount: int) -> None:
"""Apply the contents from version 1 of a streaming clone file handle.
This takes the output from "stream_out" and applies it to the specified
@@ -425,6 +480,7 @@
# nesting occurs also in ordinary case (e.g. enabling
# clonebundles).
+ total_file_count = 0
with repo.transaction(b'clone'):
with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
for i in range(filecount):
@@ -453,6 +509,7 @@
# for backwards compat, name was partially encoded
path = store.decodedir(name)
with repo.svfs(path, b'w', backgroundclose=True) as ofp:
+ total_file_count += 1
for chunk in util.filechunkiter(fp, limit=size):
progress.increment(step=len(chunk))
ofp.write(chunk)
@@ -461,21 +518,11 @@
# streamclone-ed file at next access
repo.invalidate(clearfilecache=True)
- elapsed = util.timer() - start
- if elapsed <= 0:
- elapsed = 0.001
progress.complete()
- repo.ui.status(
- _(b'transferred %s in %.1f seconds (%s/sec)\n')
- % (
- util.bytecount(bytecount),
- elapsed,
- util.bytecount(bytecount / elapsed),
- )
- )
+ _report_transferred(repo, start, total_file_count, bytecount)
-def readbundle1header(fp):
+def readbundle1header(fp) -> tuple[int, int, Set[bytes]]:
compression = fp.read(2)
if compression != b'UN':
raise error.Abort(
@@ -503,7 +550,7 @@
return filecount, bytecount, requirements
-def applybundlev1(repo, fp):
+def applybundlev1(repo, fp) -> None:
"""Apply the content from a stream clone bundle version 1.
We assume the 4 byte header has been read and validated and the file handle
@@ -533,10 +580,10 @@
readers to perform bundle type-specific functionality.
"""
- def __init__(self, fh):
+ def __init__(self, fh) -> None:
self._fh = fh
- def apply(self, repo):
+ def apply(self, repo) -> None:
return applybundlev1(repo, self._fh)
@@ -550,7 +597,7 @@
# This is it's own function so extensions can override it.
-def _walkstreamfullstorefiles(repo):
+def _walkstreamfullstorefiles(repo) -> list[bytes]:
"""list snapshot file from the store"""
fnames = []
if not repo.publishing():
@@ -587,7 +634,7 @@
# for flushing to disk in __call__().
MAX_OPEN = 2 if pycompat.iswindows else 100
- def __init__(self):
+ def __init__(self) -> None:
self._counter = 0
self._volatile_fps = None
self._copies = None
@@ -617,7 +664,7 @@
assert self._copies is None
assert self._dst_dir is None
- def _init_tmp_copies(self):
+ def _init_tmp_copies(self) -> None:
"""prepare a temporary directory to save volatile files
This will be used as backup if we have too many files open"""
@@ -627,7 +674,7 @@
self._copies = {}
self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
- def _flush_some_on_disk(self):
+ def _flush_some_on_disk(self) -> None:
"""move some of the open files to tempory files on disk"""
if self._copies is None:
self._init_tmp_copies()
@@ -646,21 +693,39 @@
del self._volatile_fps[src]
fp.close()
- def _keep_one(self, src):
+ def _keep_one(self, src: bytes) -> int:
"""preserve an open file handle for a given path"""
# store the file quickly to ensure we close it if any error happens
_, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
fp.seek(0, os.SEEK_END)
size = fp.tell()
self._volatile_fps[src] = (size, fp)
+ return size
- def __call__(self, src):
+ def __call__(self, src: bytes) -> None:
"""preserve the volatile file at src"""
assert 0 < self._counter
if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
self._flush_some_on_disk()
self._keep_one(src)
+ def try_keep(self, src: bytes) -> Optional[int]:
+ """record a volatile file and returns it size
+
+ return None if the file does not exists.
+
+ Used for cache file that are not lock protected.
+ """
+ assert 0 < self._counter
+ if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+ self._flush_some_on_disk()
+ try:
+ return self._keep_one(src)
+ except OSError as err:
+ if err.errno not in (errno.ENOENT, errno.EPERM):
+ raise
+ return None
+
@contextlib.contextmanager
def open(self, src):
assert 0 < self._counter
@@ -701,65 +766,64 @@
# fine, while this is really not fine.
if repo.vfs in vfsmap.values():
raise error.ProgrammingError(
- b'repo.vfs must not be added to vfsmap for security reasons'
+ 'repo.vfs must not be added to vfsmap for security reasons'
)
# translate the vfs one
entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+ _test_sync_point_walk_1_2(repo)
max_linkrev = len(repo)
file_count = totalfilesize = 0
- with util.nogc():
- # record the expected size of every file
- for k, vfs, e in entries:
- for f in e.files():
- file_count += 1
- totalfilesize += f.file_size(vfs)
+ with VolatileManager() as volatiles:
+ # make sure we preserve volatile files
+ with util.nogc():
+ # record the expected size of every file
+ for k, vfs, e in entries:
+ e.preserve_volatiles(vfs, volatiles)
+ for f in e.files():
+ file_count += 1
+ totalfilesize += f.file_size(vfs)
- progress = repo.ui.makeprogress(
- _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
- )
- progress.update(0)
- with VolatileManager() as volatiles, progress:
- # make sure we preserve volatile files
- for k, vfs, e in entries:
- for f in e.files():
- if f.is_volatile:
- volatiles(vfs.join(f.unencoded_path))
- # the first yield release the lock on the repository
- yield file_count, totalfilesize
- totalbytecount = 0
+ progress = repo.ui.makeprogress(
+ _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
+ )
+ progress.update(0)
+ with progress:
+ # the first yield releases the lock on the repository
+ yield file_count, totalfilesize
+ totalbytecount = 0
- for src, vfs, e in entries:
- entry_streams = e.get_streams(
- repo=repo,
- vfs=vfs,
- volatiles=volatiles,
- max_changeset=max_linkrev,
- preserve_file_count=True,
- )
- for name, stream, size in entry_streams:
- yield src
- yield util.uvarintencode(len(name))
- yield util.uvarintencode(size)
- yield name
- bytecount = 0
- for chunk in stream:
- bytecount += len(chunk)
- totalbytecount += len(chunk)
- progress.update(totalbytecount)
- yield chunk
- if bytecount != size:
- # Would most likely be caused by a race due to `hg
- # strip` or a revlog split
- msg = _(
- b'clone could only read %d bytes from %s, but '
- b'expected %d bytes'
- )
- raise error.Abort(msg % (bytecount, name, size))
+ for src, vfs, e in entries:
+ entry_streams = e.get_streams(
+ repo=repo,
+ vfs=vfs,
+ volatiles=volatiles,
+ max_changeset=max_linkrev,
+ preserve_file_count=True,
+ )
+ for name, stream, size in entry_streams:
+ yield src
+ yield util.uvarintencode(len(name))
+ yield util.uvarintencode(size)
+ yield name
+ bytecount = 0
+ for chunk in stream:
+ bytecount += len(chunk)
+ totalbytecount += len(chunk)
+ progress.update(totalbytecount)
+ yield chunk
+ if bytecount != size:
+ # Would most likely be caused by a race due to `hg
+ # strip` or a revlog split
+ msg = _(
+ b'clone could only read %d bytes from %s, but '
+ b'expected %d bytes'
+ )
+ raise error.Abort(msg % (bytecount, name, size))
-def _emit3(repo, entries):
+def _emit3(repo, entries) -> Iterator[bytes | None]:
"""actually emit the stream bundle (v3)"""
vfsmap = _makemap(repo)
# we keep repo.vfs out of the map on purpose, ther are too many dangers
@@ -769,60 +833,137 @@
# fine, while this is really not fine.
if repo.vfs in vfsmap.values():
raise error.ProgrammingError(
- b'repo.vfs must not be added to vfsmap for security reasons'
+ 'repo.vfs must not be added to vfsmap for security reasons'
)
# translate the vfs once
- entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
- total_entry_count = len(entries)
-
- max_linkrev = len(repo)
- progress = repo.ui.makeprogress(
- _(b'bundle'),
- total=total_entry_count,
- unit=_(b'entry'),
- )
- progress.update(0)
- with VolatileManager() as volatiles, progress:
+ # we only turn this into a list for the `_test_sync` call, which is not ideal
+ base_entries = list(entries)
+ _test_sync_point_walk_1_2(repo)
+ entries = []
+ with VolatileManager() as volatiles:
# make sure we preserve volatile files
- for k, vfs, e in entries:
+ for vfs_key, e in base_entries:
+ vfs = vfsmap[vfs_key]
+ any_files = True
if e.maybe_volatile:
+ any_files = False
+ e.preserve_volatiles(vfs, volatiles)
for f in e.files():
if f.is_volatile:
# record the expected size under lock
f.file_size(vfs)
- volatiles(vfs.join(f.unencoded_path))
+ any_files = True
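+ # drop entries whose volatile files turned out to be absent
+ # (e.g. a cache file that no longer exists)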
+ if any_files:
+ entries.append((vfs_key, vfsmap[vfs_key], e))
+
+ total_entry_count = len(entries)
+
+ max_linkrev = len(repo)
+ progress = repo.ui.makeprogress(
+ _(b'bundle'),
+ total=total_entry_count,
+ unit=_(b'entry'),
+ )
+ progress.update(0)
# the first yield release the lock on the repository
yield None
+ with progress:
+ yield util.uvarintencode(total_entry_count)
- yield util.uvarintencode(total_entry_count)
+ for src, vfs, e in entries:
+ entry_streams = e.get_streams(
+ repo=repo,
+ vfs=vfs,
+ volatiles=volatiles,
+ max_changeset=max_linkrev,
+ )
+ yield util.uvarintencode(len(entry_streams))
+ for name, stream, size in entry_streams:
+ yield src
+ yield util.uvarintencode(len(name))
+ yield util.uvarintencode(size)
+ yield name
+ yield from stream
+ progress.increment()
+
- for src, vfs, e in entries:
- entry_streams = e.get_streams(
- repo=repo,
- vfs=vfs,
- volatiles=volatiles,
- max_changeset=max_linkrev,
- )
- yield util.uvarintencode(len(entry_streams))
- for name, stream, size in entry_streams:
- yield src
- yield util.uvarintencode(len(name))
- yield util.uvarintencode(size)
- yield name
- yield from stream
- progress.increment()
+def _test_sync_point_walk_1_2(repo):
+ """a function for synchronisation during tests
+
+ Triggered after gathering entries, but before starting to process/preserve
+ them under lock.
+
+ (on v1 it is triggered before the actual walk starts)
+ """
+
+
+def _test_sync_point_walk_3(repo):
+ """a function for synchronisation during tests
+
+ Triggered right before releasing the lock, but after computing everything
+ that needed to be computed under lock.
+ """
+
+
+def _test_sync_point_walk_4(repo):
+ """a function for synchronisation during tests
+
+ Triggered right after releasing the lock.
+ """
-def _test_sync_point_walk_1(repo):
- """a function for synchronisation during tests"""
+# not really a StoreEntry, but close enough
+class CacheEntry(store.SimpleStoreEntry):
+ """Represent an entry for Cache files
+
+ It has special logic to preserve cache file early and accept optional
+ presence.
-def _test_sync_point_walk_2(repo):
- """a function for synchronisation during tests"""
+ (Yes... this is not really a StoreEntry, but close enough. We could have a
+ BaseEntry base class, but the store one would be identical)
+ """
+
+ def __init__(self, entry_path) -> None:
+ super().__init__(
+ entry_path,
+ # we will directly deal with that in `setup_cache_file`
+ is_volatile=True,
+ )
+
+ def preserve_volatiles(self, vfs, volatiles) -> None:
+ self._file_size = volatiles.try_keep(vfs.join(self._entry_path))
+ if self._file_size is None:
+ self._files = []
+ else:
+ assert self._is_volatile
+ self._files = [
+ CacheFile(
+ unencoded_path=self._entry_path,
+ file_size=self._file_size,
+ is_volatile=self._is_volatile,
+ )
+ ]
+
+ def files(self) -> list[store.StoreFile]:
+ if self._files is None:
+ self._files = [
+ CacheFile(
+ unencoded_path=self._entry_path,
+ is_volatile=self._is_volatile,
+ )
+ ]
+ return super().files()
-def _entries_walk(repo, includes, excludes, includeobsmarkers):
+class CacheFile(store.StoreFile):
+ # inform the "copy/hardlink" version that this file might be missing
+ # without consequences.
+ optional: bool = True
+
+
+def _entries_walk(repo, includes, excludes, includeobsmarkers: bool):
"""emit a seris of files information useful to clone a repo
return (vfs-key, entry) iterator
@@ -851,14 +992,10 @@
for name in cacheutil.cachetocopy(repo):
if repo.cachevfs.exists(name):
# not really a StoreEntry, but close enough
- entry = store.SimpleStoreEntry(
- entry_path=name,
- is_volatile=True,
- )
- yield (_srccache, entry)
+ yield (_srccache, CacheEntry(entry_path=name))
-def generatev2(repo, includes, excludes, includeobsmarkers):
+def generatev2(repo, includes, excludes, includeobsmarkers: bool):
"""Emit content for version 2 of a streaming clone.
the data stream consists the following entries:
@@ -880,17 +1017,18 @@
excludes=excludes,
includeobsmarkers=includeobsmarkers,
)
-
chunks = _emit2(repo, entries)
first = next(chunks)
file_count, total_file_size = first
- _test_sync_point_walk_1(repo)
- _test_sync_point_walk_2(repo)
+ _test_sync_point_walk_3(repo)
+ _test_sync_point_walk_4(repo)
return file_count, total_file_size, chunks
-def generatev3(repo, includes, excludes, includeobsmarkers):
+def generatev3(
+ repo, includes, excludes, includeobsmarkers: bool
+) -> Iterator[bytes | None]:
"""Emit content for version 3 of a streaming clone.
the data stream consists the following:
@@ -930,8 +1068,8 @@
chunks = _emit3(repo, list(entries))
first = next(chunks)
assert first is None
- _test_sync_point_walk_1(repo)
- _test_sync_point_walk_2(repo)
+ _test_sync_point_walk_3(repo)
+ _test_sync_point_walk_4(repo)
return chunks
@@ -948,7 +1086,14 @@
yield
-def consumev2(repo, fp, filecount, filesize):
+class V2Report:
+ """a small class to track the data we saw within the stream"""
+
+ def __init__(self):
+ self.byte_count = 0
+
+
+def consumev2(repo, fp, filecount: int, filesize: int) -> None:
"""Apply the contents from a version 2 streaming clone.
Data is read from an object that only needs to provide a ``read(size)``
@@ -959,12 +1104,13 @@
_(b'%d files to transfer, %s of data\n')
% (filecount, util.bytecount(filesize))
)
-
+ progress = repo.ui.makeprogress(
+ _(b'clone'),
+ total=filesize,
+ unit=_(b'bytes'),
+ )
start = util.timer()
- progress = repo.ui.makeprogress(
- _(b'clone'), total=filesize, unit=_(b'bytes')
- )
- progress.update(0)
+ report = V2Report()
vfsmap = _makemap(repo)
# we keep repo.vfs out of the on purpose, ther are too many danger
@@ -974,50 +1120,453 @@
# is fine, while this is really not fine.
if repo.vfs in vfsmap.values():
raise error.ProgrammingError(
- b'repo.vfs must not be added to vfsmap for security reasons'
+ 'repo.vfs must not be added to vfsmap for security reasons'
)
+ cpu_profile = scmutil.get_resource_profile(repo.ui, b'cpu')
+ mem_profile = scmutil.get_resource_profile(repo.ui, b'memory')
+ threaded = repo.ui.configbool(
+ b"worker", b"parallel-stream-bundle-processing"
+ )
+ num_writer = repo.ui.configint(
+ b"worker",
+ b"parallel-stream-bundle-processing.num-writer",
+ )
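+ # a zero or negative value means "pick a default from the cpu profile"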
+ if num_writer <= 0:
+ num_writer = DEFAULT_NUM_WRITER[cpu_profile]
+ memory_target = repo.ui.configbytes(
+ b"worker",
+ b"parallel-stream-bundle-processing.memory-target",
+ )
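+ # a negative value disables the memory target entirely, zero means
+ # "pick a default from the memory profile"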
+ if memory_target < 0:
+ memory_target = None
+ elif memory_target == 0:
+ memory_target = DEFAULT_MEMORY_TARGET[mem_profile]
with repo.transaction(b'clone'):
ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
with nested(*ctxs):
- for i in range(filecount):
- src = util.readexactly(fp, 1)
- vfs = vfsmap[src]
- namelen = util.uvarintdecodestream(fp)
- datalen = util.uvarintdecodestream(fp)
-
- name = util.readexactly(fp, namelen)
+ workers = []
+ info_queue = None
+ data_queue = None
+ mark_used = None
+ try:
+ if not threaded:
+ fc = _FileChunker
+ raw_data = fp
+ else:
+ fc = _ThreadSafeFileChunker
+ data_queue = _DataQueue(memory_target=memory_target)
+ if memory_target is not None:
+ mark_used = data_queue.mark_used
+ raw_data = util.chunkbuffer(data_queue)
- if repo.ui.debugflag:
- repo.ui.debug(
- b'adding [%s] %s (%s)\n'
- % (src, name, util.bytecount(datalen))
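+ # a dedicated thread reads the bundle data and feeds the queue,
+ # while this thread parses file information from it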
+ w = threading.Thread(
+ target=data_queue.fill_from,
+ args=(fp,),
)
+ workers.append(w)
+ w.start()
+ files = _v2_parse_files(
+ repo,
+ raw_data,
+ vfsmap,
+ filecount,
+ progress,
+ report,
+ file_chunker=fc,
+ mark_used=mark_used,
+ )
+ if not threaded:
+ _write_files(files)
+ else:
+ info_queue = _FileInfoQueue(files)
- with vfs(name, b'w') as ofp:
- for chunk in util.filechunkiter(fp, limit=datalen):
- progress.increment(step=len(chunk))
- ofp.write(chunk)
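+ # start the writer threads; they pull parsed file information
+ # from the queue and write the content to disk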
+ for __ in range(num_writer):
+ w = threading.Thread(
+ target=_write_files,
+ args=(info_queue,),
+ )
+ workers.append(w)
+ w.start()
+ info_queue.fill()
+ except: # re-raises
+ if data_queue is not None:
+ data_queue.abort()
+ raise
+ finally:
+ # shut down all the workers
+ if info_queue is not None:
+ # this is strictly speaking one too many workers for
+ # this queue, but closing too many is not a problem.
+ info_queue.close(len(workers))
+ for w in workers:
+ w.join()
# force @filecache properties to be reloaded from
# streamclone-ed file at next access
repo.invalidate(clearfilecache=True)
- elapsed = util.timer() - start
- if elapsed <= 0:
- elapsed = 0.001
- repo.ui.status(
- _(b'transferred %s in %.1f seconds (%s/sec)\n')
- % (
- util.bytecount(progress.pos),
- elapsed,
- util.bytecount(progress.pos / elapsed),
- )
- )
progress.complete()
+ # acknowledge the end of the bundle2 part; this helps align
+ # sequential and parallel behavior.
+ remains = fp.read(1)
+ assert not remains
+ _report_transferred(repo, start, filecount, report.byte_count)
+
+
+# iterator of chunks of bytes that constitute a file's content.
+FileChunksT = Iterator[bytes]
+# Contains the information necessary to write a streamed file on disk
+FileInfoT = Tuple[
+ bytes, # real fs path
+ Optional[int], # permission to give to chmod
+ FileChunksT, # content
+]
+
+
+class _Queue:
+ """a reimplementation of queue.Queue which doesn't use thread.Condition"""
+
+ def __init__(self):
+ self._queue = collections.deque()
+
+ # the "_lock" protect manipulation of the "_queue" deque
+ # the "_wait" is used to have the "get" thread waits for the
+ # "put" thread when the queue is empty.
+ #
+ # This is similar to the "threading.Condition", but without the absurd
+ # slowness of the stdlib implementation.
+ #
+ # the "_wait" is always released while holding the "_lock".
+ self._lock = threading.Lock()
+ self._wait = threading.Lock()
+
+ def put(self, item):
+ with self._lock:
+ self._queue.append(item)
+ # if anyone is waiting on item, unblock it.
+ if self._wait.locked():
+ self._wait.release()
+
+ def get(self):
+ with self._lock:
+ while len(self._queue) == 0:
+ # "arm" the waiting lock
+ self._wait.acquire(blocking=False)
+ # release the lock to let other touch the queue
+ # (especially the put call we wait on)
+ self._lock.release()
+ # wait for a `put` call to release the lock
+ self._wait.acquire()
+ # grab the lock to look at a possible available value
+ self._lock.acquire()
+ # disarm the lock if necessary.
+ #
+ # If the queue only contains one item, keep the _wait lock
+ # armed, as there is no need to wake another waiter anyway.
+ if self._wait.locked() and len(self._queue) > 1:
+ self._wait.release()
+ return self._queue.popleft()
+
+
+class _DataQueue:
+ """A queue passing data from the bundle stream to other thread
+
+ It has a "memory_target" optional parameter to avoid buffering too much
+ information. The implementation is not exact and the memory target might be
+ exceed for a time in some situation.
+ """
+
+ def __init__(self, memory_target=None):
+ self._q = _Queue()
+ self._abort = False
+ self._memory_target = memory_target
+ if self._memory_target is not None and self._memory_target <= 0:
+ raise error.ProgrammingError("memory target should be > 0")
+
+ # the "_lock" protect manipulation of the _current_used" variable
+ # the "_wait" is used to have the "reading" thread waits for the
+ # "using" thread when the buffer is full.
+ #
+ # This is similar to the "threading.Condition", but without the absurd
+ # slowness of the stdlib implementation.
+ #
+ # the "_wait" is always released while holding the "_lock".
+ self._lock = threading.Lock()
+ self._wait = threading.Lock()
+ # only the stream reader touches this; it is fine to access it without the lock
+ self._current_read = 0
+ # do not touch this without the lock
+ self._current_used = 0
+
+ def _has_free_space(self):
+ """True if more data can be read without further exceeding memory target
+
+ Must be called under the lock.
+ """
+ if self._memory_target is None:
+ # Ideally we should not even get into the locking business in that
+ # case, but we keep the implementation simple for now.
+ return True
+ return (self._current_read - self._current_used) < self._memory_target
+
+ def mark_used(self, offset):
+ """Notify we have used the buffer up to "offset"
+
+ This is meant to be used from another thread than the one filler the queue.
+ """
+ if self._memory_target is not None:
+ with self._lock:
+ if offset > self._current_used:
+ self._current_used = offset
+ # If the reader is waiting for room, unblock it.
+ if self._wait.locked() and self._has_free_space():
+ self._wait.release()
+
+ def fill_from(self, data):
+ """fill the data queue from a bundle2 part object
+
+ This is meant to be called by the data reading thread
+ """
+ q = self._q
+ try:
+ for item in data:
+ self._current_read += len(item)
+ q.put(item)
+ if self._abort:
+ break
+ if self._memory_target is not None:
+ with self._lock:
+ while not self._has_free_space():
+ # make sure the _wait lock is locked
+ # this is done under lock, so there can be no race with the release logic
+ self._wait.acquire(blocking=False)
+ self._lock.release()
+ # acquiring the lock will block until some other thread releases it.
+ self._wait.acquire()
+ # let's dive into the locked section again
+ self._lock.acquire()
+ # make sure we release the lock we just grabbed if
+ # needed.
+ if self._wait.locked():
+ self._wait.release()
+ finally:
+ q.put(None)
+
+ def __iter__(self):
+ """Iterate over the bundle chunkgs
+
+ This is meant to be called by the data parsing thread."""
+ q = self._q
+ while (i := q.get()) is not None:
+ yield i
+ if self._abort:
+ break
+
+ def abort(self):
+ """stop the data-reading thread and interrupt the comsuming iteration
+
+ This is meant to be called on errors.
+ """
+ self._abort = True
+ self._q.put(None)
+ if self._memory_target is not None:
+ with self._lock:
+ # make sure we unstuck the reader thread.
+ if self._wait.locked():
+ self._wait.release()
-def consumev3(repo, fp):
+class _FileInfoQueue:
+ """A thread-safe queue to passer parsed file information to the writers"""
+
+ def __init__(self, info: Iterable[FileInfoT]):
+ self._info = info
+ self._q = _Queue()
+
+ def fill(self):
+ """iterate over the parsed information to file the queue
+
+ This is meant to be call from the thread parsing the stream information.
+ """
+ q = self._q
+ for i in self._info:
+ q.put(i)
+
+ def close(self, number_worker):
+ """signal all the workers that we no longer have any file info coming
+
+ Called from the thread parsing the stream information (and/or the main
+ thread if different).
+ """
+ for __ in range(number_worker):
+ self._q.put(None)
+
+ def __iter__(self):
+ """iterate over the available file info
+
+ This is meant to be called from the writer threads.
+ """
+ q = self._q
+ while (i := q.get()) is not None:
+ yield i
+
+
+class _FileChunker:
+ """yield the chunk that constitute a file
+
+ This class exists as the counterpart of the threaded version and
+ would not be very useful on its own.
+ """
+
+ def __init__(
+ self,
+ fp: bundle2mod.unbundlepart,
+ data_len: int,
+ progress: scmutil.progress,
+ report: V2Report,
+ mark_used: Optional[Callable[[int], None]] = None,
+ ):
+ self.report = report
+ self.progress = progress
+ self._chunks = util.filechunkiter(fp, limit=data_len)
+
+ def fill(self) -> None:
+ """Do nothing in non-threading context"""
+
+ def __iter__(self) -> FileChunksT:
+ for chunk in self._chunks:
+ self.report.byte_count += len(chunk)
+ self.progress.increment(step=len(chunk))
+ yield chunk
+
+
+class _ThreadSafeFileChunker(_FileChunker):
+ """yield the chunk that constitute a file
+
+ Make sure you call the "fill" function in the main thread to read the
+ right data at the right time.
+ """
+
+ def __init__(
+ self,
+ fp: bundle2mod.unbundlepart,
+ data_len: int,
+ progress: scmutil.progress,
+ report: V2Report,
+ mark_used: Optional[Callable[[int], None]] = None,
+ ):
+ super().__init__(fp, data_len, progress, report)
+ self._fp = fp
+ self._queue = _Queue()
+ self._mark_used = mark_used
+
+ def fill(self) -> None:
+ """fill the file chunker queue with data read from the stream
+
+ This is meant to be called from the thread parsing information (and
+ consuming the stream data).
+ """
+ try:
+ for chunk in super().__iter__():
+ offset = self._fp.tell()
+ self._queue.put((chunk, offset))
+ finally:
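+ # a final None signals the consuming side that the file is complete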
+ self._queue.put(None)
+
+ def __iter__(self) -> FileChunksT:
+ """Iterate over all the file chunk
+
+ This is meant to be called from the writer threads.
+ """
+ while (info := self._queue.get()) is not None:
+ chunk, offset = info
+ if self._mark_used is not None:
+ self._mark_used(offset)
+ yield chunk
+
+
+def _trivial_file(
+ chunk: bytes,
+ mark_used: Optional[Callable[[int], None]],
+ offset: int,
+) -> FileChunksT:
+ """used for single chunk file,"""
+ if mark_used is not None:
+ mark_used(offset)
+ yield chunk
+
+
+def _v2_parse_files(
+ repo,
+ fp: bundle2mod.unbundlepart,
+ vfs_map,
+ file_count: int,
+ progress: scmutil.progress,
+ report: V2Report,
+ file_chunker: Type[_FileChunker] = _FileChunker,
+ mark_used: Optional[Callable[[int], None]] = None,
+) -> Iterator[FileInfoT]:
+ """do the "stream-parsing" part of stream v2
+
+ The parsed information is yielded for consumption by the "writer" side.
+ """
+ known_dirs = set() # set of directories that we know exist
+ progress.update(0)
+ for i in range(file_count):
+ src = util.readexactly(fp, 1)
+ namelen = util.uvarintdecodestream(fp)
+ datalen = util.uvarintdecodestream(fp)
+
+ name = util.readexactly(fp, namelen)
+
+ if repo.ui.debugflag:
+ repo.ui.debug(
+ b'adding [%s] %s (%s)\n' % (src, name, util.bytecount(datalen))
+ )
+ vfs = vfs_map[src]
+ path, mode = vfs.prepare_streamed_file(name, known_dirs)
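+ # files small enough to fit in a single chunk are read right away;
+ # larger ones go through a (possibly threaded) chunker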
+ if datalen <= util.DEFAULT_FILE_CHUNK:
+ c = fp.read(datalen)
+ offset = fp.tell()
+ report.byte_count += len(c)
+ progress.increment(step=len(c))
+ chunks = _trivial_file(c, mark_used, offset)
+ yield (path, mode, iter(chunks))
+ else:
+ chunks = file_chunker(
+ fp,
+ datalen,
+ progress,
+ report,
+ mark_used=mark_used,
+ )
+ yield (path, mode, iter(chunks))
+ # make sure we read all the chunks before moving to the next file
+ chunks.fill()
+
+
+def _write_files(info: Iterable[FileInfoT]):
+ """write files from parsed data"""
+ io_flags = os.O_WRONLY | os.O_CREAT
+ if pycompat.iswindows:
+ io_flags |= os.O_BINARY
+ for path, mode, data in info:
+ if mode is None:
+ fd = os.open(path, io_flags)
+ else:
+ fd = os.open(path, io_flags, mode=mode)
+ try:
+ for chunk in data:
+ written = os.write(fd, chunk)
+ # write missing pieces if the write was interrupted
+ while written < len(chunk):
+ written += os.write(fd, chunk[written:])
+ finally:
+ os.close(fd)
+
+
+def consumev3(repo, fp) -> None:
"""Apply the contents from a version 3 streaming clone.
Data is read from an object that only needs to provide a ``read(size)``
@@ -1045,9 +1594,9 @@
# is fine, while this is really not fine.
if repo.vfs in vfsmap.values():
raise error.ProgrammingError(
- b'repo.vfs must not be added to vfsmap for security reasons'
+ 'repo.vfs must not be added to vfsmap for security reasons'
)
-
+ total_file_count = 0
with repo.transaction(b'clone'):
ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
with nested(*ctxs):
@@ -1056,6 +1605,7 @@
if filecount == 0:
if repo.ui.debugflag:
repo.ui.debug(b'entry with no files [%d]\n' % (i))
+ total_file_count += filecount
for i in range(filecount):
src = util.readexactly(fp, 1)
vfs = vfsmap[src]
@@ -1079,18 +1629,13 @@
# streamclone-ed file at next access
repo.invalidate(clearfilecache=True)
- elapsed = util.timer() - start
- if elapsed <= 0:
- elapsed = 0.001
- msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
- byte_count = util.bytecount(bytes_transferred)
- bytes_sec = util.bytecount(bytes_transferred / elapsed)
- msg %= (byte_count, elapsed, bytes_sec)
- repo.ui.status(msg)
progress.complete()
+ _report_transferred(repo, start, total_file_count, bytes_transferred)
-def applybundlev2(repo, fp, filecount, filesize, requirements):
+def applybundlev2(
+ repo, fp, filecount: int, filesize: int, requirements: Iterable[bytes]
+) -> None:
from . import localrepo
missingreqs = [r for r in requirements if r not in repo.supported]
@@ -1100,7 +1645,8 @@
% b', '.join(sorted(missingreqs))
)
- consumev2(repo, fp, filecount, filesize)
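+ # run without the cyclic garbage collector: applying the stream creates
+ # a large number of short-lived objects and periodic gc passes get costly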
+ with util.nogc():
+ consumev2(repo, fp, filecount, filesize)
repo.requirements = new_stream_clone_requirements(
repo.requirements,
@@ -1113,7 +1659,7 @@
nodemap.post_stream_cleanup(repo)
-def applybundlev3(repo, fp, requirements):
+def applybundlev3(repo, fp, requirements: Iterable[bytes]) -> None:
from . import localrepo
missingreqs = [r for r in requirements if r not in repo.supported]
@@ -1135,14 +1681,14 @@
nodemap.post_stream_cleanup(repo)
-def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
+def _copy_files(src_vfs_map, dst_vfs_map, entries, progress) -> bool:
hardlink = [True]
def copy_used():
hardlink[0] = False
progress.topic = _(b'copying')
- for k, path in entries:
+ for k, path, optional in entries:
src_vfs = src_vfs_map[k]
dst_vfs = dst_vfs_map[k]
src_path = src_vfs.join(path)
@@ -1154,18 +1700,22 @@
util.makedirs(dirname)
dst_vfs.register_file(path)
# XXX we could use the #nb_bytes argument.
- util.copyfile(
- src_path,
- dst_path,
- hardlink=hardlink[0],
- no_hardlink_cb=copy_used,
- check_fs_hardlink=False,
- )
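+ # optional files (e.g. caches) may legitimately have vanished since
+ # the listing was made; only missing non-optional files are an error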
+ try:
+ util.copyfile(
+ src_path,
+ dst_path,
+ hardlink=hardlink[0],
+ no_hardlink_cb=copy_used,
+ check_fs_hardlink=False,
+ )
+ except FileNotFoundError:
+ if not optional:
+ raise
progress.increment()
return hardlink[0]
-def local_copy(src_repo, dest_repo):
+def local_copy(src_repo, dest_repo) -> None:
"""copy all content from one local repository to another
This is useful for local clone"""
@@ -1215,7 +1765,7 @@
# to the files while we do the clone, so this is not done yet. We
# could do this blindly when copying files.
files = [
- (vfs_key, f.unencoded_path)
+ (vfs_key, f.unencoded_path, f.optional)
for vfs_key, e in entries
for f in e.files()
]