Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/commandserver.py @ 29544:024e8f82f3de
commandserver: add new forking server implemented without using SocketServer
SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as:
- race condition that leads to 100% CPU usage (Python 2.6)
https://bugs.python.org/issue21491
- can't wait for children belonging to different process groups (Python 2.6)
- leaves at least one zombie process (Python 2.6, 2.7)
https://bugs.python.org/issue11109
The first two are critical because we do setpgid(0, 0) in child process to
isolate terminal signals. The last one isn't, but ForkingMixIn seems to be
doing silly. So there are two choices:
a) backport and maintain SocketServer until we can drop support for Python 2.x
b) replace SocketServer by simpler one and eliminate glue codes
I chose (b) because it's great time for getting rid of utterly complicated
SocketServer stuff, and preparing for future move towards prefork service.
New unixforkingservice is implemented loosely based on chg 531f8ef64be6. It
is monolithic but much simpler than SocketServer. unixservicehandler provides
customizing points for chg, and it will be shared with future prefork service.
Old unixservice class is still used by chgserver. It will be removed later.
Thanks to Jun Wu for investigating these issues.
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Sun, 22 May 2016 11:43:18 +0900 |
parents | d74b8a4fde3b |
children | 9da1adc18639 |
comparison
equal
deleted
inserted
replaced
29543:d74b8a4fde3b | 29544:024e8f82f3de |
---|---|
9 | 9 |
10 import errno | 10 import errno |
11 import gc | 11 import gc |
12 import os | 12 import os |
13 import random | 13 import random |
14 import select | |
15 import signal | |
16 import socket | |
14 import struct | 17 import struct |
15 import sys | 18 import sys |
16 import traceback | 19 import traceback |
17 | 20 |
18 from .i18n import _ | 21 from .i18n import _ |
383 if inst.errno != errno.EPIPE: | 386 if inst.errno != errno.EPIPE: |
384 raise | 387 raise |
385 # trigger __del__ since ForkingMixIn uses os._exit | 388 # trigger __del__ since ForkingMixIn uses os._exit |
386 gc.collect() | 389 gc.collect() |
387 | 390 |
391 class unixservicehandler(object): | |
392 """Set of pluggable operations for unix-mode services | |
393 | |
394 Almost all methods except for createcmdserver() are called in the main | |
395 process. You can't pass mutable resource back from createcmdserver(). | |
396 """ | |
397 | |
398 pollinterval = None | |
399 | |
400 def __init__(self, ui): | |
401 self.ui = ui | |
402 | |
403 def bindsocket(self, sock, address): | |
404 util.bindunixsocket(sock, address) | |
405 | |
406 def unlinksocket(self, address): | |
407 os.unlink(address) | |
408 | |
409 def printbanner(self, address): | |
410 self.ui.status(_('listening at %s\n') % address) | |
411 self.ui.flush() # avoid buffering of status message | |
412 | |
413 def shouldexit(self): | |
414 """True if server should shut down; checked per pollinterval""" | |
415 return False | |
416 | |
417 def newconnection(self): | |
418 """Called when main process notices new connection""" | |
419 pass | |
420 | |
421 def createcmdserver(self, repo, conn, fin, fout): | |
422 """Create new command server instance; called in the process that | |
423 serves for the current connection""" | |
424 return server(self.ui, repo, fin, fout) | |
425 | |
388 class _requesthandler(socketserver.BaseRequestHandler): | 426 class _requesthandler(socketserver.BaseRequestHandler): |
389 def handle(self): | 427 def handle(self): |
390 _serverequest(self.server.ui, self.server.repo, self.request, | 428 _serverequest(self.server.ui, self.server.repo, self.request, |
391 self._createcmdserver) | 429 self._createcmdserver) |
392 | 430 |
422 try: | 460 try: |
423 self.server.serve_forever() | 461 self.server.serve_forever() |
424 finally: | 462 finally: |
425 self._cleanup() | 463 self._cleanup() |
426 | 464 |
465 class unixforkingservice(unixservice): | |
466 def __init__(self, ui, repo, opts, handler=None): | |
467 super(unixforkingservice, self).__init__(ui, repo, opts) | |
468 self._servicehandler = handler or unixservicehandler(ui) | |
469 self._sock = None | |
470 self._oldsigchldhandler = None | |
471 self._workerpids = set() # updated by signal handler; do not iterate | |
472 | |
473 def init(self): | |
474 self._sock = socket.socket(socket.AF_UNIX) | |
475 self._servicehandler.bindsocket(self._sock, self.address) | |
476 self._sock.listen(5) | |
477 o = signal.signal(signal.SIGCHLD, self._sigchldhandler) | |
478 self._oldsigchldhandler = o | |
479 self._servicehandler.printbanner(self.address) | |
480 | |
481 def _cleanup(self): | |
482 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) | |
483 self._sock.close() | |
484 self._servicehandler.unlinksocket(self.address) | |
485 # don't kill child processes as they have active clients, just wait | |
486 self._reapworkers(0) | |
487 | |
488 def run(self): | |
489 try: | |
490 self._mainloop() | |
491 finally: | |
492 self._cleanup() | |
493 | |
494 def _mainloop(self): | |
495 h = self._servicehandler | |
496 while not h.shouldexit(): | |
497 try: | |
498 ready = select.select([self._sock], [], [], h.pollinterval)[0] | |
499 if not ready: | |
500 continue | |
501 conn, _addr = self._sock.accept() | |
502 except (select.error, socket.error) as inst: | |
503 if inst.args[0] == errno.EINTR: | |
504 continue | |
505 raise | |
506 | |
507 pid = os.fork() | |
508 if pid: | |
509 try: | |
510 self.ui.debug('forked worker process (pid=%d)\n' % pid) | |
511 self._workerpids.add(pid) | |
512 h.newconnection() | |
513 finally: | |
514 conn.close() # release handle in parent process | |
515 else: | |
516 try: | |
517 self._serveworker(conn) | |
518 conn.close() | |
519 os._exit(0) | |
520 except: # never return, hence no re-raises | |
521 try: | |
522 self.ui.traceback(force=True) | |
523 finally: | |
524 os._exit(255) | |
525 | |
526 def _sigchldhandler(self, signal, frame): | |
527 self._reapworkers(os.WNOHANG) | |
528 | |
529 def _reapworkers(self, options): | |
530 while self._workerpids: | |
531 try: | |
532 pid, _status = os.waitpid(-1, options) | |
533 except OSError as inst: | |
534 if inst.errno == errno.EINTR: | |
535 continue | |
536 if inst.errno != errno.ECHILD: | |
537 raise | |
538 # no child processes at all (reaped by other waitpid()?) | |
539 self._workerpids.clear() | |
540 return | |
541 if pid == 0: | |
542 # no waitable child processes | |
543 return | |
544 self.ui.debug('worker process exited (pid=%d)\n' % pid) | |
545 self._workerpids.discard(pid) | |
546 | |
547 def _serveworker(self, conn): | |
548 signal.signal(signal.SIGCHLD, self._oldsigchldhandler) | |
549 h = self._servicehandler | |
550 _serverequest(self.ui, self.repo, conn, h.createcmdserver) | |
551 | |
427 _servicemap = { | 552 _servicemap = { |
428 'pipe': pipeservice, | 553 'pipe': pipeservice, |
429 'unix': unixservice, | 554 'unix': unixforkingservice, |
430 } | 555 } |
431 | 556 |
432 def createservice(ui, repo, opts): | 557 def createservice(ui, repo, opts): |
433 mode = opts['cmdserver'] | 558 mode = opts['cmdserver'] |
434 try: | 559 try: |