@@ -98,12 +98,13 @@
 
     args - arguments to split into chunks, to pass to individual
     workers
 
     hasretval - when True, func and the current function return a progress
-    iterator then a list (encoded as an iterator that yields many (False, ..)
-    then a (True, list)). The resulting list is in the natural order.
+    iterator then a dict (encoded as an iterator that yields many (False, ..)
+    then a (True, dict)). The dicts are joined in some arbitrary order, so
+    overlapping keys are a bad idea.
 
     threadsafe - whether work items are thread safe and can be executed using
     a thread-based worker. Should be disabled for CPU heavy tasks that don't
     release the GIL.
     '''
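Note on the hasretval protocol above: a minimal sketch of a consumer, with illustrative names that are not part of the patch:

```python
def consume(results):
    # results yields (False, progress) items, then one final (True, dict)
    retval = None
    for done, value in results:
        if done:
            retval = value  # the merged dict; key order is arbitrary
    return retval

print(consume(iter([(False, 1), (False, 2), (True, {'k': 'v'})])))  # {'k': 'v'}
```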
@@ -160,20 +161,19 @@
             killworkers()
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
     parentpid = os.getpid()
     pipes = []
-    retvals = []
-    for i, pargs in enumerate(partition(args, workers)):
+    retval = {}
+    for pargs in partition(args, workers):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
         # its own pipe's descriptors in the local variables, and the parent
         # process has the full list of pipe descriptors (and it doesn't really
         # care what order they're in).
         rfd, wfd = os.pipe()
         pipes.append((rfd, wfd))
-        retvals.append(None)
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
         # override error handling *before* fork. this is necessary because
         # exception (signal) may arrive after fork, before "pid =" assignment
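Note on the os._exit comment above: the discipline it describes looks roughly like this in isolation (a POSIX-only sketch, not code from the patch):

```python
import os

pid = os.fork()
if pid == 0:
    # child: run the task, then leave via os._exit() so that no cleanup
    # inherited from the parent (atexit handlers, buffered flushes, ...)
    # runs a second time in the worker
    rc = 1
    try:
        rc = 0  # placeholder for the real work
    finally:
        os._exit(rc)
os.waitpid(pid, 0)  # parent waits for the child
```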
@@ -190,11 +190,11 @@
                         for r, w in pipes[:-1]:
                             os.close(r)
                             os.close(w)
                         os.close(rfd)
                         for result in func(*(staticargs + (pargs,))):
-                            os.write(wfd, util.pickle.dumps((i, result)))
+                            os.write(wfd, util.pickle.dumps(result))
                         return 0
 
                     ret = scmutil.callcatch(ui, workerfunc)
             except: # parent re-raises, child never returns
                 if os.getpid() == parentpid:
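Note on the write above: with one private pipe per worker, each result is a single pickle frame, and the reader recovers frames one by one until EOF. A self-contained sketch (illustrative, not the patch):

```python
import os
import pickle

rfd, wfd = os.pipe()
for item in ({'a': 1}, {'b': 2}):
    os.write(wfd, pickle.dumps(item))  # one frame per result
os.close(wfd)

with os.fdopen(rfd, 'rb') as rf:
    while True:
        try:
            print(pickle.load(rf))  # reads exactly one frame
        except EOFError:
            break  # writer closed its end
```

POSIX only guarantees atomic pipe writes up to PIPE_BUF bytes; a private pipe per worker means nothing can interleave with a worker's frames, whatever their size.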
@@ -224,13 +224,13 @@
     try:
         openpipes = len(pipes)
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    i, res = util.pickle.load(key.fileobj)
+                    res = util.pickle.load(key.fileobj)
                     if hasretval and res[0]:
-                        retvals[i] = res[1]
+                        retval.update(res[1])
                     else:
                         yield res
                 except EOFError:
                     selector.unregister(key.fileobj)
                     key.fileobj.close()
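Note on the read loop above: the parent multiplexes all worker pipes with the stdlib selectors module. A runnable approximation (POSIX, simplified to raw bytes):

```python
import os
import selectors

sel = selectors.DefaultSelector()
nopen = 0
for _ in range(3):
    rfd, wfd = os.pipe()
    os.write(wfd, b'hi')
    os.close(wfd)  # simulate a worker that wrote one result and exited
    sel.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
    nopen += 1

while nopen > 0:
    for key, events in sel.select():
        data = key.fileobj.read1(16)
        if data:
            print(data)
        else:  # EOF, the counterpart of the EOFError branch above
            sel.unregister(key.fileobj)
            key.fileobj.close()
            nopen -= 1
```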
@@ -279,13 +279,13 @@
 
         def run(self):
             try:
                 while not self._taskqueue.empty():
                     try:
-                        i, args = self._taskqueue.get_nowait()
+                        args = self._taskqueue.get_nowait()
                         for res in self._func(*self._staticargs + (args,)):
-                            self._resultqueue.put((i, res))
+                            self._resultqueue.put(res)
                             # threading doesn't provide a native way to
                             # interrupt execution. handle it manually at every
                             # iteration.
                             if self._interrupted:
                                 return
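Note on the interruption comment above: Python threads cannot be killed from outside, so the worker polls a flag after every result. The pattern in isolation (illustrative names):

```python
import threading
import time

class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        self._interrupted = False

    def interrupt(self):
        self._interrupted = True

    def run(self):
        while True:
            time.sleep(0.01)  # stands in for producing one result
            if self._interrupted:  # checked once per iteration
                return

w = Worker()
w.start()
w.interrupt()
w.join()
```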
@@ -316,26 +316,25 @@
                 return
 
     workers = _numworkers(ui)
     resultqueue = pycompat.queue.Queue()
     taskqueue = pycompat.queue.Queue()
-    retvals = []
+    retval = {}
     # partition work to more pieces than workers to minimize the chance
     # of uneven distribution of large tasks between the workers
-    for pargs in enumerate(partition(args, workers * 20)):
-        retvals.append(None)
+    for pargs in partition(args, workers * 20):
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
     try:
         while len(threads) > 0:
             while not resultqueue.empty():
-                (i, res) = resultqueue.get()
+                res = resultqueue.get()
                 if hasretval and res[0]:
-                    retvals[i] = res[1]
+                    retval.update(res[1])
                 else:
                     yield res
             threads[0].join(0.05)
             finishedthreads = [_t for _t in threads if not _t.is_alive()]
             for t in finishedthreads:
@@ -344,17 +343,17 @@
                 threads.remove(t)
     except (Exception, KeyboardInterrupt): # re-raises
         trykillworkers()
         raise
     while not resultqueue.empty():
-        (i, res) = resultqueue.get()
+        res = resultqueue.get()
         if hasretval and res[0]:
-            retvals[i] = res[1]
+            retval.update(res[1])
         else:
             yield res
     if hasretval:
-        yield True, sum(retvals, [])
+        yield True, retval
 
 if pycompat.iswindows:
     _platformworker = _windowsworker
 else:
     _platformworker = _posixworker
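Note on the final yield: replacing sum(retvals, []) with a dict is why the docstring now warns about overlapping keys. Dict merging is last-writer-wins, and workers finish in nondeterministic order:

```python
a = {'x': 1}
b = {'x': 2, 'y': 3}
merged = {}
for part in (a, b):  # completion order varies from run to run
    merged.update(part)
print(merged)  # {'x': 2, 'y': 3}; the reverse order would keep 'x': 1
```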