mercurial/worker.py
changeset 35428 71427ff1dff8
parent 35427 02b36e860e0b
child 35431 471918fa7f46
--- a/mercurial/worker.py	Mon Nov 20 10:25:29 2017 -0800
+++ b/mercurial/worker.py	Mon Nov 20 10:27:41 2017 -0800
@@ -214,18 +214,45 @@
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
+            self._interrupted = False
+            self.exception = None
+
+        def interrupt(self):
+            self._interrupted = True
 
         def run(self):
-            while not self._taskqueue.empty():
-                try:
-                    args = self._taskqueue.get_nowait()
-                    for res in self._func(*self._staticargs + (args,)):
-                        self._resultqueue.put(res)
-                except util.empty:
-                    break
+            try:
+                while not self._taskqueue.empty():
+                    try:
+                        args = self._taskqueue.get_nowait()
+                        for res in self._func(*self._staticargs + (args,)):
+                            self._resultqueue.put(res)
+                            # threading doesn't provide a native way to
+                            # interrupt execution. handle it manually at every
+                            # iteration.
+                            if self._interrupted:
+                                return
+                    except util.empty:
+                        break
+            except Exception as e:
+                # store the exception such that the main thread can resurface
+                # it as if the func was running without workers.
+                self.exception = e
+                raise
+
+    threads = []
+    def killworkers():
+        for t in threads:
+            t.interrupt()
+        for t in threads:
+            # try to let the threads handle interruption, but don't wait
+            # indefintely. the thread could be in infinite loop, handling
+            # a very long task or in a deadlock situation
+            t.join(5)
+            if t.is_alive():
+                raise error.Abort(_('failed to join worker thread'))
 
     workers = _numworkers(ui)
-    threads = []
     resultqueue = util.queue()
     taskqueue = util.queue()
     # partition work to more pieces than workers to minimize the chance
@@ -236,12 +263,24 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-    while any(t.is_alive() for t in threads):
+
+    while len(threads) > 0:
         while not resultqueue.empty():
             yield resultqueue.get()
-        t = threads[0]
-        t.join(0.05)
-        if not t.is_alive():
+        threads[0].join(0.05)
+        finishedthreads = [_t for _t in threads if not _t.is_alive()]
+        for t in finishedthreads:
+            if t.exception is not None:
+                try:
+                    killworkers()
+                except Exception:
+                    # pass over the workers joining failure. it is more
+                    # important to surface the inital exception than the
+                    # fact that one of workers may be processing a large
+                    # task and does not get to handle the interruption.
+                    ui.warn(_("failed to kill worker threads while handling "
+                              "an exception"))
+                raise t.exception
             threads.remove(t)
     while not resultqueue.empty():
         yield resultqueue.get()