Mercurial > public > mercurial-scm > hg
comparison mercurial/worker.py @ 49545:3556f0392808 stable
lfs: avoid closing connections when the worker doesn't fork
Probably not much more than a minor optimization, but could be useful in the
case of `hg verify` where missing blobs are fetched one at a time.
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Tue, 18 Oct 2022 13:56:45 -0400 |
parents | d54b213c4380 |
children | 3eef8baf6b92 |
comparison
equal
deleted
inserted
replaced
49544:abf471862b8e | 49545:3556f0392808 |
---|---|
123 benefit = linear - (_STARTUP_COST * workers + linear / workers) | 123 benefit = linear - (_STARTUP_COST * workers + linear / workers) |
124 return benefit >= 0.15 | 124 return benefit >= 0.15 |
125 | 125 |
126 | 126 |
127 def worker( | 127 def worker( |
128 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True | 128 ui, |
129 costperarg, | |
130 func, | |
131 staticargs, | |
132 args, | |
133 hasretval=False, | |
134 threadsafe=True, | |
135 prefork=None, | |
129 ): | 136 ): |
130 """run a function, possibly in parallel in multiple worker | 137 """run a function, possibly in parallel in multiple worker |
131 processes. | 138 processes. |
132 | 139 |
133 returns a progress iterator | 140 returns a progress iterator |
147 overlapping keys are a bad idea. | 154 overlapping keys are a bad idea. |
148 | 155 |
149 threadsafe - whether work items are thread safe and can be executed using | 156 threadsafe - whether work items are thread safe and can be executed using |
150 a thread-based worker. Should be disabled for CPU heavy tasks that don't | 157 a thread-based worker. Should be disabled for CPU heavy tasks that don't |
151 release the GIL. | 158 release the GIL. |
159 | |
160 prefork - a parameterless Callable that is invoked prior to forking the | |
161 process. fork() is only used on non-Windows platforms, but is also not | |
162 called on POSIX platforms if the work amount doesn't warrant a worker. | |
152 """ | 163 """ |
153 enabled = ui.configbool(b'worker', b'enabled') | 164 enabled = ui.configbool(b'worker', b'enabled') |
154 if enabled and _platformworker is _posixworker and not ismainthread(): | 165 if enabled and _platformworker is _posixworker and not ismainthread(): |
155 # The POSIX worker has to install a handler for SIGCHLD. | 166 # The POSIX worker has to install a handler for SIGCHLD. |
156 # Python up to 3.9 only allows this in the main thread. | 167 # Python up to 3.9 only allows this in the main thread. |
157 enabled = False | 168 enabled = False |
158 | 169 |
159 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): | 170 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): |
160 return _platformworker(ui, func, staticargs, args, hasretval) | 171 return _platformworker( |
172 ui, func, staticargs, args, hasretval, prefork=prefork | |
173 ) | |
161 return func(*staticargs + (args,)) | 174 return func(*staticargs + (args,)) |
162 | 175 |
163 | 176 |
164 def _posixworker(ui, func, staticargs, args, hasretval): | 177 def _posixworker(ui, func, staticargs, args, hasretval, prefork=None): |
165 workers = _numworkers(ui) | 178 workers = _numworkers(ui) |
166 oldhandler = signal.getsignal(signal.SIGINT) | 179 oldhandler = signal.getsignal(signal.SIGINT) |
167 signal.signal(signal.SIGINT, signal.SIG_IGN) | 180 signal.signal(signal.SIGINT, signal.SIG_IGN) |
168 pids, problem = set(), [0] | 181 pids, problem = set(), [0] |
169 | 182 |
205 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) | 218 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) |
206 ui.flush() | 219 ui.flush() |
207 parentpid = os.getpid() | 220 parentpid = os.getpid() |
208 pipes = [] | 221 pipes = [] |
209 retval = {} | 222 retval = {} |
223 | |
224 if prefork: | |
225 prefork() | |
226 | |
210 for pargs in partition(args, min(workers, len(args))): | 227 for pargs in partition(args, min(workers, len(args))): |
211 # Every worker gets its own pipe to send results on, so we don't have to | 228 # Every worker gets its own pipe to send results on, so we don't have to |
212 # implement atomic writes larger than PIPE_BUF. Each forked process has | 229 # implement atomic writes larger than PIPE_BUF. Each forked process has |
213 # its own pipe's descriptors in the local variables, and the parent | 230 # its own pipe's descriptors in the local variables, and the parent |
214 # process has the full list of pipe descriptors (and it doesn't really | 231 # process has the full list of pipe descriptors (and it doesn't really |
314 return os.WEXITSTATUS(code) | 331 return os.WEXITSTATUS(code) |
315 elif os.WIFSIGNALED(code): | 332 elif os.WIFSIGNALED(code): |
316 return -(os.WTERMSIG(code)) | 333 return -(os.WTERMSIG(code)) |
317 | 334 |
318 | 335 |
319 def _windowsworker(ui, func, staticargs, args, hasretval): | 336 def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None): |
320 class Worker(threading.Thread): | 337 class Worker(threading.Thread): |
321 def __init__( | 338 def __init__( |
322 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs | 339 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs |
323 ): | 340 ): |
324 threading.Thread.__init__(self, *args, **kwargs) | 341 threading.Thread.__init__(self, *args, **kwargs) |