annotate mercurial/worker.py @ 30432:237b2883cbd8

worker: make sure killworkers() never be interrupted by another SIGCHLD killworkers() iterates over pids, which can be updated by SIGCHLD handler. So we should either copy pids or prevent killworkers() from being interrupted by SIGCHLD. I chose the latter as it is simpler and can make pids handling more consistent. This fixes a possible "set changed size during iteration" error at killworkers() before cleanup().
author Yuya Nishihara <yuya@tcha.org>
date Thu, 17 Nov 2016 20:44:05 +0900
parents 0e6ce6313e47
children f2d13eb85198
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
1 # worker.py - master-slave parallelism support
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
2 #
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
3 # Copyright 2013 Facebook, Inc.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
4 #
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
5 # This software may be used and distributed according to the terms of the
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
6 # GNU General Public License version 2 or any later version.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
7
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
8 from __future__ import absolute_import
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
9
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
10 import errno
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
11 import os
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
12 import signal
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
13 import sys
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
14
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
15 from .i18n import _
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
16 from . import (
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
17 error,
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
18 util,
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
19 )
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
20
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
21 def countcpus():
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
22 '''try to count the number of CPUs on the system'''
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
23
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
24 # posix
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
25 try:
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
26 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
27 if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
28 return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
29 except (AttributeError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
30 pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
31
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
32 # windows
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
33 try:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
34 n = int(os.environ['NUMBER_OF_PROCESSORS'])
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
35 if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
36 return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
37 except (KeyError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
38 pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
39
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
40 return 1
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
41
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
42 def _numworkers(ui):
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
43 s = ui.config('worker', 'numcpus')
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
44 if s:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
45 try:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
46 n = int(s)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
47 if n >= 1:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
48 return n
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
49 except ValueError:
26587
56b2bcea2529 error: get Abort from 'error' instead of 'util'
Pierre-Yves David <pierre-yves.david@fb.com>
parents: 26568
diff changeset
50 raise error.Abort(_('number of cpus must be an integer'))
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
51 return min(max(countcpus(), 4), 32)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
52
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
53 if os.name == 'posix':
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
54 _startupcost = 0.01
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
55 else:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
56 _startupcost = 1e30
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
57
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
58 def worthwhile(ui, costperop, nops):
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
59 '''try to determine whether the benefit of multiple processes can
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
60 outweigh the cost of starting them'''
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
61 linear = costperop * nops
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
62 workers = _numworkers(ui)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
63 benefit = linear - (_startupcost * workers + linear / workers)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
64 return benefit >= 0.15
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
65
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
66 def worker(ui, costperarg, func, staticargs, args):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
67 '''run a function, possibly in parallel in multiple worker
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
68 processes.
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
69
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
70 returns a progress iterator
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
71
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
72 costperarg - cost of a single task
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
73
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
74 func - function to run
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
75
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
76 staticargs - arguments to pass to every invocation of the function
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
77
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
78 args - arguments to split into chunks, to pass to individual
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
79 workers
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
80 '''
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
81 if worthwhile(ui, costperarg, len(args)):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
82 return _platformworker(ui, func, staticargs, args)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
83 return func(*staticargs + (args,))
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
84
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
85 def _posixworker(ui, func, staticargs, args):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
86 rfd, wfd = os.pipe()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
87 workers = _numworkers(ui)
18708
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
88 oldhandler = signal.getsignal(signal.SIGINT)
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
89 signal.signal(signal.SIGINT, signal.SIG_IGN)
30423
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30422
diff changeset
90 pids, problem = set(), [0]
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
91 def killworkers():
30432
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
92 # unregister SIGCHLD handler as all children will be killed. This
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
93 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
94 # could be updated while iterating, which would cause inconsistency.
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
95 signal.signal(signal.SIGCHLD, oldchldhandler)
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
96 # if one worker bails, there's no good reason to wait for the rest
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
97 for p in pids:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
98 try:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
99 os.kill(p, signal.SIGTERM)
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
100 except OSError as err:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
101 if err.errno != errno.ESRCH:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
102 raise
30422
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30421
diff changeset
103 def waitforworkers(blocking=True):
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
104 for pid in pids.copy():
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
105 p = st = 0
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
106 while True:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
107 try:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
108 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
30431
0e6ce6313e47 worker: fix missed break on successful waitpid()
Yuya Nishihara <yuya@tcha.org>
parents: 30426
diff changeset
109 break
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
110 except OSError as e:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
111 if e.errno == errno.EINTR:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
112 continue
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
113 elif e.errno == errno.ECHILD:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
114 break # ignore ECHILD
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
115 else:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
116 raise
30422
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30421
diff changeset
117 if p:
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
118 pids.remove(p)
30422
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30421
diff changeset
119 st = _exitstatus(st)
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
120 if st and not problem[0]:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
121 problem[0] = st
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
122 killworkers()
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
123 def sigchldhandler(signum, frame):
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
124 waitforworkers(blocking=False)
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
125 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
126 for pargs in partition(args, workers):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
127 pid = os.fork()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
128 if pid == 0:
18708
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
129 signal.signal(signal.SIGINT, oldhandler)
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
130 signal.signal(signal.SIGCHLD, oldchldhandler)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
131 try:
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
132 os.close(rfd)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
133 for i, item in func(*(staticargs + (pargs,))):
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
134 os.write(wfd, '%d %s\n' % (i, item))
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
135 os._exit(0)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
136 except KeyboardInterrupt:
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
137 os._exit(255)
19408
c7ec39c1a381 worker: properly report errors from worker processes (issue3982)
Matt Mackall <mpm@selenic.com>
parents: 19406
diff changeset
138 # other exceptions are allowed to propagate, we rely
c7ec39c1a381 worker: properly report errors from worker processes (issue3982)
Matt Mackall <mpm@selenic.com>
parents: 19406
diff changeset
139 # on lock.py's pid checks to avoid release callbacks
30423
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30422
diff changeset
140 pids.add(pid)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
141 os.close(wfd)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
142 fp = os.fdopen(rfd, 'rb', 0)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
143 def cleanup():
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
144 signal.signal(signal.SIGINT, oldhandler)
30426
c27614f2dec1 worker: stop using a separate thread waiting for children
Jun Wu <quark@fb.com>
parents: 30425
diff changeset
145 waitforworkers()
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
146 signal.signal(signal.SIGCHLD, oldchldhandler)
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
147 status = problem[0]
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
148 if status:
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
149 if status < 0:
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
150 os.kill(os.getpid(), -status)
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
151 sys.exit(status)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
152 try:
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
153 for line in util.iterfile(fp):
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
154 l = line.split(' ', 1)
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
155 yield int(l[0]), l[1][:-1]
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
156 except: # re-raises
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
157 killworkers()
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
158 cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
159 raise
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
160 cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
161
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
162 def _posixexitstatus(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
163 '''convert a posix exit status into the same form returned by
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
164 os.spawnv
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
165
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
166 returns None if the process was stopped instead of exiting'''
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
167 if os.WIFEXITED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
168 return os.WEXITSTATUS(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
169 elif os.WIFSIGNALED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
170 return -os.WTERMSIG(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
171
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
172 if os.name != 'nt':
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
173 _platformworker = _posixworker
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
174 _exitstatus = _posixexitstatus
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
175
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
176 def partition(lst, nslices):
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
177 '''partition a list into N slices of roughly equal size
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
178
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
179 The current strategy takes every Nth element from the input. If
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
180 we ever write workers that need to preserve grouping in input
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
181 we should consider allowing callers to specify a partition strategy.
28292
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
182
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
183 mpm is not a fan of this partitioning strategy when files are involved.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
184 In his words:
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
185
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
186 Single-threaded Mercurial makes a point of creating and visiting
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
187 files in a fixed order (alphabetical). When creating files in order,
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
188 a typical filesystem is likely to allocate them on nearby regions on
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
189 disk. Thus, when revisiting in the same order, locality is maximized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
190 and various forms of OS and disk-level caching and read-ahead get a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
191 chance to work.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
192
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
193 This effect can be quite significant on spinning disks. I discovered it
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
194 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
195 Tarring a repo and copying it to another disk effectively randomized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
196 the revlog ordering on disk by sorting the revlogs by hash and suddenly
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
197 performance of my kernel checkout benchmark dropped by ~10x because the
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
198 "working set" of sectors visited no longer fit in the drive's cache and
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
199 the workload switched from streaming to random I/O.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
200
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
201 What we should really be doing is have workers read filenames from a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
202 ordered queue. This preserves locality and also keeps any worker from
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
203 getting more than one file out of balance.
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
204 '''
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
205 for i in range(nslices):
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
206 yield lst[i::nslices]