comparison of mercurial/worker.py @ 35428:71427ff1dff8
workers: handling exceptions in windows workers
This adds handling of exceptions from worker threads and resurfaces them as if the function had run without workers.
If any of the threads raises, the main thread interrupts the remaining threads, gives them 5 seconds to handle the interruption, and then re-raises the first exception received.
We don't have to join threads whose is_alive() is false.
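As context for the diff below, here is a minimal standalone sketch of the pattern (a cooperative interrupt flag, the exception stored on the thread object, and bounded joins in the main loop). SketchWorker, run_tasks and the queue of plain callables are illustrative assumptions, not the actual worker.py code.

    # Minimal sketch of the pattern; illustrative names, not mercurial API.
    import queue
    import threading

    class SketchWorker(threading.Thread):
        def __init__(self, taskqueue):
            threading.Thread.__init__(self)
            self._taskqueue = taskqueue
            self._interrupted = False
            self.exception = None           # inspected by the main thread

        def interrupt(self):
            self._interrupted = True        # checked cooperatively between tasks

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        task = self._taskqueue.get_nowait()
                        task()
                        if self._interrupted:
                            return          # stop between tasks when asked
                    except queue.Empty:
                        break
            except Exception as e:
                self.exception = e          # stored so the caller can re-raise it
                raise

    def run_tasks(tasks, numworkers=4):
        taskqueue = queue.Queue()
        for task in tasks:
            taskqueue.put(task)
        threads = [SketchWorker(taskqueue) for _ in range(numworkers)]
        for t in threads:
            t.start()
        while threads:
            threads[0].join(0.05)           # poll instead of blocking forever
            for t in [t for t in threads if not t.is_alive()]:
                if t.exception is not None:
                    for other in threads:
                        other.interrupt()
                    for other in threads:
                        other.join(5)       # give workers 5 seconds to wind down
                    raise t.exception       # resurface the first exception seen
                threads.remove(t)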
Test Plan:
Ran multiple updates and sparse profile enable/disable cycles and everything worked as expected.
Ran tests on CentOS: all tests that pass on @ passed here as well.
Added a forged exception into the worker code and saw it properly resurfaced, with the rest of the workers killed (a hypothetical reproduction sketch follows the console output below): P58642088
PS C:\open\<repo>> ..\facebook-hg-rpms\build\hg\hg.exe --config extensions.fsmonitor=! sparse --enable-profile <profile>
updating [==> ] 1300/39166 1m57sException in thread Thread-3:
Traceback (most recent call last):
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
self.run()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
raise e
Exception: Forged exception
Exception in thread Thread-2:
Traceback (most recent call last):
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
self.run()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
raise e
Exception: Forged exception
<...>
Traceback (most recent call last):
File "C:\open\facebook-hg-rpms\build\hg\hgexe.py", line 41, in <module>
dispatch.run()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 85, in run
status = (dispatch(req) or 0) & 255
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 173, in dispatch
ret = _runcatch(req)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 324, in _runcatch
return _callcatch(ui, _runcatchfunc)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 332, in _callcatch
return scmutil.callcatch(ui, func)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\scmutil.py", line 154, in callcatch
return func()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 314, in _runcatchfunc
return _dispatch(req)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 951, in _dispatch
cmdpats, cmdoptions)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 415, in runcommand
return orig(lui, repo, *args, **kwargs)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\undo.py", line 118, in _runcommandwrapper
result = orig(lui, repo, cmd, fullargs, *args)
File "C:\open\facebook-hg-rpms\build\hg\hgext\journal.py", line 84, in runcommand
return orig(lui, repo, cmd, fullargs, *args)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 268, in _tracksparseprofiles
res = runcommand(lui, repo, *args)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 256, in _trackdirstatesizes
res = runcommand(lui, repo, *args)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\copytrace.py", line 144, in _runcommand
return orig(lui, repo, cmd, fullargs, ui, *args, **kwargs)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbamend\hiddenoverride.py", line 119, in runcommand
result = orig(lui, repo, cmd, fullargs, *args)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 712, in runcommand
ret = _runcommand(ui, options, cmd, d)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 959, in _runcommand
return cmdfunc()
File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 948, in <lambda>
d = lambda: util.checksignature(func)(ui, *args, **strcmdopt)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\util.py", line 1183, in check
return func(*args, **kwargs)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 860, in sparse
disableprofile=disableprofile, force=force)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 949, in _config
len, _refresh(ui, repo, oldstatus, oldsparsematch, force))
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 1116, in _refresh
mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False)
File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 311, in applyupdates
return orig(repo, actions, wctx, mctx, overwrite, labels=labels)
File "C:\open\facebook-hg-rpms\build\hg\mercurial\merge.py", line 1464, in applyupdates
for i, item in prog:
File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 286, in _windowsworker
raise t.exception
Exception: Forged exception
PS C:\open\ovrsource>
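A hypothetical way to reproduce the forged-exception behaviour shown above, reusing the run_tasks sketch from earlier (the real test raised the exception inside Worker.run itself; forged and slow are made-up names):

    import time

    def forged():
        raise Exception('Forged exception')    # stands in for the forged exception

    def slow():
        time.sleep(0.2)                        # stands in for a long-running task

    try:
        run_tasks([slow, slow, slow, forged])  # run_tasks from the sketch above
    except Exception as e:
        print('resurfaced: %s' % e)            # -> resurfaced: Forged exception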
Differential Revision: https://phab.mercurial-scm.org/D1459
author    Wojciech Lis <wlis@fb.com>
date      Mon, 20 Nov 2017 10:27:41 -0800
parents   02b36e860e0b
children  471918fa7f46
--- mercurial/worker.py (35427:02b36e860e0b)
+++ mercurial/worker.py (35428:71427ff1dff8)
@@ -212,38 +212,77 @@
                                       name=name, verbose=verbose)
             self._taskqueue = taskqueue
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
+            self._interrupted = False
+            self.exception = None
+
+        def interrupt(self):
+            self._interrupted = True
 
         def run(self):
-            while not self._taskqueue.empty():
-                try:
-                    args = self._taskqueue.get_nowait()
-                    for res in self._func(*self._staticargs + (args,)):
-                        self._resultqueue.put(res)
-                except util.empty:
-                    break
+            try:
+                while not self._taskqueue.empty():
+                    try:
+                        args = self._taskqueue.get_nowait()
+                        for res in self._func(*self._staticargs + (args,)):
+                            self._resultqueue.put(res)
+                        # threading doesn't provide a native way to
+                        # interrupt execution. handle it manually at every
+                        # iteration.
+                        if self._interrupted:
+                            return
+                    except util.empty:
+                        break
+            except Exception as e:
+                # store the exception such that the main thread can resurface
+                # it as if the func was running without workers.
+                self.exception = e
+                raise
+
+    threads = []
+    def killworkers():
+        for t in threads:
+            t.interrupt()
+        for t in threads:
+            # try to let the threads handle interruption, but don't wait
+            # indefintely. the thread could be in infinite loop, handling
+            # a very long task or in a deadlock situation
+            t.join(5)
+            if t.is_alive():
+                raise error.Abort(_('failed to join worker thread'))
 
     workers = _numworkers(ui)
-    threads = []
     resultqueue = util.queue()
     taskqueue = util.queue()
     # partition work to more pieces than workers to minimize the chance
     # of uneven distribution of large tasks between the workers
     for pargs in partition(args, workers * 20):
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-    while any(t.is_alive() for t in threads):
+
+    while len(threads) > 0:
         while not resultqueue.empty():
             yield resultqueue.get()
-        t = threads[0]
-        t.join(0.05)
-        if not t.is_alive():
+        threads[0].join(0.05)
+        finishedthreads = [_t for _t in threads if not _t.is_alive()]
+        for t in finishedthreads:
+            if t.exception is not None:
+                try:
+                    killworkers()
+                except Exception:
+                    # pass over the workers joining failure. it is more
+                    # important to surface the inital exception than the
+                    # fact that one of workers may be processing a large
+                    # task and does not get to handle the interruption.
+                    ui.warn(_("failed to kill worker threads while handling "
+                              "an exception"))
+                raise t.exception
             threads.remove(t)
     while not resultqueue.empty():
         yield resultqueue.get()
 
 if pycompat.iswindows:
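For reference, a minimal self-contained illustration of the bounded-join idiom that killworkers() relies on: ask the thread to stop, wait at most 5 seconds with Thread.join(timeout), and use is_alive() to find out whether the join actually succeeded. The names below are illustrative and not part of worker.py; an Event stands in for the Worker._interrupted flag.

    import threading
    import time

    stop = threading.Event()

    def cooperative():
        # stand-in for a worker that checks an interrupt flag between tasks
        while not stop.is_set():
            time.sleep(0.01)

    t = threading.Thread(target=cooperative)
    t.start()
    stop.set()           # ask the thread to stop (analogous to Worker.interrupt())
    t.join(5)            # wait at most 5 seconds rather than indefinitely
    if t.is_alive():
        # worker.py warns or aborts here instead of hanging forever
        print('worker did not stop within the timeout')
    else:
        print('worker joined cleanly')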