--- a/mercurial/worker.py Mon Nov 20 10:25:29 2017 -0800
+++ b/mercurial/worker.py Mon Nov 20 10:27:41 2017 -0800
@@ -214,18 +214,45 @@
self._resultqueue = resultqueue
self._func = func
self._staticargs = staticargs
+ self._interrupted = False
+ self.exception = None
+
+ def interrupt(self):
+ self._interrupted = True
def run(self):
- while not self._taskqueue.empty():
- try:
- args = self._taskqueue.get_nowait()
- for res in self._func(*self._staticargs + (args,)):
- self._resultqueue.put(res)
- except util.empty:
- break
+ try:
+ while not self._taskqueue.empty():
+ try:
+ args = self._taskqueue.get_nowait()
+ for res in self._func(*self._staticargs + (args,)):
+ self._resultqueue.put(res)
+ # threading doesn't provide a native way to
+ # interrupt execution. handle it manually at every
+ # iteration.
+ if self._interrupted:
+ return
+ except util.empty:
+ break
+ except Exception as e:
+ # store the exception such that the main thread can resurface
+ # it as if the func was running without workers.
+ self.exception = e
+ raise
+
+ threads = []
+ def killworkers():
+ for t in threads:
+ t.interrupt()
+ for t in threads:
+ # try to let the threads handle interruption, but don't wait
+ # indefintely. the thread could be in infinite loop, handling
+ # a very long task or in a deadlock situation
+ t.join(5)
+ if t.is_alive():
+ raise error.Abort(_('failed to join worker thread'))
workers = _numworkers(ui)
- threads = []
resultqueue = util.queue()
taskqueue = util.queue()
# partition work to more pieces than workers to minimize the chance
@@ -236,12 +263,24 @@
t = Worker(taskqueue, resultqueue, func, staticargs)
threads.append(t)
t.start()
- while any(t.is_alive() for t in threads):
+
+ while len(threads) > 0:
while not resultqueue.empty():
yield resultqueue.get()
- t = threads[0]
- t.join(0.05)
- if not t.is_alive():
+ threads[0].join(0.05)
+ finishedthreads = [_t for _t in threads if not _t.is_alive()]
+ for t in finishedthreads:
+ if t.exception is not None:
+ try:
+ killworkers()
+ except Exception:
+ # pass over the workers joining failure. it is more
+ # important to surface the inital exception than the
+ # fact that one of workers may be processing a large
+ # task and does not get to handle the interruption.
+ ui.warn(_("failed to kill worker threads while handling "
+ "an exception"))
+ raise t.exception
threads.remove(t)
while not resultqueue.empty():
yield resultqueue.get()