diff -r 038108a9811c -r 042ed354b9eb mercurial/commandserver.py --- a/mercurial/commandserver.py Thu Dec 13 23:20:28 2018 -0800 +++ b/mercurial/commandserver.py Wed Oct 31 22:19:03 2018 +0900 @@ -506,12 +506,19 @@ raise error.Abort(_('no socket path specified with --address')) self._servicehandler = handler or unixservicehandler(ui) self._sock = None + self._mainipc = None + self._workeripc = None self._oldsigchldhandler = None self._workerpids = set() # updated by signal handler; do not iterate self._socketunlinked = None def init(self): self._sock = socket.socket(socket.AF_UNIX) + # IPC channel from many workers to one main process; this is actually + # a uni-directional pipe, but is backed by a DGRAM socket so each + # message can be easily separated. + o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM) + self._mainipc, self._workeripc = o self._servicehandler.bindsocket(self._sock, self.address) if util.safehasattr(procutil, 'unblocksignal'): procutil.unblocksignal(signal.SIGCHLD) @@ -527,6 +534,8 @@ def _cleanup(self): signal.signal(signal.SIGCHLD, self._oldsigchldhandler) self._sock.close() + self._mainipc.close() + self._workeripc.close() self._unlinksocket() # don't kill child processes as they have active clients, just wait self._reapworkers(0) @@ -543,6 +552,8 @@ selector = selectors.DefaultSelector() selector.register(self._sock, selectors.EVENT_READ, self._acceptnewconnection) + selector.register(self._mainipc, selectors.EVENT_READ, + self._handlemainipc) while True: if not exiting and h.shouldexit(): # clients can no longer connect() to the domain socket, so @@ -592,8 +603,10 @@ try: selector.close() sock.close() + self._mainipc.close() self._runworker(conn) conn.close() + self._workeripc.close() os._exit(0) except: # never return, hence no re-raises try: @@ -601,6 +614,17 @@ finally: os._exit(255) + def _handlemainipc(self, sock, selector): + """Process messages sent from a worker""" + try: + path = sock.recv(32768) # large enough to receive path + except socket.error as inst: + if inst.args[0] == errno.EINTR: + return + raise + + self.ui.log(b'cmdserver', b'repository: %s\n', path) + def _sigchldhandler(self, signal, frame): self._reapworkers(os.WNOHANG) @@ -628,6 +652,22 @@ h = self._servicehandler try: _serverequest(self.ui, self.repo, conn, h.createcmdserver, - prereposetups=None) # TODO: pass in hook functions + prereposetups=[self._reposetup]) finally: gc.collect() # trigger __del__ since worker process uses os._exit + + def _reposetup(self, ui, repo): + if not repo.local(): + return + + class unixcmdserverrepo(repo.__class__): + def close(self): + super(unixcmdserverrepo, self).close() + try: + self._cmdserveripc.send(self.root) + except socket.error: + self.ui.log(b'cmdserver', + b'failed to send repo root to master\n') + + repo.__class__ = unixcmdserverrepo + repo._cmdserveripc = self._workeripc