--- a/mercurial/worker.py Thu Jun 27 11:09:09 2019 +0200
+++ b/mercurial/worker.py Thu Jun 27 11:39:35 2019 +0200
@@ -100,8 +100,9 @@
workers
hasretval - when True, func and the current function return an progress
- iterator then a list (encoded as an iterator that yield many (False, ..)
- then a (True, list)). The resulting list is in the natural order.
+ iterator then a dict (encoded as an iterator that yield many (False, ..)
+ then a (True, dict)). The dicts are joined in some arbitrary order, so
+ overlapping keys are a bad idea.
threadsafe - whether work items are thread safe and can be executed using
a thread-based worker. Should be disabled for CPU heavy tasks that don't
@@ -162,8 +163,8 @@
ui.flush()
parentpid = os.getpid()
pipes = []
- retvals = []
- for i, pargs in enumerate(partition(args, workers)):
+ retval = {}
+ for pargs in partition(args, workers):
# Every worker gets its own pipe to send results on, so we don't have to
# implement atomic writes larger than PIPE_BUF. Each forked process has
# its own pipe's descriptors in the local variables, and the parent
@@ -171,7 +172,6 @@
# care what order they're in).
rfd, wfd = os.pipe()
pipes.append((rfd, wfd))
- retvals.append(None)
# make sure we use os._exit in all worker code paths. otherwise the
# worker may do some clean-ups which could cause surprises like
# deadlock. see sshpeer.cleanup for example.
@@ -192,7 +192,7 @@
os.close(w)
os.close(rfd)
for result in func(*(staticargs + (pargs,))):
- os.write(wfd, util.pickle.dumps((i, result)))
+ os.write(wfd, util.pickle.dumps(result))
return 0
ret = scmutil.callcatch(ui, workerfunc)
@@ -226,9 +226,9 @@
while openpipes > 0:
for key, events in selector.select():
try:
- i, res = util.pickle.load(key.fileobj)
+ res = util.pickle.load(key.fileobj)
if hasretval and res[0]:
- retvals[i] = res[1]
+ retval.update(res[1])
else:
yield res
except EOFError:
@@ -249,7 +249,7 @@
os.kill(os.getpid(), -status)
sys.exit(status)
if hasretval:
- yield True, sum(retvals, [])
+ yield True, retval
def _posixexitstatus(code):
'''convert a posix exit status into the same form returned by
@@ -281,9 +281,9 @@
try:
while not self._taskqueue.empty():
try:
- i, args = self._taskqueue.get_nowait()
+ args = self._taskqueue.get_nowait()
for res in self._func(*self._staticargs + (args,)):
- self._resultqueue.put((i, res))
+ self._resultqueue.put(res)
# threading doesn't provide a native way to
# interrupt execution. handle it manually at every
# iteration.
@@ -318,11 +318,10 @@
workers = _numworkers(ui)
resultqueue = pycompat.queue.Queue()
taskqueue = pycompat.queue.Queue()
- retvals = []
+ retval = {}
# partition work to more pieces than workers to minimize the chance
# of uneven distribution of large tasks between the workers
- for pargs in enumerate(partition(args, workers * 20)):
- retvals.append(None)
+ for pargs in partition(args, workers * 20):
taskqueue.put(pargs)
for _i in range(workers):
t = Worker(taskqueue, resultqueue, func, staticargs)
@@ -331,9 +330,9 @@
try:
while len(threads) > 0:
while not resultqueue.empty():
- (i, res) = resultqueue.get()
+ res = resultqueue.get()
if hasretval and res[0]:
- retvals[i] = res[1]
+ retval.update(res[1])
else:
yield res
threads[0].join(0.05)
@@ -346,13 +345,13 @@
trykillworkers()
raise
while not resultqueue.empty():
- (i, res) = resultqueue.get()
+ res = resultqueue.get()
if hasretval and res[0]:
- retvals[i] = res[1]
+ retval.update(res[1])
else:
yield res
if hasretval:
- yield True, sum(retvals, [])
+ yield True, retval
if pycompat.iswindows:
_platformworker = _windowsworker