comparison mercurial/worker.py @ 18638:047110c0e2a8

worker: allow a function to be run in multiple worker processes. If we estimate that it will be worth the cost, we run the function in multiple processes. Otherwise, we run it in-process. Children report progress to the parent through a pipe. Not yet implemented on Windows.
author Bryan O'Sullivan <bryano@fb.com>
date Sat, 09 Feb 2013 15:51:32 -0800
parents ac4dbceeb14a
children d1a2b086d058
comparison
equal deleted inserted replaced
18637:ac4dbceeb14a 18638:047110c0e2a8
4 # 4 #
5 # This software may be used and distributed according to the terms of the 5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version. 6 # GNU General Public License version 2 or any later version.
7 7
8 from i18n import _ 8 from i18n import _
9 import os, util 9 import os, signal, sys, util
10 10
11 def countcpus(): 11 def countcpus():
12 '''try to count the number of CPUs on the system''' 12 '''try to count the number of CPUs on the system'''
13 13
14 # posix 14 # posix
51 linear = costperop * nops 51 linear = costperop * nops
52 workers = _numworkers(ui) 52 workers = _numworkers(ui)
53 benefit = linear - (_startupcost * workers + linear / workers) 53 benefit = linear - (_startupcost * workers + linear / workers)
54 return benefit >= 0.15 54 return benefit >= 0.15
55 55
def worker(ui, costperarg, func, staticargs, args):
    '''call func on args, splitting the work across worker processes
    when that is estimated to pay off.

    returns a progress iterator

    ui - passed to worthwhile() and to the platform worker

    costperarg - cost of a single task

    func - function to run

    staticargs - tuple of arguments to pass to every invocation of
    the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if not worthwhile(ui, costperarg, len(args)):
        # too little work to amortise process startup: run in-process
        return func(*(staticargs + (args,)))
    return _platformworker(ui, func, staticargs, args)
74
def _posixworker(ui, func, staticargs, args):
    '''run func over args in forked worker processes (POSIX only)

    args is split with partition() into one slice per worker. Each
    child calls func(*(staticargs + (slice,))) and writes every
    (i, item) pair that call yields to a shared pipe as a "%d %s\\n"
    line. The parent yields the decoded (int, str) pairs as they are
    read from the pipe.

    When the pipe reaches EOF, or reading/decoding fails, the parent
    restores its SIGINT handler, closes the pipe, reaps every child it
    forked and calls sys.exit(1) if any of them reported a non-zero
    wait status.
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    nchildren = 0
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            # Child process. It must never return into the caller's
            # stack: an exception escaping from here would unwind into
            # code the parent is also running, in a second process.
            # Whatever happens, leave through os._exit().
            status = 1
            try:
                try:
                    os.close(rfd)
                    for i, item in func(*(staticargs + (pargs,))):
                        os.write(wfd, '%d %s\n' % (i, item))
                    status = 0
                except KeyboardInterrupt:
                    status = 255
                except Exception:
                    # report the failure here; the parent only sees
                    # the non-zero exit status
                    import traceback
                    traceback.print_exc()
                    sys.stderr.flush()
            finally:
                os._exit(status)
        # count what was really forked so cleanup() reaps exactly that
        nchildren += 1
    os.close(wfd)
    fp = os.fdopen(rfd, 'rb', 0)
    oldhandler = signal.getsignal(signal.SIGINT)
    # the parent ignores SIGINT while it is reading from the children
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    def cleanup():
        # python 2.4 is too dumb for try/yield/finally
        signal.signal(signal.SIGINT, oldhandler)
        # Close the read end before waiting: a child still writing
        # then fails with EPIPE instead of blocking forever on a full
        # pipe while we block in os.wait().
        fp.close()
        problems = 0
        for i in xrange(nchildren):
            problems |= os.wait()[1]
        if problems:
            sys.exit(1)
    try:
        # readline() rather than file iteration: iterating a file uses
        # a read-ahead buffer, which would hold progress lines back
        for line in iter(fp.readline, ''):
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        cleanup()
        raise
    cleanup()

# Only a fork()-based implementation exists so far; on Windows
# _platformworker is left undefined.
if os.name != 'nt':
    _platformworker = _posixworker
111
56 def partition(lst, nslices): 112 def partition(lst, nslices):
57 '''partition a list into N slices of equal size''' 113 '''partition a list into N slices of equal size'''
58 n = len(lst) 114 n = len(lst)
59 chunk, slop = n / nslices, n % nslices 115 chunk, slop = n / nslices, n % nslices
60 end = 0 116 end = 0