mercurial/worker.py
changeset 42522 d29db0a0c4eb
parent 42455 5ca136bbd3f6
child 43076 2372284d9457
equal deleted inserted replaced
42521:64a873ca7135 42522:d29db0a0c4eb
    98 
    98 
    99     args - arguments to split into chunks, to pass to individual
    99     args - arguments to split into chunks, to pass to individual
   100     workers
   100     workers
   101 
   101 
   102     hasretval - when True, func and the current function return a progress
   102     hasretval - when True, func and the current function return a progress
   103     iterator then a list (encoded as an iterator that yields many (False, ..)
   103     iterator then a dict (encoded as an iterator that yields many (False, ..)
   104     then a (True, list)). The resulting list is in the natural order.
   104     then a (True, dict)). The dicts are joined in some arbitrary order, so
       
   105     overlapping keys are a bad idea.
   105 
   106 
   106     threadsafe - whether work items are thread safe and can be executed using
   107     threadsafe - whether work items are thread safe and can be executed using
   107     a thread-based worker. Should be disabled for CPU heavy tasks that don't
   108     a thread-based worker. Should be disabled for CPU heavy tasks that don't
   108     release the GIL.
   109     release the GIL.
   109     '''
   110     '''
   160             killworkers()
   161             killworkers()
   161     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
   162     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
   162     ui.flush()
   163     ui.flush()
   163     parentpid = os.getpid()
   164     parentpid = os.getpid()
   164     pipes = []
   165     pipes = []
   165     retvals = []
   166     retval = {}
   166     for i, pargs in enumerate(partition(args, workers)):
   167     for pargs in partition(args, workers):
   167         # Every worker gets its own pipe to send results on, so we don't have to
   168         # Every worker gets its own pipe to send results on, so we don't have to
   168         # implement atomic writes larger than PIPE_BUF. Each forked process has
   169         # implement atomic writes larger than PIPE_BUF. Each forked process has
   169         # its own pipe's descriptors in the local variables, and the parent
   170         # its own pipe's descriptors in the local variables, and the parent
   170         # process has the full list of pipe descriptors (and it doesn't really
   171         # process has the full list of pipe descriptors (and it doesn't really
   171         # care what order they're in).
   172         # care what order they're in).
   172         rfd, wfd = os.pipe()
   173         rfd, wfd = os.pipe()
   173         pipes.append((rfd, wfd))
   174         pipes.append((rfd, wfd))
   174         retvals.append(None)
       
   175         # make sure we use os._exit in all worker code paths. otherwise the
   175         # make sure we use os._exit in all worker code paths. otherwise the
   176         # worker may do some clean-ups which could cause surprises like
   176         # worker may do some clean-ups which could cause surprises like
   177         # deadlock. see sshpeer.cleanup for example.
   177         # deadlock. see sshpeer.cleanup for example.
   178         # override error handling *before* fork. this is necessary because
   178         # override error handling *before* fork. this is necessary because
   179         # exception (signal) may arrive after fork, before "pid =" assignment
   179         # exception (signal) may arrive after fork, before "pid =" assignment
   190                     for r, w in pipes[:-1]:
   190                     for r, w in pipes[:-1]:
   191                         os.close(r)
   191                         os.close(r)
   192                         os.close(w)
   192                         os.close(w)
   193                     os.close(rfd)
   193                     os.close(rfd)
   194                     for result in func(*(staticargs + (pargs,))):
   194                     for result in func(*(staticargs + (pargs,))):
   195                         os.write(wfd, util.pickle.dumps((i, result)))
   195                         os.write(wfd, util.pickle.dumps(result))
   196                     return 0
   196                     return 0
   197 
   197 
   198                 ret = scmutil.callcatch(ui, workerfunc)
   198                 ret = scmutil.callcatch(ui, workerfunc)
   199         except: # parent re-raises, child never returns
   199         except: # parent re-raises, child never returns
   200             if os.getpid() == parentpid:
   200             if os.getpid() == parentpid:
   224     try:
   224     try:
   225         openpipes = len(pipes)
   225         openpipes = len(pipes)
   226         while openpipes > 0:
   226         while openpipes > 0:
   227             for key, events in selector.select():
   227             for key, events in selector.select():
   228                 try:
   228                 try:
   229                     i, res = util.pickle.load(key.fileobj)
   229                     res = util.pickle.load(key.fileobj)
   230                     if hasretval and res[0]:
   230                     if hasretval and res[0]:
   231                         retvals[i] = res[1]
   231                         retval.update(res[1])
   232                     else:
   232                     else:
   233                         yield res
   233                         yield res
   234                 except EOFError:
   234                 except EOFError:
   235                     selector.unregister(key.fileobj)
   235                     selector.unregister(key.fileobj)
   236                     key.fileobj.close()
   236                     key.fileobj.close()
   247     if status:
   247     if status:
   248         if status < 0:
   248         if status < 0:
   249             os.kill(os.getpid(), -status)
   249             os.kill(os.getpid(), -status)
   250         sys.exit(status)
   250         sys.exit(status)
   251     if hasretval:
   251     if hasretval:
   252         yield True, sum(retvals, [])
   252         yield True, retval
   253 
   253 
   254 def _posixexitstatus(code):
   254 def _posixexitstatus(code):
   255     '''convert a posix exit status into the same form returned by
   255     '''convert a posix exit status into the same form returned by
   256     os.spawnv
   256     os.spawnv
   257 
   257 
   279 
   279 
   280         def run(self):
   280         def run(self):
   281             try:
   281             try:
   282                 while not self._taskqueue.empty():
   282                 while not self._taskqueue.empty():
   283                     try:
   283                     try:
   284                         i, args = self._taskqueue.get_nowait()
   284                         args = self._taskqueue.get_nowait()
   285                         for res in self._func(*self._staticargs + (args,)):
   285                         for res in self._func(*self._staticargs + (args,)):
   286                             self._resultqueue.put((i, res))
   286                             self._resultqueue.put(res)
   287                             # threading doesn't provide a native way to
   287                             # threading doesn't provide a native way to
   288                             # interrupt execution. handle it manually at every
   288                             # interrupt execution. handle it manually at every
   289                             # iteration.
   289                             # iteration.
   290                             if self._interrupted:
   290                             if self._interrupted:
   291                                 return
   291                                 return
   316                 return
   316                 return
   317 
   317 
   318     workers = _numworkers(ui)
   318     workers = _numworkers(ui)
   319     resultqueue = pycompat.queue.Queue()
   319     resultqueue = pycompat.queue.Queue()
   320     taskqueue = pycompat.queue.Queue()
   320     taskqueue = pycompat.queue.Queue()
   321     retvals = []
   321     retval = {}
   322     # partition work to more pieces than workers to minimize the chance
   322     # partition work to more pieces than workers to minimize the chance
   323     # of uneven distribution of large tasks between the workers
   323     # of uneven distribution of large tasks between the workers
   324     for pargs in enumerate(partition(args, workers * 20)):
   324     for pargs in partition(args, workers * 20):
   325         retvals.append(None)
       
   326         taskqueue.put(pargs)
   325         taskqueue.put(pargs)
   327     for _i in range(workers):
   326     for _i in range(workers):
   328         t = Worker(taskqueue, resultqueue, func, staticargs)
   327         t = Worker(taskqueue, resultqueue, func, staticargs)
   329         threads.append(t)
   328         threads.append(t)
   330         t.start()
   329         t.start()
   331     try:
   330     try:
   332         while len(threads) > 0:
   331         while len(threads) > 0:
   333             while not resultqueue.empty():
   332             while not resultqueue.empty():
   334                 (i, res) = resultqueue.get()
   333                 res = resultqueue.get()
   335                 if hasretval and res[0]:
   334                 if hasretval and res[0]:
   336                     retvals[i] = res[1]
   335                     retval.update(res[1])
   337                 else:
   336                 else:
   338                     yield res
   337                     yield res
   339             threads[0].join(0.05)
   338             threads[0].join(0.05)
   340             finishedthreads = [_t for _t in threads if not _t.is_alive()]
   339             finishedthreads = [_t for _t in threads if not _t.is_alive()]
   341             for t in finishedthreads:
   340             for t in finishedthreads:
   344                 threads.remove(t)
   343                 threads.remove(t)
   345     except (Exception, KeyboardInterrupt): # re-raises
   344     except (Exception, KeyboardInterrupt): # re-raises
   346         trykillworkers()
   345         trykillworkers()
   347         raise
   346         raise
   348     while not resultqueue.empty():
   347     while not resultqueue.empty():
   349         (i, res) = resultqueue.get()
   348         res = resultqueue.get()
   350         if hasretval and res[0]:
   349         if hasretval and res[0]:
   351             retvals[i] = res[1]
   350             retval.update(res[1])
   352         else:
   351         else:
   353             yield res
   352             yield res
   354     if hasretval:
   353     if hasretval:
   355         yield True, sum(retvals, [])
   354         yield True, retval
   356 
   355 
   357 if pycompat.iswindows:
   356 if pycompat.iswindows:
   358     _platformworker = _windowsworker
   357     _platformworker = _windowsworker
   359 else:
   358 else:
   360     _platformworker = _posixworker
   359     _platformworker = _posixworker