@@ ... @@
             self._taskqueue = taskqueue
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
+            self.daemon = True
             self.exception = None

         def interrupt(self):
             self._interrupted = True

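Making the workers daemon threads is what lets the cleanup below give up on a stuck thread: a daemon thread cannot block interpreter shutdown, so a worker wedged in a long task no longer hangs the whole process at exit. A minimal sketch of the same pattern, using only the standard library (the class name and timings are illustrative, not part of this change):

    import threading
    import time

    class CancellableWorker(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            # daemon threads never block interpreter shutdown, so a
            # wedged worker cannot hang process exit
            self.daemon = True
            self._interrupted = False

        def interrupt(self):
            # cooperative cancellation: the thread notices the flag at
            # its next check; threading has no forced kill
            self._interrupted = True

        def run(self):
            while not self._interrupted:
                time.sleep(0.01)  # stands in for one unit of work

    w = CancellableWorker()
    w.start()
    w.interrupt()
    w.join(1)
    assert not w.is_alive()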
@@ ... @@
                 # it as if the func was running without workers.
                 self.exception = e
                 raise

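`self.exception = e` exists because Python threads swallow exceptions: whatever escapes `run()` is printed to stderr and lost, and the caller of `start()` never sees it. Stashing the exception on the thread object lets the main loop below resurface it as if no workers were involved. A self-contained sketch of the idea (names here are illustrative):

    import threading

    class Task(threading.Thread):
        def __init__(self, func):
            threading.Thread.__init__(self)
            self.daemon = True
            self.exception = None
            self._func = func

        def run(self):
            try:
                self._func()
            except Exception as e:
                # an exception escaping run() would only be printed;
                # stash it where the main thread can find it
                self.exception = e

    def boom():
        raise ValueError("worker failed")

    t = Task(boom)
    t.start()
    t.join()
    try:
        if t.exception is not None:
            raise t.exception  # resurface in the main thread
    except ValueError as e:
        print("main thread saw: %s" % e)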
     threads = []
-    def killworkers():
+    def trykillworkers():
+        # Allow up to 1 second to clean worker threads nicely
+        cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
-            # try to let the threads handle interruption, but don't wait
-            # indefinitely. the thread could be in an infinite loop, handling
-            # a very long task, or in a deadlock situation
-            t.join(5)
+            remainingtime = cleanupend - time.time()
+            t.join(remainingtime)
             if t.is_alive():
-                raise error.Abort(_('failed to join worker thread'))
+                # pass over the workers joining failure. it is more
+                # important to surface the initial exception than the
+                # fact that one of the workers may be processing a large
+                # task and does not get to handle the interruption.
+                ui.warn(_("failed to kill worker threads while "
+                          "handling an exception\n"))
+                return

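The old code allowed each join up to five seconds, so worst-case cleanup time grew with the number of workers; the new code gives all joins one shared one-second budget by recomputing `remainingtime` against a fixed `cleanupend`. A sketch of that deadline pattern (the function name and budget are illustrative):

    import threading
    import time

    def join_with_shared_budget(threads, budget=1.0):
        # one deadline for the whole group, not one timeout per thread
        deadline = time.time() + budget
        for t in threads:
            # once the budget is spent, remaining drops to zero and the
            # rest of the joins return immediately
            remaining = max(deadline - time.time(), 0)
            t.join(remaining)
        return [t for t in threads if t.is_alive()]

    workers = [threading.Thread(target=time.sleep, args=(0.05,))
               for _ in range(4)]
    for w in workers:
        w.start()
    stragglers = join_with_shared_budget(workers)
    print("%d thread(s) missed the deadline" % len(stragglers))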
     workers = _numworkers(ui)
     resultqueue = util.queue()
     taskqueue = util.queue()
     # partition work to more pieces than workers to minimize the chance
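The partitioning comment (truncated in this excerpt) is the load-balancing half of the design: with many more queue entries than workers, one expensive piece delays only the worker holding it, while idle workers keep pulling the remaining pieces. Roughly, using a stand-in chunker rather than the patch's own helper:

    def chunk(items, npieces):
        # stand-in chunker, not the helper used by the patch: split
        # items into roughly npieces contiguous slices
        step = max(len(items) // npieces, 1)
        for i in range(0, len(items), step):
            yield items[i:i + step]

    nworkers = 4
    tasks = list(range(1000))
    # many more pieces than workers: a slow piece stalls one worker
    # briefly instead of leaving it a huge fixed share of the work
    pieces = list(chunk(tasks, nworkers * 20))
    print("%d pieces for %d workers" % (len(pieces), nworkers))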
@@ ... @@
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-
-    while len(threads) > 0:
-        while not resultqueue.empty():
-            yield resultqueue.get()
-        threads[0].join(0.05)
-        finishedthreads = [_t for _t in threads if not _t.is_alive()]
-        for t in finishedthreads:
-            if t.exception is not None:
-                try:
-                    killworkers()
-                except Exception:
-                    # pass over the workers joining failure. it is more
-                    # important to surface the initial exception than the
-                    # fact that one of the workers may be processing a large
-                    # task and does not get to handle the interruption.
-                    ui.warn(_("failed to kill worker threads while handling "
-                              "an exception"))
-                raise t.exception
-            threads.remove(t)
+    try:
+        while len(threads) > 0:
+            while not resultqueue.empty():
+                yield resultqueue.get()
+            threads[0].join(0.05)
+            finishedthreads = [_t for _t in threads if not _t.is_alive()]
+            for t in finishedthreads:
+                if t.exception is not None:
+                    raise t.exception
+                threads.remove(t)
+    except Exception: # re-raises
+        trykillworkers()
+        raise
     while not resultqueue.empty():
         yield resultqueue.get()

 if pycompat.iswindows:
     _platformworker = _windowsworker
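The `except Exception: # re-raises` block is the classic best-effort-cleanup shape: catch, clean up, then re-raise with a bare `raise` so the original exception and its traceback reach the caller, and so a failure during cleanup never masks the real error (which is why `trykillworkers` only warns instead of raising). Distilled to its skeleton, in a generator like `_windowsworker`:

    def cleanup():
        # best effort only: this must not raise, or it would replace
        # the exception we are trying to surface
        print("cleanup ran")

    def produce():
        try:
            yield 1
            raise RuntimeError("boom")
        except Exception:  # re-raises
            cleanup()
            raise  # bare raise keeps the original traceback

    try:
        for item in produce():
            print("got %s" % item)
    except RuntimeError as e:
        print("caller still sees the original error: %s" % e)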