diff mercurial/repocache.py @ 40999:dcac24ec935b
commandserver: preload repository in master server and reuse its file cache
This greatly speeds up repository operations when the repository has lots of obsolete markers:
$ ls -lh .hg/store/obsstore
-rw-r--r-- 1 yuya yuya 21M Dec 2 17:55 .hg/store/obsstore
$ time hg log -G -l10 --pager no
(hg) 1.79s user 0.13s system 99% cpu 1.919 total
(chg uncached) 0.00s user 0.01s system 0% cpu 1.328 total
(chg cached) 0.00s user 0.00s system 3% cpu 0.180 total
As you can see, the implementation of the preloader function is highly
experimental. It works, but I'm not yet sure how things should be organized,
so I don't want to formalize the API at this point.
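
To make the intended flow concrete, here is a minimal, hypothetical sketch of the
master-side wiring (not part of this patch): a forking server owns a repoloader,
queues each requested repository for background preloading, and shuts the loader
down on exit. The runmaster() helper and the requests iterable are assumptions for
illustration only.

    from mercurial import repocache

    def runmaster(ui, requests):
        # keep a handful of preloaded, unfiltered repos alive in the master
        loader = repocache.repoloader(ui, maxlen=4)
        loader.start()
        try:
            for path in requests:
                # queue the repo for (re)loading; this returns immediately and
                # the background thread warms the cache between requests
                loader.load(path)
                # ... fork() a worker to serve the request (see the sketch
                # after the diff below) ...
        finally:
            loader.stop()

Since load() only queues the path, the first request for a repository still pays
the full loading cost; later requests reuse the preloaded copy, which is roughly
what the "(chg cached)" line above measures.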
author    Yuya Nishihara <yuya@tcha.org>
date      Wed, 31 Oct 2018 22:43:08 +0900
parents
children  2372284d9457
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/repocache.py	Wed Oct 31 22:43:08 2018 +0900
@@ -0,0 +1,131 @@
+# repocache.py - in-memory repository cache for long-running services
+#
+# Copyright 2018 Yuya Nishihara <yuya@tcha.org>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import collections
+import gc
+import threading
+
+from . import (
+    error,
+    hg,
+    obsolete,
+    scmutil,
+    util,
+)
+
+class repoloader(object):
+    """Load repositories in background thread
+
+    This is designed for a forking server. A cached repo cannot be obtained
+    until the server fork()s a worker and the loader thread stops.
+    """
+
+    def __init__(self, ui, maxlen):
+        self._ui = ui.copy()
+        self._cache = util.lrucachedict(max=maxlen)
+        # use deque and Event instead of Queue since deque can discard
+        # old items to keep at most maxlen items.
+        self._inqueue = collections.deque(maxlen=maxlen)
+        self._accepting = False
+        self._newentry = threading.Event()
+        self._thread = None
+
+    def start(self):
+        assert not self._thread
+        if self._inqueue.maxlen == 0:
+            # no need to spawn loader thread as the cache is disabled
+            return
+        self._accepting = True
+        self._thread = threading.Thread(target=self._mainloop)
+        self._thread.start()
+
+    def stop(self):
+        if not self._thread:
+            return
+        self._accepting = False
+        self._newentry.set()
+        self._thread.join()
+        self._thread = None
+        self._cache.clear()
+        self._inqueue.clear()
+
+    def load(self, path):
+        """Request to load the specified repository in background"""
+        self._inqueue.append(path)
+        self._newentry.set()
+
+    def get(self, path):
+        """Return a cached repo if available
+
+        This function must be called after fork(), where the loader thread
+        is stopped. Otherwise, the returned repo might be updated by the
+        loader thread.
+        """
+        if self._thread and self._thread.is_alive():
+            raise error.ProgrammingError(b'cannot obtain cached repo while '
+                                         b'loader is active')
+        return self._cache.peek(path, None)
+
+    def _mainloop(self):
+        while self._accepting:
+            # Avoid heavy GC after fork(), which would cancel the benefit of
+            # COW. We assume that GIL is acquired while GC is underway in the
+            # loader thread. If that isn't true, we might have to move
+            # gc.collect() to the main thread so that fork() would never stop
+            # the thread where GC is in progress.
+            gc.collect()
+
+            self._newentry.wait()
+            while self._accepting:
+                self._newentry.clear()
+                try:
+                    path = self._inqueue.popleft()
+                except IndexError:
+                    break
+                scmutil.callcatch(self._ui, lambda: self._load(path))
+
+    def _load(self, path):
+        start = util.timer()
+        # TODO: repo should be recreated if storage configuration changed
+        try:
+            # pop before loading so inconsistent state wouldn't be exposed
+            repo = self._cache.pop(path)
+        except KeyError:
+            repo = hg.repository(self._ui, path).unfiltered()
+        _warmupcache(repo)
+        repo.ui.log(b'repocache', b'loaded repo into cache: %s (in %.3fs)\n',
+                    path, util.timer() - start)
+        self._cache.insert(path, repo)
+
+# TODO: think about proper API of preloading cache
+def _warmupcache(repo):
+    repo.invalidateall()
+    repo.changelog
+    repo.obsstore._all
+    repo.obsstore.successors
+    repo.obsstore.predecessors
+    repo.obsstore.children
+    for name in obsolete.cachefuncs:
+        obsolete.getrevs(repo, name)
+    repo._phasecache.loadphaserevs(repo)
+
+# TODO: think about proper API of attaching preloaded attributes
+def copycache(srcrepo, destrepo):
+    """Copy cached attributes from srcrepo to destrepo"""
+    destfilecache = destrepo._filecache
+    srcfilecache = srcrepo._filecache
+    if 'changelog' in srcfilecache:
+        destfilecache['changelog'] = ce = srcfilecache['changelog']
+        ce.obj.opener = ce.obj._realopener = destrepo.svfs
+    if 'obsstore' in srcfilecache:
+        destfilecache['obsstore'] = ce = srcfilecache['obsstore']
+        ce.obj.svfs = destrepo.svfs
+    if '_phasecache' in srcfilecache:
+        destfilecache['_phasecache'] = ce = srcfilecache['_phasecache']
+        ce.obj.opener = destrepo.svfs
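
For completeness, a hypothetical worker-side counterpart of the sketch above (again
not part of this patch; the real commandserver hand-off is outside this file's diff):
after fork(), the loader thread no longer runs in the child, so get() may hand out
the cached repo, and copycache() transplants its preloaded changelog, obsstore, and
phase cache entries into the repo the worker is about to use. The runworker() helper
and its arguments are assumptions for illustration only.

    from mercurial import hg, repocache

    def runworker(ui, loader, path):
        # runs in the forked child; the loader thread did not survive fork(),
        # so repoloader.get() will not raise ProgrammingError here
        cachedrepo = loader.get(path)
        repo = hg.repository(ui, path)
        if cachedrepo is not None and cachedrepo.root == repo.root:
            # reuse the preloaded changelog/obsstore/_phasecache entries;
            # copycache() repoints their openers at this repo's store vfs
            repocache.copycache(cachedrepo, repo.unfiltered())
        # ... dispatch the requested command against repo ...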