comparison mercurial/commandserver.py @ 29544:024e8f82f3de

commandserver: add new forking server implemented without using SocketServer

SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as:

- a race condition that leads to 100% CPU usage (Python 2.6) https://bugs.python.org/issue21491
- it can't wait for children belonging to different process groups (Python 2.6)
- it leaves at least one zombie process (Python 2.6, 2.7) https://bugs.python.org/issue11109

The first two are critical because we do setpgid(0, 0) in the child process to isolate terminal signals. The last one isn't, but ForkingMixIn seems to be doing something silly.

So there are two choices:

a) backport and maintain SocketServer until we can drop support for Python 2.x
b) replace SocketServer with a simpler implementation and eliminate the glue code

I chose (b) because it's a great time to get rid of the utterly complicated SocketServer stuff, and to prepare for a future move towards a prefork service.

The new unixforkingservice is implemented loosely based on chg 531f8ef64be6. It is monolithic but much simpler than SocketServer. unixservicehandler provides customization points for chg, and it will be shared with the future prefork service.

The old unixservice class is still used by chgserver. It will be removed later.

Thanks to Jun Wu for investigating these issues.
author Yuya Nishihara <yuya@tcha.org>
date Sun, 22 May 2016 11:43:18 +0900
parents d74b8a4fde3b
children 9da1adc18639
comparison
equal deleted inserted replaced
29543:d74b8a4fde3b 29544:024e8f82f3de
9 9
10 import errno 10 import errno
11 import gc 11 import gc
12 import os 12 import os
13 import random 13 import random
14 import select
15 import signal
16 import socket
14 import struct 17 import struct
15 import sys 18 import sys
16 import traceback 19 import traceback
17 20
18 from .i18n import _ 21 from .i18n import _
383 if inst.errno != errno.EPIPE: 386 if inst.errno != errno.EPIPE:
384 raise 387 raise
385 # trigger __del__ since ForkingMixIn uses os._exit 388 # trigger __del__ since ForkingMixIn uses os._exit
386 gc.collect() 389 gc.collect()
387 390
391 class unixservicehandler(object):
392 """Set of pluggable operations for unix-mode services
393
394 Almost all methods except for createcmdserver() are called in the main
395 process. You can't pass mutable resource back from createcmdserver().
396 """
397
398 pollinterval = None
399
400 def __init__(self, ui):
401 self.ui = ui
402
403 def bindsocket(self, sock, address):
404 util.bindunixsocket(sock, address)
405
406 def unlinksocket(self, address):
407 os.unlink(address)
408
409 def printbanner(self, address):
410 self.ui.status(_('listening at %s\n') % address)
411 self.ui.flush() # avoid buffering of status message
412
413 def shouldexit(self):
414 """True if server should shut down; checked per pollinterval"""
415 return False
416
417 def newconnection(self):
418 """Called when main process notices new connection"""
419 pass
420
421 def createcmdserver(self, repo, conn, fin, fout):
422 """Create new command server instance; called in the process that
423 serves for the current connection"""
424 return server(self.ui, repo, fin, fout)
425
388 class _requesthandler(socketserver.BaseRequestHandler): 426 class _requesthandler(socketserver.BaseRequestHandler):
389 def handle(self): 427 def handle(self):
390 _serverequest(self.server.ui, self.server.repo, self.request, 428 _serverequest(self.server.ui, self.server.repo, self.request,
391 self._createcmdserver) 429 self._createcmdserver)
392 430
422 try: 460 try:
423 self.server.serve_forever() 461 self.server.serve_forever()
424 finally: 462 finally:
425 self._cleanup() 463 self._cleanup()
426 464
465 class unixforkingservice(unixservice):
466 def __init__(self, ui, repo, opts, handler=None):
467 super(unixforkingservice, self).__init__(ui, repo, opts)
468 self._servicehandler = handler or unixservicehandler(ui)
469 self._sock = None
470 self._oldsigchldhandler = None
471 self._workerpids = set() # updated by signal handler; do not iterate
472
473 def init(self):
474 self._sock = socket.socket(socket.AF_UNIX)
475 self._servicehandler.bindsocket(self._sock, self.address)
476 self._sock.listen(5)
477 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
478 self._oldsigchldhandler = o
479 self._servicehandler.printbanner(self.address)
480
481 def _cleanup(self):
482 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
483 self._sock.close()
484 self._servicehandler.unlinksocket(self.address)
485 # don't kill child processes as they have active clients, just wait
486 self._reapworkers(0)
487
488 def run(self):
489 try:
490 self._mainloop()
491 finally:
492 self._cleanup()
493
494 def _mainloop(self):
495 h = self._servicehandler
496 while not h.shouldexit():
497 try:
498 ready = select.select([self._sock], [], [], h.pollinterval)[0]
499 if not ready:
500 continue
501 conn, _addr = self._sock.accept()
502 except (select.error, socket.error) as inst:
503 if inst.args[0] == errno.EINTR:
504 continue
505 raise
506
507 pid = os.fork()
508 if pid:
509 try:
510 self.ui.debug('forked worker process (pid=%d)\n' % pid)
511 self._workerpids.add(pid)
512 h.newconnection()
513 finally:
514 conn.close() # release handle in parent process
515 else:
516 try:
517 self._serveworker(conn)
518 conn.close()
519 os._exit(0)
520 except: # never return, hence no re-raises
521 try:
522 self.ui.traceback(force=True)
523 finally:
524 os._exit(255)
525
526 def _sigchldhandler(self, signal, frame):
527 self._reapworkers(os.WNOHANG)
528
529 def _reapworkers(self, options):
530 while self._workerpids:
531 try:
532 pid, _status = os.waitpid(-1, options)
533 except OSError as inst:
534 if inst.errno == errno.EINTR:
535 continue
536 if inst.errno != errno.ECHILD:
537 raise
538 # no child processes at all (reaped by other waitpid()?)
539 self._workerpids.clear()
540 return
541 if pid == 0:
542 # no waitable child processes
543 return
544 self.ui.debug('worker process exited (pid=%d)\n' % pid)
545 self._workerpids.discard(pid)
546
547 def _serveworker(self, conn):
548 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
549 h = self._servicehandler
550 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
551
427 _servicemap = { 552 _servicemap = {
428 'pipe': pipeservice, 553 'pipe': pipeservice,
429 'unix': unixservice, 554 'unix': unixforkingservice,
430 } 555 }
431 556
432 def createservice(ui, repo, opts): 557 def createservice(ui, repo, opts):
433 mode = opts['cmdserver'] 558 mode = opts['cmdserver']
434 try: 559 try: