mercurial/worker.py
changeset 35432 86b8cc1f244e
parent 35431 471918fa7f46
child 35453 44fd4cfc6c0a
equal deleted inserted replaced
35431:471918fa7f46 35432:86b8cc1f244e
    10 import errno
    10 import errno
    11 import os
    11 import os
    12 import signal
    12 import signal
    13 import sys
    13 import sys
    14 import threading
    14 import threading
       
    15 import time
    15 
    16 
    16 from .i18n import _
    17 from .i18n import _
    17 from . import (
    18 from . import (
    18     encoding,
    19     encoding,
    19     error,
    20     error,
   214             self._taskqueue = taskqueue
   215             self._taskqueue = taskqueue
   215             self._resultqueue = resultqueue
   216             self._resultqueue = resultqueue
   216             self._func = func
   217             self._func = func
   217             self._staticargs = staticargs
   218             self._staticargs = staticargs
   218             self._interrupted = False
   219             self._interrupted = False
       
   220             self.daemon = True
   219             self.exception = None
   221             self.exception = None
   220 
   222 
   221         def interrupt(self):
   223         def interrupt(self):
   222             self._interrupted = True
   224             self._interrupted = True
   223 
   225 
   240                 # it as if the func was running without workers.
   242                 # it as if the func was running without workers.
   241                 self.exception = e
   243                 self.exception = e
   242                 raise
   244                 raise
   243 
   245 
   244     threads = []
   246     threads = []
   245     def killworkers():
   247     def trykillworkers():
       
   248         # Allow up to 1 second to clean worker threads nicely
       
   249         cleanupend = time.time() + 1
   246         for t in threads:
   250         for t in threads:
   247             t.interrupt()
   251             t.interrupt()
   248         for t in threads:
   252         for t in threads:
   249             # try to let the threads handle interruption, but don't wait
   253             remainingtime = cleanupend - time.time()
   250             # indefinitely. the thread could be in infinite loop, handling
   254             t.join(remainingtime)
   251             # a very long task or in a deadlock situation
       
   252             t.join(5)
       
   253             if t.is_alive():
   255             if t.is_alive():
   254                 raise error.Abort(_('failed to join worker thread'))
   256                 # pass over the workers joining failure. it is more
       
   257                 # important to surface the initial exception than the
       
   258                 # fact that one of workers may be processing a large
       
   259                 # task and does not get to handle the interruption.
       
   260                 ui.warn(_("failed to kill worker threads while "
       
   261                           "handling an exception\n"))
       
   262                 return
   255 
   263 
   256     workers = _numworkers(ui)
   264     workers = _numworkers(ui)
   257     resultqueue = util.queue()
   265     resultqueue = util.queue()
   258     taskqueue = util.queue()
   266     taskqueue = util.queue()
   259     # partition work to more pieces than workers to minimize the chance
   267     # partition work to more pieces than workers to minimize the chance
   262         taskqueue.put(pargs)
   270         taskqueue.put(pargs)
   263     for _i in range(workers):
   271     for _i in range(workers):
   264         t = Worker(taskqueue, resultqueue, func, staticargs)
   272         t = Worker(taskqueue, resultqueue, func, staticargs)
   265         threads.append(t)
   273         threads.append(t)
   266         t.start()
   274         t.start()
   267 
   275     try:
   268     while len(threads) > 0:
   276         while len(threads) > 0:
   269         while not resultqueue.empty():
   277             while not resultqueue.empty():
   270             yield resultqueue.get()
   278                 yield resultqueue.get()
   271         threads[0].join(0.05)
   279             threads[0].join(0.05)
   272         finishedthreads = [_t for _t in threads if not _t.is_alive()]
   280             finishedthreads = [_t for _t in threads if not _t.is_alive()]
   273         for t in finishedthreads:
   281             for t in finishedthreads:
   274             if t.exception is not None:
   282                 if t.exception is not None:
   275                 try:
   283                     raise t.exception
   276                     killworkers()
   284                 threads.remove(t)
   277                 except Exception:
   285     except Exception: # re-raises
   278                     # pass over the workers joining failure. it is more
   286         trykillworkers()
   279                     # important to surface the initial exception than the
   287         raise
   280                     # fact that one of workers may be processing a large
       
   281                     # task and does not get to handle the interruption.
       
   282                     ui.warn(_("failed to kill worker threads while handling "
       
   283                               "an exception"))
       
   284                 raise t.exception
       
   285             threads.remove(t)
       
   286     while not resultqueue.empty():
   288     while not resultqueue.empty():
   287         yield resultqueue.get()
   289         yield resultqueue.get()
   288 
   290 
   289 if pycompat.iswindows:
   291 if pycompat.iswindows:
   290     _platformworker = _windowsworker
   292     _platformworker = _windowsworker