--- a/mercurial/streamclone.py Tue Oct 01 15:55:49 2024 +0200
+++ b/mercurial/streamclone.py Tue Oct 01 16:07:51 2024 +0200
@@ -567,53 +567,113 @@
class VolatileManager:
- """Manage temporary backup of volatile file during stream clone
+ """Manage temporary backups of volatile files during stream clone.
- This should be used as a Python context, the copies will be discarded when
- exiting the context.
+ This class will keep open file handles for the volatile files, writing the
+ smaller ones on disk if the number of open file handles grow too much.
- A copy can be done by calling the object on the real path (encoded full
- path)
+ This should be used as a Python context, the file handles and copies will
+ be discarded when exiting the context.
- The backup path can be retrieved using the __getitem__ protocol, obj[path].
- On file without backup, it will return the unmodified path. (equivalent to
- `dict.get(x, x)`)
+ The preservation can be done by calling the object on the real path
+ (encoded full path).
+
+ Valid filehandles for any file should be retrieved by calling `open(path)`.
"""
+ # arbitrarily picked as "it seemed fine" and much higher than the current
+ # usage.
+ MAX_OPEN = 100
+
def __init__(self):
+ self._counter = 0
+ self._volatile_fps = None
self._copies = None
self._dst_dir = None
def __enter__(self):
- if self._copies is not None:
- msg = "Copies context already open"
- raise error.ProgrammingError(msg)
+ if self._counter == 0:
+ assert self._volatile_fps is None
+ self._volatile_fps = {}
+ self._counter += 1
+ return self
+
+ def __exit__(self, *args, **kwars):
+ """discard all backups"""
+ self._counter -= 1
+ if self._counter == 0:
+ for _size, fp in self._volatile_fps.values():
+ fp.close()
+ self._volatile_fps = None
+ if self._copies is not None:
+ for tmp in self._copies.values():
+ util.tryunlink(tmp)
+ util.tryrmdir(self._dst_dir)
+ self._copies = None
+ self._dst_dir = None
+ assert self._volatile_fps is None
+ assert self._copies is None
+ assert self._dst_dir is None
+
+ def _init_tmp_copies(self):
+ """prepare a temporary directory to save volatile files
+
+ This will be used as backup if we have too many files open"""
+ assert 0 < self._counter
+ assert self._copies is None
+ assert self._dst_dir is None
self._copies = {}
self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
- return self
+
+ def _flush_some_on_disk(self):
+ """move some of the open files to tempory files on disk"""
+ if self._copies is None:
+ self._init_tmp_copies()
+ flush_count = self.MAX_OPEN // 2
+ for src, (size, fp) in sorted(self._volatile_fps.items())[:flush_count]:
+ prefix = os.path.basename(src)
+ fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
+ self._copies[src] = dst
+ os.close(fd)
+ # we no longer hardlink, but on the other hand we rarely do this,
+ # and we do it for the smallest file only and not at all in the
+ # common case.
+ with open(dst, 'wb') as bck:
+ fp.seek(0)
+ bck.write(fp.read())
+ del self._volatile_fps[src]
+ fp.close()
+
+ def _keep_one(self, src):
+ """preserve an open file handle for a given path"""
+ # store the file quickly to ensure we close it if any error happens
+ _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
+ fp.seek(0, os.SEEK_END)
+ size = fp.tell()
+ self._volatile_fps[src] = (size, fp)
def __call__(self, src):
- """create a backup of the file at src"""
- prefix = os.path.basename(src)
- fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
- os.close(fd)
- self._copies[src] = dst
- util.copyfiles(src, dst, hardlink=True)
- return dst
+ """preserve the volatile file at src"""
+ assert 0 < self._counter
+ if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
+ self._flush_some_on_disk()
+ self._keep_one(src)
@contextlib.contextmanager
def open(self, src):
- actual_path = self._copies.get(src, src)
- with open(actual_path, 'rb') as fp:
+ assert 0 < self._counter
+ entry = self._volatile_fps.get(src)
+ if entry is not None:
+ _size, fp = entry
+ fp.seek(0)
yield fp
-
- def __exit__(self, *args, **kwars):
- """discard all backups"""
- for tmp in self._copies.values():
- util.tryunlink(tmp)
- util.tryrmdir(self._dst_dir)
- self._copies = None
- self._dst_dir = None
+ else:
+ if self._copies is None:
+ actual_path = src
+ else:
+ actual_path = self._copies.get(src, src)
+ with open(actual_path, 'rb') as fp:
+ yield fp
def _makemap(repo):