annotate mercurial/worker.py @ 35434:71427ff1dff8

workers: handling exceptions in windows workers This adds handling of exceptions from worker threads and resurfaces them as if the function ran without workers. If any of the threads throws, the main thread kills all running threads giving them 5 sec to handle the interruption and raises the first exception received. We don't have to join threads if is_alive() is false Test Plan: Ran multiple updates/enable/disable sparse profile and things worked well Ran test on CentOS- all tests passing on @ passed here Added a forged exception into the worker code and got it properly resurfaced and the rest of workers killed: P58642088 PS C:\open\<repo>> ..\facebook-hg-rpms\build\hg\hg.exe --config extensions.fsmonitor=! sparse --enable-profile <profile> updating [==> ] 1300/39166 1m57sException in thread Thread-3: Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run raise e Exception: Forged exception Exception in thread Thread-2: Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run raise e Exception: Forged exception <...> Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hgexe.py", line 41, in <module> dispatch.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 85, in run status = (dispatch(req) or 0) & 255 File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 173, in dispatch ret = _runcatch(req) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 324, in _runcatch return _callcatch(ui, _runcatchfunc) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 332, in _callcatch return scmutil.callcatch(ui, func) File 
"C:\open\facebook-hg-rpms\build\hg\mercurial\scmutil.py", line 154, in callcatch return func() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 314, in _runcatchfunc return _dispatch(req) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 951, in _dispatch cmdpats, cmdoptions) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 415, in runcommand return orig(lui, repo, *args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\undo.py", line 118, in _runcommandwrapper result = orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\hgext\journal.py", line 84, in runcommand return orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 268, in _tracksparseprofiles res = runcommand(lui, repo, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 256, in _trackdirstatesizes res = runcommand(lui, repo, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\copytrace.py", line 144, in _runcommand return orig(lui, repo, cmd, fullargs, ui, *args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbamend\hiddenoverride.py", line 119, in runcommand result = orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 712, in runcommand ret = _runcommand(ui, options, cmd, d) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 959, in _runcommand return cmdfunc() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 948, in <lambda> d = lambda: util.checksignature(func)(ui, *args, **strcmdopt) File "C:\open\facebook-hg-rpms\build\hg\mercurial\util.py", line 1183, in check return func(*args, **kwargs) File 
"C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 860, in sparse disableprofile=disableprofile, force=force) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 949, in _config len, _refresh(ui, repo, oldstatus, oldsparsematch, force)) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 1116, in _refresh mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 311, in applyupdates return orig(repo, actions, wctx, mctx, overwrite, labels=labels) File "C:\open\facebook-hg-rpms\build\hg\mercurial\merge.py", line 1464, in applyupdates for i, item in prog: File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 286, in _windowsworker raise t.exception Exception: Forged exception PS C:\open\ovrsource> Differential Revision: https://phab.mercurial-scm.org/D1459
author Wojciech Lis <wlis@fb.com>
date Mon, 20 Nov 2017 10:27:41 -0800
parents 02b36e860e0b
children 471918fa7f46
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
1 # worker.py - master-slave parallelism support
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
2 #
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
3 # Copyright 2013 Facebook, Inc.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
4 #
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
5 # This software may be used and distributed according to the terms of the
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
6 # GNU General Public License version 2 or any later version.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
7
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
8 from __future__ import absolute_import
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
9
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
10 import errno
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
11 import os
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
12 import signal
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
13 import sys
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
14 import threading
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
15
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
16 from .i18n import _
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
17 from . import (
30640
a150173da1c1 py3: replace os.environ with encoding.environ (part 2 of 5)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30530
diff changeset
18 encoding,
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
19 error,
30644
d524c88511a7 py3: replace os.name with pycompat.osname (part 1 of 2)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30640
diff changeset
20 pycompat,
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
21 scmutil,
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
22 util,
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
23 )
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
24
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
def countcpus():
    '''try to count the number of CPUs on the system

    Probes the POSIX ``sysconf`` interface first, then the Windows
    ``NUMBER_OF_PROCESSORS`` environment variable. Returns the first
    positive integer found, or 1 when neither probe yields a usable value.
    '''

    # posix
    try:
        n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform
        # ValueError: the configuration name is not known
        # OSError: the name is known but not supported by the host system
        pass

    # windows
    try:
        n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        # variable unset, or set to something that is not an integer
        pass

    return 1
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
45
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
def _numworkers(ui):
    '''return the number of workers to use

    A ``worker.numcpus`` config value of 1 or more is used as-is. A value
    that is not an integer aborts. Otherwise (unset, empty, or below 1) the
    detected CPU count is used, clamped to the range 4..32.
    '''
    configured = ui.config('worker', 'numcpus')
    if configured:
        try:
            count = int(configured)
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
        if count >= 1:
            return count
    detected = countcpus()
    return min(max(detected, 4), 32)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
56
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
# Per-worker startup cost fed into worthwhile(). On platforms that are
# neither posix nor windows the cost is set absurdly high, so the estimated
# benefit of running in parallel is effectively always negative there.
_startupcost = 0.01 if (pycompat.isposix or pycompat.iswindows) else 1e30
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
61
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
def worthwhile(ui, costperop, nops):
    '''estimate whether running in multiple workers beats running serially

    costperop - cost of a single operation
    nops - number of operations to perform

    Returns True when the serial cost exceeds the parallel cost (worker
    startup plus an even share of the work) by at least 0.15.
    '''
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    parallelcost = _startupcost * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
69
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    # not enough work to pay for worker startup: call func directly with
    # the whole argument list appended to the static arguments
    if not worthwhile(ui, costperarg, len(args)):
        return func(*(staticargs + (args,)))
    return _platformworker(ui, func, staticargs, args)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
88
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
89 def _posixworker(ui, func, staticargs, args):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
90 rfd, wfd = os.pipe()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
91 workers = _numworkers(ui)
18708
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
92 oldhandler = signal.getsignal(signal.SIGINT)
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
93 signal.signal(signal.SIGINT, signal.SIG_IGN)
30423
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30422
diff changeset
94 pids, problem = set(), [0]
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
95 def killworkers():
30432
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
96 # unregister SIGCHLD handler as all children will be killed. This
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
97 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
98 # could be updated while iterating, which would cause inconsistency.
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
99 signal.signal(signal.SIGCHLD, oldchldhandler)
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
100 # if one worker bails, there's no good reason to wait for the rest
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
101 for p in pids:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
102 try:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
103 os.kill(p, signal.SIGTERM)
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
104 except OSError as err:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
105 if err.errno != errno.ESRCH:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
106 raise
30422
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30421
diff changeset
107 def waitforworkers(blocking=True):
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
108 for pid in pids.copy():
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
109 p = st = 0
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
110 while True:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
111 try:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
112 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
30431
0e6ce6313e47 worker: fix missed break on successful waitpid()
Yuya Nishihara <yuya@tcha.org>
parents: 30426
diff changeset
113 break
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
114 except OSError as e:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
115 if e.errno == errno.EINTR:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
116 continue
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
117 elif e.errno == errno.ECHILD:
30434
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
118 # child would already be reaped, but pids yet been
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
119 # updated (maybe interrupted just after waitpid)
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
120 pids.discard(pid)
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
121 break
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
122 else:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
123 raise
30878
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
124 if not p:
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
125 # skip subsequent steps, because child process should
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
126 # be still running in this case
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
127 continue
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
128 pids.discard(p)
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
129 st = _exitstatus(st)
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
130 if st and not problem[0]:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
131 problem[0] = st
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
132 def sigchldhandler(signum, frame):
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
133 waitforworkers(blocking=False)
30433
f2d13eb85198 worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents: 30432
diff changeset
134 if problem[0]:
f2d13eb85198 worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents: 30432
diff changeset
135 killworkers()
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
136 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
31701
9d3d56aa1a9f worker: flush ui buffers before running the worker
David Soria Parra <davidsp@fb.com>
parents: 31134
diff changeset
137 ui.flush()
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
138 parentpid = os.getpid()
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
139 for pargs in partition(args, workers):
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
140 # make sure we use os._exit in all worker code paths. otherwise the
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
141 # worker may do some clean-ups which could cause surprises like
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
142 # deadlock. see sshpeer.cleanup for example.
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
143 # override error handling *before* fork. this is necessary because
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
144 # exception (signal) may arrive after fork, before "pid =" assignment
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
145 # completes, and other exception handler (dispatch.py) can lead to
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
146 # unexpected code path without os._exit.
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
147 ret = -1
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
148 try:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
149 pid = os.fork()
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
150 if pid == 0:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
151 signal.signal(signal.SIGINT, oldhandler)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
152 signal.signal(signal.SIGCHLD, oldchldhandler)
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
153
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
154 def workerfunc():
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
155 os.close(rfd)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
156 for i, item in func(*(staticargs + (pargs,))):
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
157 os.write(wfd, '%d %s\n' % (i, item))
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
158 return 0
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
159
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
160 ret = scmutil.callcatch(ui, workerfunc)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
161 except: # parent re-raises, child never returns
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
162 if os.getpid() == parentpid:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
163 raise
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
164 exctype = sys.exc_info()[0]
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
165 force = not issubclass(exctype, KeyboardInterrupt)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
166 ui.traceback(force=force)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
167 finally:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
168 if os.getpid() != parentpid:
30882
a91c62752d08 worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents: 30878
diff changeset
169 try:
a91c62752d08 worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents: 30878
diff changeset
170 ui.flush()
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
171 except: # never returns, no re-raises
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
172 pass
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
173 finally:
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
174 os._exit(ret & 255)
30423
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30422
diff changeset
175 pids.add(pid)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
176 os.close(wfd)
30944
48dea083f66d py3: convert the mode argument of os.fdopen to unicodes (1 of 2)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30644
diff changeset
177 fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
178 def cleanup():
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
179 signal.signal(signal.SIGINT, oldhandler)
30426
c27614f2dec1 worker: stop using a separate thread waiting for children
Jun Wu <quark@fb.com>
parents: 30425
diff changeset
180 waitforworkers()
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
181 signal.signal(signal.SIGCHLD, oldchldhandler)
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
182 status = problem[0]
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
183 if status:
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
184 if status < 0:
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
185 os.kill(os.getpid(), -status)
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
186 sys.exit(status)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
187 try:
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
188 for line in util.iterfile(fp):
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
189 l = line.split(' ', 1)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
190 yield int(l[0]), l[1][:-1]
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
191 except: # re-raises
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
192 killworkers()
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
193 cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
194 raise
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
195 cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
196
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
197 def _posixexitstatus(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
198 '''convert a posix exit status into the same form returned by
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
199 os.spawnv
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
200
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
201 returns None if the process was stopped instead of exiting'''
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
202 if os.WIFEXITED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
203 return os.WEXITSTATUS(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
204 elif os.WIFSIGNALED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
205 return -os.WTERMSIG(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
206
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
207 def _windowsworker(ui, func, staticargs, args):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
208 class Worker(threading.Thread):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
209 def __init__(self, taskqueue, resultqueue, func, staticargs,
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
210 group=None, target=None, name=None, verbose=None):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
211 threading.Thread.__init__(self, group=group, target=target,
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
212 name=name, verbose=verbose)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
213 self._taskqueue = taskqueue
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
214 self._resultqueue = resultqueue
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
215 self._func = func
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
216 self._staticargs = staticargs
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
217 self._interrupted = False
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
218 self.exception = None
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
219
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
220 def interrupt(self):
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
221 self._interrupted = True
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
222
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
223 def run(self):
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
224 try:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
225 while not self._taskqueue.empty():
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
226 try:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
227 args = self._taskqueue.get_nowait()
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
228 for res in self._func(*self._staticargs + (args,)):
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
229 self._resultqueue.put(res)
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
230 # threading doesn't provide a native way to
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
231 # interrupt execution. handle it manually at every
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
232 # iteration.
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
233 if self._interrupted:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
234 return
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
235 except util.empty:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
236 break
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
237 except Exception as e:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
238 # store the exception such that the main thread can resurface
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
239 # it as if the func was running without workers.
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
240 self.exception = e
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
241 raise
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
242
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
243 threads = []
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
244 def killworkers():
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
245 for t in threads:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
246 t.interrupt()
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
247 for t in threads:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
248 # try to let the threads handle interruption, but don't wait
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
249 # indefinitely. the thread could be in infinite loop, handling
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
250 # a very long task or in a deadlock situation
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
251 t.join(5)
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
252 if t.is_alive():
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
253 raise error.Abort(_('failed to join worker thread'))
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
254
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
255 workers = _numworkers(ui)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
256 resultqueue = util.queue()
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
257 taskqueue = util.queue()
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
258 # partition work to more pieces than workers to minimize the chance
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
259 # of uneven distribution of large tasks between the workers
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
260 for pargs in partition(args, workers * 20):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
261 taskqueue.put(pargs)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
262 for _i in range(workers):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
263 t = Worker(taskqueue, resultqueue, func, staticargs)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
264 threads.append(t)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
265 t.start()
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
266
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
267 while len(threads) > 0:
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
268 while not resultqueue.empty():
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
269 yield resultqueue.get()
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
270 threads[0].join(0.05)
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
271 finishedthreads = [_t for _t in threads if not _t.is_alive()]
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
272 for t in finishedthreads:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
273 if t.exception is not None:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
274 try:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
275 killworkers()
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
276 except Exception:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
277 # pass over the workers joining failure. it is more
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
278 # important to surface the initial exception than the
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
279 # fact that one of workers may be processing a large
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
280 # task and does not get to handle the interruption.
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
281 ui.warn(_("failed to kill worker threads while handling "
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
282 "an exception"))
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
283 raise t.exception
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
284 threads.remove(t)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
285 while not resultqueue.empty():
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
286 yield resultqueue.get()
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
287
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
288 if pycompat.iswindows:
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
289 _platformworker = _windowsworker
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
290 else:
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
291 _platformworker = _posixworker
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
292 _exitstatus = _posixexitstatus
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
293
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
294 def partition(lst, nslices):
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
295 '''partition a list into N slices of roughly equal size
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
296
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
297 The current strategy takes every Nth element from the input. If
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
298 we ever write workers that need to preserve grouping in input
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
299 we should consider allowing callers to specify a partition strategy.
28292
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
300
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
301 mpm is not a fan of this partitioning strategy when files are involved.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
302 In his words:
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
303
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
304 Single-threaded Mercurial makes a point of creating and visiting
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
305 files in a fixed order (alphabetical). When creating files in order,
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
306 a typical filesystem is likely to allocate them on nearby regions on
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
307 disk. Thus, when revisiting in the same order, locality is maximized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
308 and various forms of OS and disk-level caching and read-ahead get a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
309 chance to work.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
310
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
311 This effect can be quite significant on spinning disks. I discovered it
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
312 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
313 Tarring a repo and copying it to another disk effectively randomized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
314 the revlog ordering on disk by sorting the revlogs by hash and suddenly
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
315 performance of my kernel checkout benchmark dropped by ~10x because the
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
316 "working set" of sectors visited no longer fit in the drive's cache and
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
317 the workload switched from streaming to random I/O.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
318
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
319 What we should really be doing is have workers read filenames from a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
320 ordered queue. This preserves locality and also keeps any worker from
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
321 getting more than one file out of balance.
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
322 '''
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
323 for i in range(nslices):
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
324 yield lst[i::nslices]