Mercurial > public > mercurial-scm > hg
comparison mercurial/worker.py @ 18638:047110c0e2a8
worker: allow a function to be run in multiple worker processes

If we estimate that it will be worth the cost, we run the function in
multiple processes. Otherwise, we run it in-process.

Children report progress to the parent through a pipe.

Not yet implemented on Windows.
author | Bryan O'Sullivan <bryano@fb.com> |
---|---|
date | Sat, 09 Feb 2013 15:51:32 -0800 |
parents | ac4dbceeb14a |
children | d1a2b086d058 |
comparison
equal
deleted
inserted
replaced
18637:ac4dbceeb14a | 18638:047110c0e2a8 |
---|---|
4 # | 4 # |
5 # This software may be used and distributed according to the terms of the | 5 # This software may be used and distributed according to the terms of the |
6 # GNU General Public License version 2 or any later version. | 6 # GNU General Public License version 2 or any later version. |
7 | 7 |
8 from i18n import _ | 8 from i18n import _ |
9 import os, util | 9 import os, signal, sys, util |
10 | 10 |
11 def countcpus(): | 11 def countcpus(): |
12 '''try to count the number of CPUs on the system''' | 12 '''try to count the number of CPUs on the system''' |
13 | 13 |
14 # posix | 14 # posix |
51 linear = costperop * nops | 51 linear = costperop * nops |
52 workers = _numworkers(ui) | 52 workers = _numworkers(ui) |
53 benefit = linear - (_startupcost * workers + linear / workers) | 53 benefit = linear - (_startupcost * workers + linear / workers) |
54 return benefit >= 0.15 | 54 return benefit >= 0.15 |
55 | 55 |
56 def worker(ui, costperarg, func, staticargs, args): | |
57 '''run a function, possibly in parallel in multiple worker | |
58 processes. | |
59 | |
60 returns a progress iterator | |
61 | |
62 costperarg - cost of a single task | |
63 | |
64 func - function to run | |
65 | |
66 staticargs - arguments to pass to every invocation of the function | |
67 | |
68 args - arguments to split into chunks, to pass to individual | |
69 workers | |
70 ''' | |
71 if worthwhile(ui, costperarg, len(args)): | |
72 return _platformworker(ui, func, staticargs, args) | |
73 return func(*staticargs + (args,)) | |
74 | |
75 def _posixworker(ui, func, staticargs, args): | |
76 rfd, wfd = os.pipe() | |
77 workers = _numworkers(ui) | |
78 for pargs in partition(args, workers): | |
79 pid = os.fork() | |
80 if pid == 0: | |
81 try: | |
82 os.close(rfd) | |
83 for i, item in func(*(staticargs + (pargs,))): | |
84 os.write(wfd, '%d %s\n' % (i, item)) | |
85 os._exit(0) | |
86 except KeyboardInterrupt: | |
87 os._exit(255) | |
88 os.close(wfd) | |
89 fp = os.fdopen(rfd, 'rb', 0) | |
90 oldhandler = signal.getsignal(signal.SIGINT) | |
91 signal.signal(signal.SIGINT, signal.SIG_IGN) | |
92 def cleanup(): | |
93 # python 2.4 is too dumb for try/yield/finally | |
94 signal.signal(signal.SIGINT, oldhandler) | |
95 problems = 0 | |
96 for i in xrange(workers): | |
97 problems |= os.wait()[1] | |
98 if problems: | |
99 sys.exit(1) | |
100 try: | |
101 for line in fp: | |
102 l = line.split(' ', 1) | |
103 yield int(l[0]), l[1][:-1] | |
104 except: # re-raises | |
105 cleanup() | |
106 raise | |
107 cleanup() | |
108 | |
109 if os.name != 'nt': | |
110 _platformworker = _posixworker | |
111 | |
56 def partition(lst, nslices): | 112 def partition(lst, nslices): |
57 '''partition a list into N slices of equal size''' | 113 '''partition a list into N slices of equal size''' |
58 n = len(lst) | 114 n = len(lst) |
59 chunk, slop = n / nslices, n % nslices | 115 chunk, slop = n / nslices, n % nslices |
60 end = 0 | 116 end = 0 |