comparison mercurial/commandserver.py @ 41009:dcac24ec935b

commandserver: preload repository in master server and reuse its file cache

This greatly speeds up repository operations when there are lots of obsolete markers:

  $ ls -lh .hg/store/obsstore
  -rw-r--r-- 1 yuya yuya 21M Dec 2 17:55 .hg/store/obsstore

  $ time hg log -G -l10 --pager no
  (hg)           1.79s user 0.13s system 99% cpu 1.919 total
  (chg uncached) 0.00s user 0.01s system 0% cpu 1.328 total
  (chg cached)   0.00s user 0.00s system 3% cpu 0.180 total

As you can see, the implementation of the preloader function is highly experimental. It works, but I am not yet sure how things should be organized, so I don't want to formalize the API at this point.
author Yuya Nishihara <yuya@tcha.org>
date Wed, 31 Oct 2018 22:43:08 +0900
parents 042ed354b9eb
children b0e3f2d7c143
comparison
equal deleted inserted replaced
41008:042ed354b9eb 41009:dcac24ec935b
26 from . import ( 26 from . import (
27 encoding, 27 encoding,
28 error, 28 error,
29 loggingutil, 29 loggingutil,
30 pycompat, 30 pycompat,
31 repocache,
31 util, 32 util,
32 vfs as vfsmod, 33 vfs as vfsmod,
33 ) 34 )
34 from .utils import ( 35 from .utils import (
35 cborutil, 36 cborutil,
509 self._mainipc = None 510 self._mainipc = None
510 self._workeripc = None 511 self._workeripc = None
511 self._oldsigchldhandler = None 512 self._oldsigchldhandler = None
512 self._workerpids = set() # updated by signal handler; do not iterate 513 self._workerpids = set() # updated by signal handler; do not iterate
513 self._socketunlinked = None 514 self._socketunlinked = None
515 # experimental config: cmdserver.max-repo-cache
516 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
517 if maxlen < 0:
518 raise error.Abort(_('negative max-repo-cache size not allowed'))
519 self._repoloader = repocache.repoloader(ui, maxlen)
514 520
515 def init(self): 521 def init(self):
516 self._sock = socket.socket(socket.AF_UNIX) 522 self._sock = socket.socket(socket.AF_UNIX)
517 # IPC channel from many workers to one main process; this is actually 523 # IPC channel from many workers to one main process; this is actually
518 # a uni-directional pipe, but is backed by a DGRAM socket so each 524 # a uni-directional pipe, but is backed by a DGRAM socket so each
523 if util.safehasattr(procutil, 'unblocksignal'): 529 if util.safehasattr(procutil, 'unblocksignal'):
524 procutil.unblocksignal(signal.SIGCHLD) 530 procutil.unblocksignal(signal.SIGCHLD)
525 o = signal.signal(signal.SIGCHLD, self._sigchldhandler) 531 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
526 self._oldsigchldhandler = o 532 self._oldsigchldhandler = o
527 self._socketunlinked = False 533 self._socketunlinked = False
534 self._repoloader.start()
528 535
529 def _unlinksocket(self): 536 def _unlinksocket(self):
530 if not self._socketunlinked: 537 if not self._socketunlinked:
531 self._servicehandler.unlinksocket(self.address) 538 self._servicehandler.unlinksocket(self.address)
532 self._socketunlinked = True 539 self._socketunlinked = True
535 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) 542 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
536 self._sock.close() 543 self._sock.close()
537 self._mainipc.close() 544 self._mainipc.close()
538 self._workeripc.close() 545 self._workeripc.close()
539 self._unlinksocket() 546 self._unlinksocket()
547 self._repoloader.stop()
540 # don't kill child processes as they have active clients, just wait 548 # don't kill child processes as they have active clients, just wait
541 self._reapworkers(0) 549 self._reapworkers(0)
542 550
543 def run(self): 551 def run(self):
544 try: 552 try:
588 except socket.error as inst: 596 except socket.error as inst:
589 if inst.args[0] == errno.EINTR: 597 if inst.args[0] == errno.EINTR:
590 return 598 return
591 raise 599 raise
592 600
601 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
602 # to prevent COW memory from being touched by GC.
603 # https://instagram-engineering.com/
604 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
593 pid = os.fork() 605 pid = os.fork()
594 if pid: 606 if pid:
595 try: 607 try:
596 self.ui.log(b'cmdserver', b'forked worker process (pid=%d)\n', 608 self.ui.log(b'cmdserver', b'forked worker process (pid=%d)\n',
597 pid) 609 pid)
620 path = sock.recv(32768) # large enough to receive path 632 path = sock.recv(32768) # large enough to receive path
621 except socket.error as inst: 633 except socket.error as inst:
622 if inst.args[0] == errno.EINTR: 634 if inst.args[0] == errno.EINTR:
623 return 635 return
624 raise 636 raise
625 637 self._repoloader.load(path)
626 self.ui.log(b'cmdserver', b'repository: %s\n', path)
627 638
628 def _sigchldhandler(self, signal, frame): 639 def _sigchldhandler(self, signal, frame):
629 self._reapworkers(os.WNOHANG) 640 self._reapworkers(os.WNOHANG)
630 641
631 def _reapworkers(self, options): 642 def _reapworkers(self, options):
669 self.ui.log(b'cmdserver', 680 self.ui.log(b'cmdserver',
670 b'failed to send repo root to master\n') 681 b'failed to send repo root to master\n')
671 682
672 repo.__class__ = unixcmdserverrepo 683 repo.__class__ = unixcmdserverrepo
673 repo._cmdserveripc = self._workeripc 684 repo._cmdserveripc = self._workeripc
685
686 cachedrepo = self._repoloader.get(repo.root)
687 if cachedrepo is None:
688 return
689 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
690 repocache.copycache(cachedrepo, repo)