Mercurial > public > mercurial-scm > hg-stable
diff mercurial/worker.py @ 43076:2372284d9457
formatting: blacken the codebase
This is using my patch to black
(https://github.com/psf/black/pull/826) so we don't un-wrap collection
literals.
Done with:
hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S
# skip-blame mass-reformatting only
# no-check-commit reformats foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D6971
author | Augie Fackler <augie@google.com> |
---|---|
date | Sun, 06 Oct 2019 09:45:02 -0400 |
parents | d29db0a0c4eb |
children | 687b865b95ad |
line wrap: on
line diff
--- a/mercurial/worker.py Sat Oct 05 10:29:34 2019 -0400 +++ b/mercurial/worker.py Sun Oct 06 09:45:02 2019 -0400 @@ -16,6 +16,7 @@ try: import selectors + selectors.BaseSelector except ImportError: from .thirdparty import selectors2 as selectors @@ -29,6 +30,7 @@ util, ) + def countcpus(): '''try to count the number of CPUs on the system''' @@ -50,6 +52,7 @@ return 1 + def _numworkers(ui): s = ui.config('worker', 'numcpus') if s: @@ -61,6 +64,7 @@ raise error.Abort(_('number of cpus must be an integer')) return min(max(countcpus(), 4), 32) + if pycompat.isposix or pycompat.iswindows: _STARTUP_COST = 0.01 # The Windows worker is thread based. If tasks are CPU bound, threads @@ -71,6 +75,7 @@ _STARTUP_COST = 1e30 _DISALLOW_THREAD_UNSAFE = False + def worthwhile(ui, costperop, nops, threadsafe=True): '''try to determine whether the benefit of multiple processes can outweigh the cost of starting them''' @@ -83,8 +88,10 @@ benefit = linear - (_STARTUP_COST * workers + linear / workers) return benefit >= 0.15 -def worker(ui, costperarg, func, staticargs, args, hasretval=False, - threadsafe=True): + +def worker( + ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True +): '''run a function, possibly in parallel in multiple worker processes. @@ -113,11 +120,13 @@ return _platformworker(ui, func, staticargs, args, hasretval) return func(*staticargs + (args,)) + def _posixworker(ui, func, staticargs, args, hasretval): workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) pids, problem = set(), [0] + def killworkers(): # unregister SIGCHLD handler as all children will be killed. This # function shouldn't be interrupted by another SIGCHLD; otherwise pids @@ -130,6 +139,7 @@ except OSError as err: if err.errno != errno.ESRCH: raise + def waitforworkers(blocking=True): for pid in pids.copy(): p = st = 0 @@ -155,10 +165,12 @@ st = _exitstatus(st) if st and not problem[0]: problem[0] = st + def sigchldhandler(signum, frame): waitforworkers(blocking=False) if problem[0]: killworkers() + oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) ui.flush() parentpid = os.getpid() @@ -196,7 +208,7 @@ return 0 ret = scmutil.callcatch(ui, workerfunc) - except: # parent re-raises, child never returns + except: # parent re-raises, child never returns if os.getpid() == parentpid: raise exctype = sys.exc_info()[0] @@ -206,7 +218,7 @@ if os.getpid() != parentpid: try: ui.flush() - except: # never returns, no re-raises + except: # never returns, no re-raises pass finally: os._exit(ret & 255) @@ -215,12 +227,14 @@ for rfd, wfd in pipes: os.close(wfd) selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) + def cleanup(): signal.signal(signal.SIGINT, oldhandler) waitforworkers() signal.signal(signal.SIGCHLD, oldchldhandler) selector.close() return problem[0] + try: openpipes = len(pipes) while openpipes > 0: @@ -239,7 +253,7 @@ if e.errno == errno.EINTR: continue raise - except: # re-raises + except: # re-raises killworkers() cleanup() raise @@ -251,6 +265,7 @@ if hasretval: yield True, retval + def _posixexitstatus(code): '''convert a posix exit status into the same form returned by os.spawnv @@ -259,12 +274,14 @@ if os.WIFEXITED(code): return os.WEXITSTATUS(code) elif os.WIFSIGNALED(code): - return -os.WTERMSIG(code) + return -(os.WTERMSIG(code)) + def _windowsworker(ui, func, staticargs, args, hasretval): class Worker(threading.Thread): - def __init__(self, taskqueue, resultqueue, func, staticargs, *args, - **kwargs): + def __init__( + self, taskqueue, resultqueue, func, staticargs, *args, **kwargs + ): threading.Thread.__init__(self, *args, **kwargs) self._taskqueue = taskqueue self._resultqueue = resultqueue @@ -298,6 +315,7 @@ raise threads = [] + def trykillworkers(): # Allow up to 1 second to clean worker threads nicely cleanupend = time.time() + 1 @@ -311,8 +329,12 @@ # important to surface the inital exception than the # fact that one of workers may be processing a large # task and does not get to handle the interruption. - ui.warn(_("failed to kill worker threads while " - "handling an exception\n")) + ui.warn( + _( + "failed to kill worker threads while " + "handling an exception\n" + ) + ) return workers = _numworkers(ui) @@ -341,7 +363,7 @@ if t.exception is not None: raise t.exception threads.remove(t) - except (Exception, KeyboardInterrupt): # re-raises + except (Exception, KeyboardInterrupt): # re-raises trykillworkers() raise while not resultqueue.empty(): @@ -353,12 +375,14 @@ if hasretval: yield True, retval + if pycompat.iswindows: _platformworker = _windowsworker else: _platformworker = _posixworker _exitstatus = _posixexitstatus + def partition(lst, nslices): '''partition a list into N slices of roughly equal size