--- a/mercurial/commandserver.py Thu Dec 13 23:20:28 2018 -0800
+++ b/mercurial/commandserver.py Wed Oct 31 22:19:03 2018 +0900
@@ -506,12 +506,19 @@
raise error.Abort(_('no socket path specified with --address'))
self._servicehandler = handler or unixservicehandler(ui)
self._sock = None
+ self._mainipc = None
+ self._workeripc = None
self._oldsigchldhandler = None
self._workerpids = set() # updated by signal handler; do not iterate
self._socketunlinked = None
def init(self):
self._sock = socket.socket(socket.AF_UNIX)
+ # IPC channel from many workers to one main process; this is actually
+ # a uni-directional pipe, but is backed by a DGRAM socket so each
+ # message can be easily separated.
+ o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
+ self._mainipc, self._workeripc = o
self._servicehandler.bindsocket(self._sock, self.address)
if util.safehasattr(procutil, 'unblocksignal'):
procutil.unblocksignal(signal.SIGCHLD)
@@ -527,6 +534,8 @@
def _cleanup(self):
signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
self._sock.close()
+ self._mainipc.close()
+ self._workeripc.close()
self._unlinksocket()
# don't kill child processes as they have active clients, just wait
self._reapworkers(0)
@@ -543,6 +552,8 @@
selector = selectors.DefaultSelector()
selector.register(self._sock, selectors.EVENT_READ,
self._acceptnewconnection)
+ selector.register(self._mainipc, selectors.EVENT_READ,
+ self._handlemainipc)
while True:
if not exiting and h.shouldexit():
# clients can no longer connect() to the domain socket, so
@@ -592,8 +603,10 @@
try:
selector.close()
sock.close()
+ self._mainipc.close()
self._runworker(conn)
conn.close()
+ self._workeripc.close()
os._exit(0)
except: # never return, hence no re-raises
try:
@@ -601,6 +614,17 @@
finally:
os._exit(255)
+ def _handlemainipc(self, sock, selector):
+ """Process messages sent from a worker"""
+ try:
+ path = sock.recv(32768) # large enough to receive path
+ except socket.error as inst:
+ if inst.args[0] == errno.EINTR:
+ return
+ raise
+
+ self.ui.log(b'cmdserver', b'repository: %s\n', path)
+
def _sigchldhandler(self, signal, frame):
self._reapworkers(os.WNOHANG)
@@ -628,6 +652,22 @@
h = self._servicehandler
try:
_serverequest(self.ui, self.repo, conn, h.createcmdserver,
- prereposetups=None) # TODO: pass in hook functions
+ prereposetups=[self._reposetup])
finally:
gc.collect() # trigger __del__ since worker process uses os._exit
+
+ def _reposetup(self, ui, repo):
+ if not repo.local():
+ return
+
+ class unixcmdserverrepo(repo.__class__):
+ def close(self):
+ super(unixcmdserverrepo, self).close()
+ try:
+ self._cmdserveripc.send(self.root)
+ except socket.error:
+ self.ui.log(b'cmdserver',
+ b'failed to send repo root to master\n')
+
+ repo.__class__ = unixcmdserverrepo
+ repo._cmdserveripc = self._workeripc