Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/commandserver.py @ 41008:042ed354b9eb
commandserver: add IPC channel to teach repository path on command finished
The idea is to load recently-used repositories first in the master process,
and fork(). The forked worker can reuse a warm repository if it's preloaded.
There are a couple of ways of in-memory repository caching. They have pros
and cons:
a. "preload by master"
pros: can use a single cache dict, maximizing cache hit rate
cons: need to reload a repo in master process (because worker process
dies per command)
b. "prefork"
pros: can cache a repo without reloading (as worker processes persist)
cons: lower cache hit rate since each worker has to maintain its own cache
c. "shared memory" (or separate key-value store server)
pros: no need to reload a repo in master process, ideally
cons: need to serialize objects to sharable form
Since my primary goal is to get rid of the cost of loading obsstore without
massive rewrites, (c) doesn't work. (b) isn't ideal since it would require
much more SDRAM than (a). So I take (a).
Credit for the idea goes to Jun Wu.
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Wed, 31 Oct 2018 22:19:03 +0900 |
parents | 2525faf4ecdb |
children | dcac24ec935b |
comparison
equal
deleted
inserted
replaced
41007:038108a9811c | 41008:042ed354b9eb |
---|---|
504 raise error.Abort(_('unsupported platform')) | 504 raise error.Abort(_('unsupported platform')) |
505 if not self.address: | 505 if not self.address: |
506 raise error.Abort(_('no socket path specified with --address')) | 506 raise error.Abort(_('no socket path specified with --address')) |
507 self._servicehandler = handler or unixservicehandler(ui) | 507 self._servicehandler = handler or unixservicehandler(ui) |
508 self._sock = None | 508 self._sock = None |
509 self._mainipc = None | |
510 self._workeripc = None | |
509 self._oldsigchldhandler = None | 511 self._oldsigchldhandler = None |
510 self._workerpids = set() # updated by signal handler; do not iterate | 512 self._workerpids = set() # updated by signal handler; do not iterate |
511 self._socketunlinked = None | 513 self._socketunlinked = None |
512 | 514 |
513 def init(self): | 515 def init(self): |
514 self._sock = socket.socket(socket.AF_UNIX) | 516 self._sock = socket.socket(socket.AF_UNIX) |
517 # IPC channel from many workers to one main process; this is actually | |
518 # a uni-directional pipe, but is backed by a DGRAM socket so each | |
519 # message can be easily separated. | |
520 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM) | |
521 self._mainipc, self._workeripc = o | |
515 self._servicehandler.bindsocket(self._sock, self.address) | 522 self._servicehandler.bindsocket(self._sock, self.address) |
516 if util.safehasattr(procutil, 'unblocksignal'): | 523 if util.safehasattr(procutil, 'unblocksignal'): |
517 procutil.unblocksignal(signal.SIGCHLD) | 524 procutil.unblocksignal(signal.SIGCHLD) |
518 o = signal.signal(signal.SIGCHLD, self._sigchldhandler) | 525 o = signal.signal(signal.SIGCHLD, self._sigchldhandler) |
519 self._oldsigchldhandler = o | 526 self._oldsigchldhandler = o |
525 self._socketunlinked = True | 532 self._socketunlinked = True |
526 | 533 |
527 def _cleanup(self): | 534 def _cleanup(self): |
528 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) | 535 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) |
529 self._sock.close() | 536 self._sock.close() |
537 self._mainipc.close() | |
538 self._workeripc.close() | |
530 self._unlinksocket() | 539 self._unlinksocket() |
531 # don't kill child processes as they have active clients, just wait | 540 # don't kill child processes as they have active clients, just wait |
532 self._reapworkers(0) | 541 self._reapworkers(0) |
533 | 542 |
534 def run(self): | 543 def run(self): |
541 exiting = False | 550 exiting = False |
542 h = self._servicehandler | 551 h = self._servicehandler |
543 selector = selectors.DefaultSelector() | 552 selector = selectors.DefaultSelector() |
544 selector.register(self._sock, selectors.EVENT_READ, | 553 selector.register(self._sock, selectors.EVENT_READ, |
545 self._acceptnewconnection) | 554 self._acceptnewconnection) |
555 selector.register(self._mainipc, selectors.EVENT_READ, | |
556 self._handlemainipc) | |
546 while True: | 557 while True: |
547 if not exiting and h.shouldexit(): | 558 if not exiting and h.shouldexit(): |
548 # clients can no longer connect() to the domain socket, so | 559 # clients can no longer connect() to the domain socket, so |
549 # we stop queuing new requests. | 560 # we stop queuing new requests. |
550 # for requests that are queued (connect()-ed, but haven't been | 561 # for requests that are queued (connect()-ed, but haven't been |
590 conn.close() # release handle in parent process | 601 conn.close() # release handle in parent process |
591 else: | 602 else: |
592 try: | 603 try: |
593 selector.close() | 604 selector.close() |
594 sock.close() | 605 sock.close() |
606 self._mainipc.close() | |
595 self._runworker(conn) | 607 self._runworker(conn) |
596 conn.close() | 608 conn.close() |
609 self._workeripc.close() | |
597 os._exit(0) | 610 os._exit(0) |
598 except: # never return, hence no re-raises | 611 except: # never return, hence no re-raises |
599 try: | 612 try: |
600 self.ui.traceback(force=True) | 613 self.ui.traceback(force=True) |
601 finally: | 614 finally: |
602 os._exit(255) | 615 os._exit(255) |
616 | |
617 def _handlemainipc(self, sock, selector): | |
618 """Process messages sent from a worker""" | |
619 try: | |
620 path = sock.recv(32768) # large enough to receive path | |
621 except socket.error as inst: | |
622 if inst.args[0] == errno.EINTR: | |
623 return | |
624 raise | |
625 | |
626 self.ui.log(b'cmdserver', b'repository: %s\n', path) | |
603 | 627 |
604 def _sigchldhandler(self, signal, frame): | 628 def _sigchldhandler(self, signal, frame): |
605 self._reapworkers(os.WNOHANG) | 629 self._reapworkers(os.WNOHANG) |
606 | 630 |
607 def _reapworkers(self, options): | 631 def _reapworkers(self, options): |
626 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) | 650 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) |
627 _initworkerprocess() | 651 _initworkerprocess() |
628 h = self._servicehandler | 652 h = self._servicehandler |
629 try: | 653 try: |
630 _serverequest(self.ui, self.repo, conn, h.createcmdserver, | 654 _serverequest(self.ui, self.repo, conn, h.createcmdserver, |
631 prereposetups=None) # TODO: pass in hook functions | 655 prereposetups=[self._reposetup]) |
632 finally: | 656 finally: |
633 gc.collect() # trigger __del__ since worker process uses os._exit | 657 gc.collect() # trigger __del__ since worker process uses os._exit |
658 | |
659 def _reposetup(self, ui, repo): | |
660 if not repo.local(): | |
661 return | |
662 | |
663 class unixcmdserverrepo(repo.__class__): | |
664 def close(self): | |
665 super(unixcmdserverrepo, self).close() | |
666 try: | |
667 self._cmdserveripc.send(self.root) | |
668 except socket.error: | |
669 self.ui.log(b'cmdserver', | |
670 b'failed to send repo root to master\n') | |
671 | |
672 repo.__class__ = unixcmdserverrepo | |
673 repo._cmdserveripc = self._workeripc |