annotate mercurial/worker.py @ 35434:71427ff1dff8
workers: handling exceptions in windows workers
This adds handling of exceptions from worker threads and resurfaces them as if the function ran without workers.
If any of the threads throws, the main thread kills all running threads, giving them 5 seconds to handle the interruption, and raises the first exception received.
We don't have to join threads if is_alive() is false.
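The behaviour described above can be illustrated with a minimal sketch. It is not the code in worker.py: the names `run_all`, `guarded`, `stop` and `grace` are hypothetical, it targets Python 3 (the `queue` module), and because Python threads cannot be killed from outside it assumes each task cooperates by returning soon after a shared `threading.Event` is set.

```python
import queue
import threading
import time


def run_all(tasks, grace=5.0):
    """Run each task in its own thread and re-raise the first failure.

    Every task receives a threading.Event and is expected to return soon
    after it is set (a hypothetical cooperative-stop convention).
    """
    stop = threading.Event()
    errors = queue.Queue()

    def guarded(task):
        try:
            task(stop)
        except Exception as exc:  # hand the failure to the main thread
            errors.put(exc)

    threads = [threading.Thread(target=guarded, args=(t,)) for t in tasks]
    for t in threads:
        t.daemon = True  # a stuck worker must not block interpreter exit
        t.start()

    # Wait until some worker reports a failure or all workers finish.
    first = None
    while first is None and any(t.is_alive() for t in threads):
        try:
            first = errors.get(timeout=0.05)
        except queue.Empty:
            pass
    # A worker may have failed and exited just before the loop ended.
    if first is None and not errors.empty():
        first = errors.get_nowait()
    if first is None:
        return

    # Ask the remaining workers to stop and give them a shared grace period.
    stop.set()
    deadline = time.monotonic() + grace
    for t in threads:
        if t.is_alive():  # finished threads need no join
            t.join(max(0.0, deadline - time.monotonic()))
    raise first
```

With this shape the caller sees the worker's exception as if the task had run inline, which is the effect the change describes. The 5-second default for `grace` mirrors the figure quoted above.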
Test Plan:
Ran multiple updates and enabled/disabled sparse profiles; everything worked well.
Ran the tests on CentOS: all tests that pass on @ passed here.
Added a forged exception into the worker code and got it properly resurfaced and the rest of workers killed: P58642088
PS C:\open\<repo>> ..\facebook-hg-rpms\build\hg\hg.exe --config extensions.fsmonitor=! sparse --enable-profile <profile>
updating [==> ] 1300/39166 1m57sException in thread Thread-3:
Traceback (most recent call last):
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
self.run()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
raise e
Exception: Forged exception
Exception in thread Thread-2:
Traceback (most recent call last):
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
self.run()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
raise e
Exception: Forged exception
<...>
Traceback (most recent call last):
File "C:\open\facebook-hg-rpms\build\hg\hgexe.py", line 41, in <module>
dispatch.run()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 85, in run
status = (dispatch(req) or 0) & 255
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 173, in dispatch
ret = _runcatch(req)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 324, in _runcatch
return _callcatch(ui, _runcatchfunc)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 332, in _callcatch
return scmutil.callcatch(ui, func)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\scmutil.py", line 154, in callcatch
return func()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 314, in _runcatchfunc
return _dispatch(req)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 951, in _dispatch
cmdpats, cmdoptions)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 415, in runcommand
return orig(lui, repo, *args, **kwargs)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\undo.py", line 118, in _runcommandwrapper
result = orig(lui, repo, cmd, fullargs, *args)
File "C:\open\facebook-hg-rpms\build\hg\hgext\journal.py", line 84, in runcommand
return orig(lui, repo, cmd, fullargs, *args)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 268, in _tracksparseprofiles
res = runcommand(lui, repo, *args)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 256, in _trackdirstatesizes
res = runcommand(lui, repo, *args)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\copytrace.py", line 144, in _runcommand
return orig(lui, repo, cmd, fullargs, ui, *args, **kwargs)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbamend\hiddenoverride.py", line 119, in runcommand
result = orig(lui, repo, cmd, fullargs, *args)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 712, in runcommand
ret = _runcommand(ui, options, cmd, d)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 959, in _runcommand
return cmdfunc()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 948, in <lambda>
d = lambda: util.checksignature(func)(ui, *args, **strcmdopt)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\util.py", line 1183, in check
return func(*args, **kwargs)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 860, in sparse
disableprofile=disableprofile, force=force)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 949, in _config
len, _refresh(ui, repo, oldstatus, oldsparsematch, force))
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 1116, in _refresh
mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 311, in applyupdates
return orig(repo, actions, wctx, mctx, overwrite, labels=labels)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\merge.py", line 1464, in applyupdates
for i, item in prog:
File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 286, in _windowsworker
raise t.exception
Exception: Forged exception
PS C:\open\ovrsource>
Differential Revision: https://phab.mercurial-scm.org/D1459
author | Wojciech Lis <wlis@fb.com> |
---|---|
date | Mon, 20 Nov 2017 10:27:41 -0800 |
parents | 02b36e860e0b |
children | 471918fa7f46 |
rev | line source |
---|---|
18635 (fed06dd07665, "worker: count the number of CPUs", Bryan O'Sullivan <bryano@fb.com>) | 1 # worker.py - master-slave parallelism support |
18635 | 2 # |
18635 | 3 # Copyright 2013 Facebook, Inc. |
18635 | 4 # |
18635 | 5 # This software may be used and distributed according to the terms of the |
18635 | 6 # GNU General Public License version 2 or any later version. |
18635 | 7 |
25992 (2d76f8a2d831, "worker: use absolute_import", Gregory Szorc <gregory.szorc@gmail.com>, parents: 25660) | 8 from __future__ import absolute_import |
25992 | 9 |
25992 | 10 import errno |
25992 | 11 import os |
25992 | 12 import signal |
25992 | 13 import sys |
35433 (02b36e860e0b, "workers: implemented worker on windows", Wojciech Lis <wlis@fb.com>, parents: 34646) | 14 import threading |
25992 | 15 |
25992 | 16 from .i18n import _ |
30406 | 17 from . import ( |
30640 (a150173da1c1, "py3: replace os.environ with encoding.environ (part 2 of 5)", Pulkit Goyal <7895pulkit@gmail.com>, parents: 30530) | 18     encoding, |
30406 | 19     error, |
30644 (d524c88511a7, "py3: replace os.name with pycompat.osname (part 1 of 2)", Pulkit Goyal <7895pulkit@gmail.com>, parents: 30640) | 20     pycompat, |
30530 (86cd09bc13ba, "worker: use os._exit for posix worker in all cases", Jun Wu <quark@fb.com>, parents: 30434) | 21     scmutil, |
30406 | 22     util, |
30406 | 23 ) |
18635 | 24 |
18635 | 25 def countcpus(): |
18635 | 26     '''try to count the number of CPUs on the system''' |
26568 (c0501c26b05c, "worker: restore old countcpus code (issue4869)", Gregory Szorc <gregory.szorc@gmail.com>, parents: 26063) | 27 |
26568 | 28     # posix |
18635 | 29     try: |
32634 (954489932c4f, "py3: pass str in os.sysconf()", Pulkit Goyal <7895pulkit@gmail.com>, parents: 32166) | 30         n = int(os.sysconf(r'SC_NPROCESSORS_ONLN')) |
26568 | 31         if n > 0: |
26568 | 32             return n |
26568 | 33     except (AttributeError, ValueError): |
26568 | 34         pass |
26568 | 35 |
26568 | 36     # windows |
26568 | 37     try: |
30640 | 38         n = int(encoding.environ['NUMBER_OF_PROCESSORS']) |
26568 | 39         if n > 0: |
26568 | 40             return n |
26568 | 41     except (KeyError, ValueError): |
26568 | 42         pass |
26568 | 43 |
26568 | 44     return 1 |
18636 (dcb27c153a40, "worker: estimate whether it's worth running a task in parallel", Bryan O'Sullivan <bryano@fb.com>, parents: 18635) | 45 |
18636 | 46 def _numworkers(ui): |
18636 | 47     s = ui.config('worker', 'numcpus') |
18636 | 48     if s: |
18636 | 49         try: |
18636 | 50             n = int(s) |
18636 | 51             if n >= 1: |
18636 | 52                 return n |
18636 | 53         except ValueError: |
26587 (56b2bcea2529, "error: get Abort from 'error' instead of 'util'", Pierre-Yves David <pierre-yves.david@fb.com>, parents: 26568) | 54             raise error.Abort(_('number of cpus must be an integer')) |
18636 | 55     return min(max(countcpus(), 4), 32) |
18636 | 56 |
35433 | 57 if pycompat.isposix or pycompat.iswindows: |
18636 | 58     _startupcost = 0.01 |
18636 | 59 else: |
18636 | 60     _startupcost = 1e30 |
18636 | 61 |
18636 | 62 def worthwhile(ui, costperop, nops): |
18636 | 63     '''try to determine whether the benefit of multiple processes can |
18636 | 64     outweigh the cost of starting them''' |
18636 | 65     linear = costperop * nops |
18636 | 66     workers = _numworkers(ui) |
18636 | 67     benefit = linear - (_startupcost * workers + linear / workers) |
18636 | 68     return benefit >= 0.15 |
18637 (ac4dbceeb14a, "worker: partition a list (of tasks) into equal-sized chunks", Bryan O'Sullivan <bryano@fb.com>, parents: 18636) | 69 |
18638 (047110c0e2a8, "worker: allow a function to be run in multiple worker processes", Bryan O'Sullivan <bryano@fb.com>, parents: 18637) | 70 def worker(ui, costperarg, func, staticargs, args): |
18638 | 71     '''run a function, possibly in parallel in multiple worker |
18638 | 72     processes. |
18638 | 73 |
18638 | 74     returns a progress iterator |
18638 | 75 |
18638 | 76     costperarg - cost of a single task |
18638 | 77 |
18638 | 78     func - function to run |
18638 | 79 |
18638 | 80     staticargs - arguments to pass to every invocation of the function |
18638 | 81 |
18638 | 82     args - arguments to split into chunks, to pass to individual |
18638 | 83     workers |
18638 | 84     ''' |
18638 | 85     if worthwhile(ui, costperarg, len(args)): |
18638 | 86         return _platformworker(ui, func, staticargs, args) |
18638 | 87     return func(*staticargs + (args,)) |
18638 | 88 |
18638 | 89 def _posixworker(ui, func, staticargs, args): |
18638 | 90     rfd, wfd = os.pipe() |
18638 | 91     workers = _numworkers(ui) |
18708 (86524a70c0f6, "worker: fix a race in SIGINT handling", Bryan O'Sullivan <bryano@fb.com>, parents: 18707) | 92     oldhandler = signal.getsignal(signal.SIGINT) |
18708 | 93     signal.signal(signal.SIGINT, signal.SIG_IGN) |
30423 | 94     pids, problem = set(), [0] |
30420 (7a5d6e2fd2d5, "worker: move killworkers and waitforworkers up", Jun Wu <quark@fb.com>, parents: 30406) | 95     def killworkers(): |
30432 (237b2883cbd8, "worker: make sure killworkers() never be interrupted by another SIGCHLD", Yuya Nishihara <yuya@tcha.org>, parents: 30431) | 96         # unregister SIGCHLD handler as all children will be killed. This |
30432 | 97         # function shouldn't be interrupted by another SIGCHLD; otherwise pids |
30432 | 98         # could be updated while iterating, which would cause inconsistency. |
30432 | 99         signal.signal(signal.SIGCHLD, oldchldhandler) |
30420 | 100         # if one worker bails, there's no good reason to wait for the rest |
30420 | 101         for p in pids: |
30420 | 102             try: |
30420 | 103                 os.kill(p, signal.SIGTERM) |
30420 | 104             except OSError as err: |
30420 | 105                 if err.errno != errno.ESRCH: |
30420 | 106                     raise |
30422 (7bc25549e084, "worker: allow waitforworkers to be non-blocking", Jun Wu <quark@fb.com>, parents: 30421) | 107     def waitforworkers(blocking=True): |
30424 (5069a8a40b1b, "worker: make waitforworkers reentrant", Jun Wu <quark@fb.com>, parents: 30423) | 108         for pid in pids.copy(): |
30424 | 109             p = st = 0 |
30424 | 110             while True: |
30424 | 111                 try: |
30424 | 112                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG)) |
30431 (0e6ce6313e47, "worker: fix missed break on successful waitpid()", Yuya Nishihara <yuya@tcha.org>, parents: 30426) | 113                     break |
30424 | 114                 except OSError as e: |
30424 | 115                     if e.errno == errno.EINTR: |
30424 | 116                         continue |
30424 | 117                     elif e.errno == errno.ECHILD: |
30434 (03f7aa2bd0e3, "worker: discard waited pid by anyone who noticed it first", Yuya Nishihara <yuya@tcha.org>, parents: 30433) | 118                         # child would already be reaped, but pids yet been |
30434 | 119                         # updated (maybe interrupted just after waitpid) |
30434 | 120                         pids.discard(pid) |
30434 | 121                         break |
30424 | 122                     else: |
30424 | 123                         raise |
30878 (18fb3cf572b4, "worker: ignore meaningless exit status indication returned by os.waitpid()", FUJIWARA Katsunori <foozy@lares.dti.ne.jp>, parents: 30644) | 124             if not p: |
30878 | 125                 # skip subsequent steps, because child process should |
30878 | 126                 # be still running in this case |
30878 | 127                 continue |
30878 | 128             pids.discard(p) |
30878 | 129             st = _exitstatus(st) |
30420 | 130             if st and not problem[0]: |
30420 | 131                 problem[0] = st |
30425 (e8fb03cfbbde, "worker: add a SIGCHLD handler to collect worker immediately", Jun Wu <quark@fb.com>, parents: 30424) | 132     def sigchldhandler(signum, frame): |
30425 | 133         waitforworkers(blocking=False) |
30433 (f2d13eb85198, "worker: kill workers after all zombie processes are reaped", Yuya Nishihara <yuya@tcha.org>, parents: 30432) | 134         if problem[0]: |
30433 | 135             killworkers() |
30425 | 136     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) |
31701 (9d3d56aa1a9f, "worker: flush ui buffers before running the worker", David Soria Parra <davidsp@fb.com>, parents: 31134) | 137     ui.flush() |
32166 (31763785094b, "worker: rewrite error handling so os._exit covers all cases", Jun Wu <quark@fb.com>, parents: 32043) | 138     parentpid = os.getpid() |
18638 | 139     for pargs in partition(args, workers): |
32166 | 140         # make sure we use os._exit in all worker code paths. otherwise the |
32166 | 141         # worker may do some clean-ups which could cause surprises like |
32166 | 142         # deadlock. see sshpeer.cleanup for example. |
32166 | 143         # override error handling *before* fork. this is necessary because |
32166 | 144         # exception (signal) may arrive after fork, before "pid =" assignment |
32166 | 145         # completes, and other exception handler (dispatch.py) can lead to |
32166 | 146         # unexpected code path without os._exit. |
32166 | 147         ret = -1 |
32166 | 148         try: |
32166 | 149             pid = os.fork() |
32166 | 150             if pid == 0: |
32166 | 151                 signal.signal(signal.SIGINT, oldhandler) |
32166 | 152                 signal.signal(signal.SIGCHLD, oldchldhandler) |
30530 | 153 |
32166 | 154                 def workerfunc(): |
32166 | 155                     os.close(rfd) |
32166 | 156                     for i, item in func(*(staticargs + (pargs,))): |
32166 | 157                         os.write(wfd, '%d %s\n' % (i, item)) |
32166 | 158                     return 0 |
32166 | 159 |
32166 | 160                 ret = scmutil.callcatch(ui, workerfunc) |
32166 | 161         except: # parent re-raises, child never returns |
32166 | 162             if os.getpid() == parentpid: |
32166 | 163                 raise |
32166 | 164             exctype = sys.exc_info()[0] |
32166 | 165             force = not issubclass(exctype, KeyboardInterrupt) |
32166 | 166             ui.traceback(force=force) |
32166 | 167         finally: |
32166 | 168             if os.getpid() != parentpid: |
30882 (a91c62752d08, "worker: flush messages written by child processes before exit", Yuya Nishihara <yuya@tcha.org>, parents: 30878) | 169                 try: |
30882 | 170                     ui.flush() |
32166 | 171                 except: # never returns, no re-raises |
32166 | 172                     pass |
30530 | 173                 finally: |
32166 | 174                     os._exit(ret & 255) |
30423 | 175         pids.add(pid) |
18638 | 176     os.close(wfd) |
30944
48dea083f66d
py3: convert the mode argument of os.fdopen to unicodes (1 of 2)
Pulkit Goyal <7895pulkit@gmail.com>
parents:
30644
diff
changeset
|
177 fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0) |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
178 def cleanup(): |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
179 signal.signal(signal.SIGINT, oldhandler) |
30426
c27614f2dec1
worker: stop using a separate thread waiting for children
Jun Wu <quark@fb.com>
parents:
30425
diff
changeset
|
180 waitforworkers() |
30425
e8fb03cfbbde
worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents:
30424
diff
changeset
|
181 signal.signal(signal.SIGCHLD, oldchldhandler) |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
182 status = problem[0] |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
183 if status: |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
184 if status < 0: |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
185 os.kill(os.getpid(), -status) |
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
186 sys.exit(status) |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
187 try: |
30406 | 188 for line in util.iterfile(fp): |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
189 l = line.split(' ', 1) |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
190 yield int(l[0]), l[1][:-1] |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
191 except: # re-raises |
18709
9955fc5ee24b
worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents:
18708
diff
changeset
|
192 killworkers() |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
193 cleanup() |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
194 raise |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
195 cleanup() |
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
196 |
18707
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
197 def _posixexitstatus(code): |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
198 '''convert a posix exit status into the same form returned by |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
199 os.spawnv |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
200 |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
201 returns None if the process was stopped instead of exiting''' |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
202 if os.WIFEXITED(code): |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
203 return os.WEXITSTATUS(code) |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
204 elif os.WIFSIGNALED(code): |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
205 return -os.WTERMSIG(code) |
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
206 |
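A minimal sketch of what _posixexitstatus returns, assuming a POSIX system and Python 3. The helper name runchild and the exit code 3 are made up for illustration; only the decoding function mirrors the listing above. A normal exit yields the exit code, while death by signal yields the negated signal number, which is why the parent can replay the signal with os.kill(os.getpid(), -status).

```python
import os
import signal


def posixexitstatus(code):
    """Decode a raw wait status the same way as the listing above."""
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)
    # falls through and returns None if the process was only stopped


def runchild(action):
    """Hypothetical helper: fork, run action in the child, decode its status."""
    pid = os.fork()
    if pid == 0:
        try:
            action()
        finally:
            # the child must never return into the parent's code path
            os._exit(127)
    _pid, status = os.waitpid(pid, 0)
    return posixexitstatus(status)


# child exits normally with code 3
exited = runchild(lambda: os._exit(3))
# child is terminated by a signal it sends to itself
killed = runchild(lambda: os.kill(os.getpid(), signal.SIGKILL))
```

Here exited is a non-negative exit code and killed is a negative number, so a single integer is enough for the caller to tell the two cases apart.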
35433
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
207 def _windowsworker(ui, func, staticargs, args): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
208 class Worker(threading.Thread): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
209 def __init__(self, taskqueue, resultqueue, func, staticargs, |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
210 group=None, target=None, name=None, verbose=None): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
211 threading.Thread.__init__(self, group=group, target=target, |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
212 name=name, verbose=verbose) |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
213 self._taskqueue = taskqueue |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
214 self._resultqueue = resultqueue |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
215 self._func = func |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
216 self._staticargs = staticargs |
35434
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
217 self._interrupted = False |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
218 self.exception = None |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
219 |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
220 def interrupt(self): |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
221 self._interrupted = True |
35433
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
222 |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
223 def run(self): |
35434
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
224 try: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
225 while not self._taskqueue.empty(): |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
226 try: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
227 args = self._taskqueue.get_nowait() |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
228 for res in self._func(*self._staticargs + (args,)): |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
229 self._resultqueue.put(res) |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
230 # threading doesn't provide a native way to |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
231 # interrupt execution. handle it manually at every |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
232 # iteration. |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
233 if self._interrupted: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
234 return |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
235 except util.empty: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
236 break |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
237 except Exception as e: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
238 # store the exception such that the main thread can resurface |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
239 # it as if the func was running without workers. |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
240 self.exception = e |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
241 raise |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
242 |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
243 threads = [] |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
244 def killworkers(): |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
245 for t in threads: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
246 t.interrupt() |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
247 for t in threads: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
248 # try to let the threads handle interruption, but don't wait |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
249 # indefinitely. the thread could be in infinite loop, handling |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
250 # a very long task or in a deadlock situation |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
251 t.join(5) |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
252 if t.is_alive(): |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
253 raise error.Abort(_('failed to join worker thread')) |
35433
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
254 |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
255 workers = _numworkers(ui) |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
256 resultqueue = util.queue() |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
257 taskqueue = util.queue() |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
258 # partition work to more pieces than workers to minimize the chance |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
259 # of uneven distribution of large tasks between the workers |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
260 for pargs in partition(args, workers * 20): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
261 taskqueue.put(pargs) |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
262 for _i in range(workers): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
263 t = Worker(taskqueue, resultqueue, func, staticargs) |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
264 threads.append(t) |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
265 t.start() |
35434
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
266 |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
267 while len(threads) > 0: |
35433
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
268 while not resultqueue.empty(): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
269 yield resultqueue.get() |
35434
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
270 threads[0].join(0.05) |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
271 finishedthreads = [_t for _t in threads if not _t.is_alive()] |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
272 for t in finishedthreads: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
273 if t.exception is not None: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
274 try: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
275 killworkers() |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
276 except Exception: |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
277 # pass over the workers joining failure. it is more |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
278 # important to surface the initial exception than the |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
279 # fact that one of workers may be processing a large |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
280 # task and does not get to handle the interruption. |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
281 ui.warn(_("failed to kill worker threads while handling " |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
282 "an exception")) |
71427ff1dff8
workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents:
35433
diff
changeset
|
283 raise t.exception |
35433
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
284 threads.remove(t) |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
285 while not resultqueue.empty(): |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
286 yield resultqueue.get() |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
287 |
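The thread-based pattern above (a shared task queue, a cooperative interrupt flag, and an exception stored on the thread for the main thread to re-raise) can be sketched in standalone Python 3 as follows. This is a simplified illustration, not the Mercurial code: runworkers is a hypothetical name, it returns a list instead of yielding results as they arrive, it uses the standard queue module rather than util.queue, and the worker stores its exception without re-raising it so that no per-thread traceback is printed.

```python
import queue
import threading


class Worker(threading.Thread):
    def __init__(self, taskqueue, resultqueue, func):
        super().__init__()
        self._taskqueue = taskqueue
        self._resultqueue = resultqueue
        self._func = func
        self._interrupted = False
        self.exception = None

    def interrupt(self):
        # threads cannot be killed from outside; the worker polls this flag
        self._interrupted = True

    def run(self):
        try:
            while True:
                try:
                    args = self._taskqueue.get_nowait()
                except queue.Empty:
                    break
                for res in self._func(args):
                    self._resultqueue.put(res)
                    if self._interrupted:
                        return
        except Exception as e:
            # keep the exception so the main thread can resurface it
            self.exception = e


def runworkers(func, tasks, nworkers=2):
    """Hypothetical driver: run func over tasks and collect all results."""
    taskqueue = queue.Queue()
    resultqueue = queue.Queue()
    for task in tasks:
        taskqueue.put(task)
    threads = [Worker(taskqueue, resultqueue, func) for _ in range(nworkers)]
    for t in threads:
        t.start()
    while threads:
        threads[0].join(0.05)
        for t in [t for t in threads if not t.is_alive()]:
            if t.exception is not None:
                # ask every worker to stop, wait a bounded time for each,
                # then raise the first failure seen
                for other in threads:
                    other.interrupt()
                for other in threads:
                    other.join(5)
                raise t.exception
            threads.remove(t)
    results = []
    while not resultqueue.empty():
        results.append(resultqueue.get())
    return results


def squares(chunk):
    for x in chunk:
        yield x * x


collected = sorted(runworkers(squares, [[1, 2], [3]]))
```

Because the interrupt flag is only checked after each result is queued, a worker stuck inside a long call to func will not notice it, which is why the join uses a timeout rather than waiting forever.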
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
288 if pycompat.iswindows: |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
289 _platformworker = _windowsworker |
02b36e860e0b
workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents:
34646
diff
changeset
|
290 else: |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
291 _platformworker = _posixworker |
18707
d1a2b086d058
worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents:
18638
diff
changeset
|
292 _exitstatus = _posixexitstatus |
18638
047110c0e2a8
worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents:
18637
diff
changeset
|
293 |
18637
ac4dbceeb14a
worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents:
18636
diff
changeset
|
294 def partition(lst, nslices): |
28181
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
295 '''partition a list into N slices of roughly equal size |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
296 |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
297 The current strategy takes every Nth element from the input. If |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
298 we ever write workers that need to preserve grouping in input |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
299 we should consider allowing callers to specify a partition strategy. |
28292
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
300 |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
301 mpm is not a fan of this partitioning strategy when files are involved. |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
302 In his words: |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
303 |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
304 Single-threaded Mercurial makes a point of creating and visiting |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
305 files in a fixed order (alphabetical). When creating files in order, |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
306 a typical filesystem is likely to allocate them on nearby regions on |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
307 disk. Thus, when revisiting in the same order, locality is maximized |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
308 and various forms of OS and disk-level caching and read-ahead get a |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
309 chance to work. |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
310 |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
311 This effect can be quite significant on spinning disks. I discovered it |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
312 circa Mercurial v0.4 when revlogs were named by hashes of filenames. |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
313 Tarring a repo and copying it to another disk effectively randomized |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
314 the revlog ordering on disk by sorting the revlogs by hash and suddenly |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
315 performance of my kernel checkout benchmark dropped by ~10x because the |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
316 "working set" of sectors visited no longer fit in the drive's cache and |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
317 the workload switched from streaming to random I/O. |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
318 |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
319 What we should really be doing is have workers read filenames from an |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
320 ordered queue. This preserves locality and also keeps any worker from |
3eb7faf6d958
worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents:
28181
diff
changeset
|
321 getting more than one file out of balance. |
28181
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
322 ''' |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
323 for i in range(nslices): |
f8efc8a3a991
worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents:
26587
diff
changeset
|
324 yield lst[i::nslices] |
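To make the "every Nth element" strategy concrete, here is a small standalone sketch that copies the two-line body of partition and applies it to a made-up, alphabetically sorted file list. The file names and the slice count are assumptions chosen for illustration.

```python
def partition(lst, nslices):
    """Yield nslices slices; slice i takes elements i, i + nslices, ..."""
    for i in range(nslices):
        yield lst[i::nslices]


# hypothetical sorted input, as single-threaded Mercurial would visit it
files = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
slices = list(partition(files, 3))
# slices == [['a', 'd', 'g'], ['b', 'e'], ['c', 'f']]

# asking for more slices than items produces empty trailing slices
sparse = list(partition([1, 2], 4))
# sparse == [[1], [2], [], []]
```

Slice sizes differ by at most one, but neighbouring files such as 'a' and 'b' always land in different slices, which is the loss of on-disk locality that the docstring describes.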