annotate mercurial/worker.py @ 49245:cdb85d0512b8

branching: fix wrong merge conflict resolution from 13dfad0f9f7a

13dfad0f9f7a merged stable into default, but accidentally added the _blockingreader class from stable (but deindented) instead of merging the changes from stable (2fe4efaa59af) into the existing _blockingreader class. This resulted in the _blockingreader being there two times.
author Manuel Jacob <me@manueljacob.de>
date Wed, 25 May 2022 17:23:16 +0200
parents 13dfad0f9f7a
children 5d28246b9acc
rev   line source
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
1 # worker.py - master-slave parallelism support
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
2 #
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
3 # Copyright 2013 Facebook, Inc.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
4 #
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
5 # This software may be used and distributed according to the terms of the
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
6 # GNU General Public License version 2 or any later version.
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
7
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
8
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
9 import errno
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
10 import os
48961
df56e6bd37f6 py3: use pickle directly
Gregory Szorc <gregory.szorc@gmail.com>
parents: 46819
diff changeset
11 import pickle
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
12 import signal
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
13 import sys
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
14 import threading
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
15 import time
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
16
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
17 try:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
18 import selectors
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
19
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
20 selectors.BaseSelector
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
21 except ImportError:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
22 from .thirdparty import selectors2 as selectors
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
23
25992
2d76f8a2d831 worker: use absolute_import
Gregory Szorc <gregory.szorc@gmail.com>
parents: 25660
diff changeset
24 from .i18n import _
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
25 from . import (
30640
a150173da1c1 py3: replace os.environ with encoding.environ (part 2 of 5)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30530
diff changeset
26 encoding,
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
27 error,
30644
d524c88511a7 py3: replace os.name with pycompat.osname (part 1 of 2)
Pulkit Goyal <7895pulkit@gmail.com>
parents: 30640
diff changeset
28 pycompat,
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
29 scmutil,
30406
78a58dcf8853 worker: migrate to util.iterfile
Jun Wu <quark@fb.com>
parents: 28292
diff changeset
30 )
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
31
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
32
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
33 def countcpus():
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
34 '''try to count the number of CPUs on the system'''
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
35
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
36 # posix
18635
fed06dd07665 worker: count the number of CPUs
Bryan O'Sullivan <bryano@fb.com>
parents:
diff changeset
37 try:
43554
9f70512ae2cf cleanup: remove pointless r-prefixes on single-quoted strings
Augie Fackler <augie@google.com>
parents: 43077
diff changeset
38 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
39 if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
40 return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
41 except (AttributeError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
42 pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
43
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
44 # windows
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
45 try:
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
46 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
26568
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
47 if n > 0:
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
48 return n
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
49 except (KeyError, ValueError):
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
50 pass
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
51
c0501c26b05c worker: restore old countcpus code (issue4869)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26063
diff changeset
52 return 1
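The fallback chain in countcpus() above can be tried outside Mercurial. The following is a minimal sketch, not the module's own code: `count_cpus` and its `environ` parameter are hypothetical names, and it reads the plain `os.environ` mapping instead of Mercurial's `encoding.environ`.

```python
import os


def count_cpus(environ=os.environ):
    """Best-effort CPU count: POSIX sysconf first, then the Windows env var."""
    # POSIX: os.sysconf does not exist on Windows (AttributeError) and
    # raises ValueError for configuration names the platform does not know.
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # Windows: fall back to the NUMBER_OF_PROCESSORS environment variable.
    try:
        n = int(environ['NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    # Nothing usable was found, so assume a single CPU.
    return 1


print(count_cpus())
```

Each probe is wrapped in its own try block so that a failure on one platform path falls through to the next instead of aborting.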
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
53
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
54
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
55 def _numworkers(ui):
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
56 s = ui.config(b'worker', b'numcpus')
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
57 if s:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
58 try:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
59 n = int(s)
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
60 if n >= 1:
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
61 return n
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
62 except ValueError:
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
63 raise error.Abort(_(b'number of cpus must be an integer'))
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
64 return min(max(countcpus(), 4), 32)
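The selection logic in _numworkers() can be sketched as a pure function. This is an illustration under assumptions: `num_workers` is a hypothetical name, the configured value is passed in as a string or None instead of being read through `ui.config`, and a plain ValueError stands in for `error.Abort`.

```python
def num_workers(configured, ncpus):
    """Pick a worker count from an optional config string and a CPU count."""
    if configured:
        try:
            n = int(configured)
        except ValueError:
            # Stand-in for error.Abort in the listing above.
            raise ValueError('number of cpus must be an integer')
        if n >= 1:
            return n
        # Values below 1 are ignored and fall through to the default.
    # Default: at least 4 workers, at most 32.
    return min(max(ncpus, 4), 32)


print(num_workers(None, 2))    # clamped up to the lower bound
print(num_workers(None, 64))   # clamped down to the upper bound
print(num_workers('3', 16))    # explicit configuration wins
```

Note that an explicit setting bypasses the 4..32 clamp entirely, while a setting of 0 or less silently falls back to the clamped CPU count.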
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
65
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
66
48992
cc0e059d2af8 worker: remove Python 2 support code
Gregory Szorc <gregory.szorc@gmail.com>
parents: 48966
diff changeset
67 def ismainthread():
cc0e059d2af8 worker: remove Python 2 support code
Gregory Szorc <gregory.szorc@gmail.com>
parents: 48966
diff changeset
68 return threading.current_thread() == threading.main_thread()
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
69
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
70
49245
cdb85d0512b8 branching: fix wrong merge conflict resolution from 13dfad0f9f7a
Manuel Jacob <me@manueljacob.de>
parents: 49238
diff changeset
71 class _blockingreader:
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
72 def __init__(self, wrapped):
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
73 self._wrapped = wrapped
48870
2fe4efaa59af worker: adapt _blockingreader to work around a python3.8.[0-1] bug (issue6444)
Matt Harbison <matt_harbison@yahoo.com>
parents: 46819
diff changeset
74
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
75 # Do NOT implement readinto() by making it delegate to
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
76 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
77 # with just read() and readline(), so we don't need to implement it.
48870
2fe4efaa59af worker: adapt _blockingreader to work around a python3.8.[0-1] bug (issue6444)
Matt Harbison <matt_harbison@yahoo.com>
parents: 46819
diff changeset
78
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
79 if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
80
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
81 # This is required for python 3.8, prior to 3.8.2. See issue6444.
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
82 def readinto(self, b):
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
83 pos = 0
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
84 size = len(b)
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
85
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
86 while pos < size:
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
87 ret = self._wrapped.readinto(b[pos:])
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
88 if not ret:
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
89 break
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
90 pos += ret
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
91
49238
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
92 return pos
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
93
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
94 def readline(self):
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
95 return self._wrapped.readline()
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
96
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
97 # issue multiple reads until size is fulfilled
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
98 def read(self, size=-1):
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
99 if size < 0:
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
100 return self._wrapped.readall()
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
101
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
102 buf = bytearray(size)
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
103 view = memoryview(buf)
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
104 pos = 0
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
105
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
106 while pos < size:
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
107 ret = self._wrapped.readinto(view[pos:])
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
108 if not ret:
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
109 break
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
110 pos += ret
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
111
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
112 del view
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
113 del buf[pos:]
13dfad0f9f7a branching: merge stable into default
Raphaël Gomès <rgomes@octobus.net>
parents: 49037 48870
diff changeset
114 return bytes(buf)
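The reason for looping in read() is that an unbuffered stream may return fewer bytes than requested, while the unpickler expects read(n) to deliver n bytes unless the stream has ended. The sketch below illustrates this with a hypothetical `TrickleRaw` stream that never returns more than three bytes per call. `BlockingReader` is a simplified copy of the loop shown above, written for illustration only, and it assumes a Python version that does not need the readinto() workaround (3.8.2 or later).

```python
import io
import pickle


class TrickleRaw(io.RawIOBase):
    """Hypothetical raw stream that hands out at most `step` bytes per call."""

    def __init__(self, data, step=3):
        self._data = data
        self._pos = 0
        self._step = step

    def readable(self):
        return True

    def readinto(self, b):
        # Copy a short slice into the caller's buffer, like a pipe might.
        n = min(len(b), self._step, len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n


class BlockingReader:
    """Simplified wrapper: read(size) keeps reading until size bytes or EOF."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def readline(self):
        return self._wrapped.readline()

    def read(self, size=-1):
        if size < 0:
            return self._wrapped.readall()
        buf = bytearray(size)
        view = memoryview(buf)
        pos = 0
        while pos < size:
            ret = self._wrapped.readinto(view[pos:])
            if not ret:
                break  # EOF before the request was fulfilled
            pos += ret
        # The view must be released before the bytearray can be shrunk.
        del view
        del buf[pos:]
        return bytes(buf)


payload = pickle.dumps({'key': 'x' * 100})
short = TrickleRaw(payload).read(50)                 # a single short read
full = BlockingReader(TrickleRaw(payload)).read(50)  # loops until 50 bytes
restored = pickle.load(BlockingReader(TrickleRaw(payload)))
print(len(short), len(full), restored == {'key': 'x' * 100})
```

Reading into a memoryview slice avoids building a new bytes object for every partial read; only the final result is copied once.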
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
115
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
116
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
117 if pycompat.isposix or pycompat.iswindows:
38730
69ed2cff4277 worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38729
diff changeset
118 _STARTUP_COST = 0.01
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
119 # The Windows worker is thread based. If tasks are CPU bound, threads
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
120 # in the presence of the GIL result in excessive context switching and
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
121 # this overhead can slow down execution.
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
122 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
123 else:
38730
69ed2cff4277 worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38729
diff changeset
124 _STARTUP_COST = 1e30
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
125 _DISALLOW_THREAD_UNSAFE = False
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
126
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
127
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
128 def worthwhile(ui, costperop, nops, threadsafe=True):
45957
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
129 """try to determine whether the benefit of multiple processes can
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
130 outweigh the cost of starting them"""
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
131
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
132 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
133 return False
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
134
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
135 linear = costperop * nops
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
136 workers = _numworkers(ui)
38730
69ed2cff4277 worker: rename variable to reflect constant
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38729
diff changeset
137 benefit = linear - (_STARTUP_COST * workers + linear / workers)
18636
dcb27c153a40 worker: estimate whether it's worth running a task in parallel
Bryan O'Sullivan <bryano@fb.com>
parents: 18635
diff changeset
138 return benefit >= 0.15
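A worked example makes the cost model in worthwhile() concrete. The sketch below reproduces the formula as a standalone function; `estimated_benefit` and `is_worthwhile` are hypothetical names, the startup cost defaults to the 0.01 used on POSIX and Windows above, and the per-operation costs and operation counts are made-up illustrative figures.

```python
def estimated_benefit(costperop, nops, workers, startup_cost=0.01):
    """Serial cost minus (startup overhead + ideally parallelised cost)."""
    linear = costperop * nops
    return linear - (startup_cost * workers + linear / workers)


def is_worthwhile(costperop, nops, workers, startup_cost=0.01):
    # Same 0.15 threshold as in the listing above.
    return estimated_benefit(costperop, nops, workers, startup_cost) >= 0.15


# 100 operations at 0.001 each, 4 workers:
#   linear = 0.1, parallel = 0.04 + 0.025 = 0.065, benefit is about 0.035
print(is_worthwhile(0.001, 100, 4))

# 1000 operations at 0.001 each, 4 workers:
#   linear = 1.0, parallel = 0.04 + 0.25 = 0.29, benefit is about 0.71
print(is_worthwhile(0.001, 1000, 4))

# With the 1e30 startup cost used on other platforms, nothing qualifies.
print(is_worthwhile(0.001, 1000, 4, startup_cost=1e30))
```

The model assumes perfect scaling across workers, so it is an optimistic estimate; the 0.15 threshold keeps small jobs on the serial path.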
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
139
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
140
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
141 def worker(
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
142 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
143 ):
45957
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
144 """run a function, possibly in parallel in multiple worker
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
145 processes.
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
146
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
147 returns a progress iterator
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
148
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
149 costperarg - cost of a single task
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
150
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
151 func - function to run. It is expected to return a progress iterator.
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
152
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
153 staticargs - arguments to pass to every invocation of the function
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
154
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
155 args - arguments to split into chunks, to pass to individual
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
156 workers
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
157
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
158 hasretval - when True, func and the current function return a progress
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
159 iterator then a dict (encoded as an iterator that yields many (False, ..)
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
160 then a (True, dict)). The dicts are joined in some arbitrary order, so
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
161 overlapping keys are a bad idea.
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
162
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
163 threadsafe - whether work items are thread safe and can be executed using
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
164 a thread-based worker. Should be disabled for CPU heavy tasks that don't
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
165 release the GIL.
45957
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
166 """
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
167 enabled = ui.configbool(b'worker', b'enabled')
46240
a42502e9ae6d worker: POSIX only supports workers from main thread (issue6460)
Joerg Sonnenberger <joerg@bec.de>
parents: 45957
diff changeset
168 if enabled and _platformworker is _posixworker and not ismainthread():
a42502e9ae6d worker: POSIX only supports workers from main thread (issue6460)
Joerg Sonnenberger <joerg@bec.de>
parents: 45957
diff changeset
169 # The POSIX worker has to install a handler for SIGCHLD.
a42502e9ae6d worker: POSIX only supports workers from main thread (issue6460)
Joerg Sonnenberger <joerg@bec.de>
parents: 45957
diff changeset
170 # Python up to 3.9 only allows this in the main thread.
a42502e9ae6d worker: POSIX only supports workers from main thread (issue6460)
Joerg Sonnenberger <joerg@bec.de>
parents: 45957
diff changeset
171 enabled = False
a42502e9ae6d worker: POSIX only supports workers from main thread (issue6460)
Joerg Sonnenberger <joerg@bec.de>
parents: 45957
diff changeset
172
38731
ef3838a47503 worker: ability to disable thread unsafe tasks
Gregory Szorc <gregory.szorc@gmail.com>
parents: 38730
diff changeset
173 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
174 return _platformworker(ui, func, staticargs, args, hasretval)
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
175 return func(*staticargs + (args,))
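The hasretval protocol described in the docstring can be illustrated without any worker processes. In this sketch, `square_chunk` is a hypothetical task function and `run_chunks` is a hypothetical serial driver that merges the per-chunk dicts; how the real platform workers merge results is not shown in this part of the listing, so the merging step is an assumption.

```python
def square_chunk(offset, chunk):
    """Hypothetical task: report progress per item, then return a dict."""
    results = {}
    for x in chunk:
        results[x] = x * x + offset
        yield False, x       # progress item
    yield True, results      # final return value for this chunk


def run_chunks(func, staticargs, chunks):
    """Serial stand-in for a worker pool that follows the same protocol."""
    merged = {}
    for chunk in chunks:
        # Same calling convention as the serial fallback in worker():
        # the static arguments first, then the chunk of work items.
        for done, payload in func(*staticargs + (chunk,)):
            if done:
                merged.update(payload)  # join order is not significant
            else:
                yield False, payload
    yield True, merged


out = list(run_chunks(square_chunk, (0,), [[1, 2], [3]]))
print(out)
```

Because the per-chunk dicts are joined with update(), a key produced by two chunks would keep only one of the values, which is why the docstring warns against overlapping keys.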
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
176
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
177
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
178 def _posixworker(ui, func, staticargs, args, hasretval):
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
179 workers = _numworkers(ui)
18708
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
180 oldhandler = signal.getsignal(signal.SIGINT)
86524a70c0f6 worker: fix a race in SIGINT handling
Bryan O'Sullivan <bryano@fb.com>
parents: 18707
diff changeset
181 signal.signal(signal.SIGINT, signal.SIG_IGN)
30423
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30422
diff changeset
182 pids, problem = set(), [0]
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
183
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
184 def killworkers():
30432
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
185 # unregister SIGCHLD handler as all children will be killed. This
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
186 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
187 # could be updated while iterating, which would cause inconsistency.
237b2883cbd8 worker: make sure killworkers() never be interrupted by another SIGCHLD
Yuya Nishihara <yuya@tcha.org>
parents: 30431
diff changeset
188 signal.signal(signal.SIGCHLD, oldchldhandler)
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
189 # if one worker bails, there's no good reason to wait for the rest
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
190 for p in pids:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
191 try:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
192 os.kill(p, signal.SIGTERM)
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
193 except OSError as err:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
194 if err.errno != errno.ESRCH:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
195 raise
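The error handling in killworkers() tolerates workers that have already exited. The following sketch isolates that pattern; `kill_all` and its injectable `kill` parameter are hypothetical, added here so the behaviour can be exercised without spawning real processes.

```python
import errno
import os
import signal


def kill_all(pids, kill=os.kill):
    """Send SIGTERM to every pid, ignoring processes that are already gone."""
    for pid in pids:
        try:
            kill(pid, signal.SIGTERM)
        except OSError as err:
            # ESRCH means "no such process": the worker exited on its own.
            # Any other error (for example EPERM) is unexpected and re-raised.
            if err.errno != errno.ESRCH:
                raise


calls = []


def fake_kill(pid, sig):
    # Pretend that pid 2 has already exited.
    calls.append((pid, sig))
    if pid == 2:
        raise ProcessLookupError(errno.ESRCH, 'No such process')


kill_all([1, 2, 3], kill=fake_kill)
print([pid for pid, sig in calls])
```

Checking the errno explicitly keeps the loop going for the remaining workers while still surfacing real failures.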
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
196
30422
7bc25549e084 worker: allow waitforworkers to be non-blocking
Jun Wu <quark@fb.com>
parents: 30421
diff changeset
197 def waitforworkers(blocking=True):
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
198 for pid in pids.copy():
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
199 p = st = 0
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
200 while True:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
201 try:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
202 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
30431
0e6ce6313e47 worker: fix missed break on successful waitpid()
Yuya Nishihara <yuya@tcha.org>
parents: 30426
diff changeset
203 break
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
204 except OSError as e:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
205 if e.errno == errno.EINTR:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
206 continue
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
207 elif e.errno == errno.ECHILD:
30434
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
208 # child has already been reaped, but pids has not yet been
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
209 # updated (maybe interrupted just after waitpid)
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
210 pids.discard(pid)
03f7aa2bd0e3 worker: discard waited pid by anyone who noticed it first
Yuya Nishihara <yuya@tcha.org>
parents: 30433
diff changeset
211 break
30424
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
212 else:
5069a8a40b1b worker: make waitforworkers reentrant
Jun Wu <quark@fb.com>
parents: 30423
diff changeset
213 raise
30878
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
214 if not p:
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
215 # skip subsequent steps, because child process should
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
216 # be still running in this case
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
217 continue
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
218 pids.discard(p)
18fb3cf572b4 worker: ignore meaningless exit status indication returned by os.waitpid()
FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
parents: 30644
diff changeset
219 st = _exitstatus(st)
30420
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
220 if st and not problem[0]:
7a5d6e2fd2d5 worker: move killworkers and waitforworkers up
Jun Wu <quark@fb.com>
parents: 30406
diff changeset
221 problem[0] = st
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
222
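The reaping loop in waitforworkers() can be tried out in isolation. The following is a minimal sketch, not the code from worker.py: the name reap and its return value are assumptions made for illustration. It relies on Python 3.5 and later retrying os.waitpid() after EINTR automatically (PEP 475), so the explicit retry loop is omitted.

```python
import os


def reap(pids, blocking=True):
    """Reap finished children listed in the set pids.

    Hypothetical helper: returns {pid: raw wait status} for every child
    collected by this call and removes those pids from the set.
    """
    reaped = {}
    # Iterate over a copy because the set is modified inside the loop.
    for pid in pids.copy():
        try:
            p, st = os.waitpid(pid, 0 if blocking else os.WNOHANG)
        except ChildProcessError:
            # ECHILD: someone else already reaped this child.
            pids.discard(pid)
            continue
        if not p:
            # With WNOHANG, a pid of 0 means the child is still running.
            continue
        pids.discard(p)
        reaped[p] = st
    return reaped
```

Calling reap() a second time with a pid that was already collected takes the ChildProcessError branch, which mirrors the ECHILD case handled above.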
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
223 def sigchldhandler(signum, frame):
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
224 waitforworkers(blocking=False)
30433
f2d13eb85198 worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents: 30432
diff changeset
225 if problem[0]:
f2d13eb85198 worker: kill workers after all zombie processes are reaped
Yuya Nishihara <yuya@tcha.org>
parents: 30432
diff changeset
226 killworkers()
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
227
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
228 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
31701
9d3d56aa1a9f worker: flush ui buffers before running the worker
David Soria Parra <davidsp@fb.com>
parents: 31134
diff changeset
229 ui.flush()
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
230 parentpid = os.getpid()
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
231 pipes = []
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
232 retval = {}
45396
26eb62bd0550 posixworker: avoid creating workers that end up getting no work
Martin von Zweigbergk <martinvonz@google.com>
parents: 44165
diff changeset
233 for pargs in partition(args, min(workers, len(args))):
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
234 # Every worker gets its own pipe to send results on, so we don't have to
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
235 # implement atomic writes larger than PIPE_BUF. Each forked process has
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
236 # its own pipe's descriptors in the local variables, and the parent
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
237 # process has the full list of pipe descriptors (and it doesn't really
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
238 # care what order they're in).
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
239 rfd, wfd = os.pipe()
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
240 pipes.append((rfd, wfd))
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
241 # make sure we use os._exit in all worker code paths. otherwise the
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
242 # worker may do some clean-ups which could cause surprises like
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
243 # deadlock. see sshpeer.cleanup for example.
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
244 # override error handling *before* fork. this is necessary because
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
245 # exception (signal) may arrive after fork, before "pid =" assignment
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
246 # completes, and other exception handler (dispatch.py) can lead to
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
247 # unexpected code path without os._exit.
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
248 ret = -1
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
249 try:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
250 pid = os.fork()
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
251 if pid == 0:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
252 signal.signal(signal.SIGINT, oldhandler)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
253 signal.signal(signal.SIGCHLD, oldchldhandler)
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
254
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
255 def workerfunc():
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
256 for r, w in pipes[:-1]:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
257 os.close(r)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
258 os.close(w)
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
259 os.close(rfd)
38536
8c38d2948217 worker: support more return types in posix worker
Danny Hooper <hooper@google.com>
parents: 37890
diff changeset
260 for result in func(*(staticargs + (pargs,))):
48961
df56e6bd37f6 py3: use pickle directly
Gregory Szorc <gregory.szorc@gmail.com>
parents: 46819
diff changeset
261 os.write(wfd, pickle.dumps(result))
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
262 return 0
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
263
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
264 ret = scmutil.callcatch(ui, workerfunc)
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
265 except: # parent re-raises, child never returns
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
266 if os.getpid() == parentpid:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
267 raise
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
268 exctype = sys.exc_info()[0]
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
269 force = not issubclass(exctype, KeyboardInterrupt)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
270 ui.traceback(force=force)
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
271 finally:
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
272 if os.getpid() != parentpid:
30882
a91c62752d08 worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents: 30878
diff changeset
273 try:
a91c62752d08 worker: flush messages written by child processes before exit
Yuya Nishihara <yuya@tcha.org>
parents: 30878
diff changeset
274 ui.flush()
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
275 except: # never returns, no re-raises
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
276 pass
30530
86cd09bc13ba worker: use os._exit for posix worker in all cases
Jun Wu <quark@fb.com>
parents: 30434
diff changeset
277 finally:
32166
31763785094b worker: rewrite error handling so os._exit covers all cases
Jun Wu <quark@fb.com>
parents: 32043
diff changeset
278 os._exit(ret & 255)
30423
9c25a1a8c685 worker: change "pids" to a set
Jun Wu <quark@fb.com>
parents: 30422
diff changeset
279 pids.add(pid)
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
280 selector = selectors.DefaultSelector()
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
281 for rfd, wfd in pipes:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
282 os.close(wfd)
44165
12491abf93bd worker: manually buffer reads from pickle stream
Jan Alexander Steffens (heftig) <jan.steffens@gmail.com>
parents: 44155
diff changeset
283 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
284
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
285 def cleanup():
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
286 signal.signal(signal.SIGINT, oldhandler)
30426
c27614f2dec1 worker: stop using a separate thread waiting for children
Jun Wu <quark@fb.com>
parents: 30425
diff changeset
287 waitforworkers()
30425
e8fb03cfbbde worker: add a SIGCHLD handler to collect worker immediately
Jun Wu <quark@fb.com>
parents: 30424
diff changeset
288 signal.signal(signal.SIGCHLD, oldchldhandler)
38740
c08ea1e219c0 worker: call selector.close() to release polling resources
Yuya Nishihara <yuya@tcha.org>
parents: 38731
diff changeset
289 selector.close()
40472
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
290 return problem[0]
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
291
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
292 try:
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
293 openpipes = len(pipes)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
294 while openpipes > 0:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
295 for key, events in selector.select():
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
296 try:
49003
a0674e916fb6 worker: silence type error when calling pickle
Gregory Szorc <gregory.szorc@gmail.com>
parents: 48992
diff changeset
297 # The pytype error likely goes away on a modern version of
a0674e916fb6 worker: silence type error when calling pickle
Gregory Szorc <gregory.szorc@gmail.com>
parents: 48992
diff changeset
298 # pytype having a modern typeshed snapshot.
a0674e916fb6 worker: silence type error when calling pickle
Gregory Szorc <gregory.szorc@gmail.com>
parents: 48992
diff changeset
299 # pytype: disable=wrong-arg-types
48961
df56e6bd37f6 py3: use pickle directly
Gregory Szorc <gregory.szorc@gmail.com>
parents: 46819
diff changeset
300 res = pickle.load(_blockingreader(key.fileobj))
49003
a0674e916fb6 worker: silence type error when calling pickle
Gregory Szorc <gregory.szorc@gmail.com>
parents: 48992
diff changeset
301 # pytype: enable=wrong-arg-types
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
302 if hasretval and res[0]:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
303 retval.update(res[1])
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
304 else:
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
305 yield res
38729
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
306 except EOFError:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
307 selector.unregister(key.fileobj)
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
308 key.fileobj.close()
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
309 openpipes -= 1
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
310 except IOError as e:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
311 if e.errno == errno.EINTR:
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
312 continue
9e6afe7fca31 worker: use one pipe per posix worker and select() in parent process
Danny Hooper <hooper@google.com>
parents: 38536
diff changeset
313 raise
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
314 except: # re-raises
18709
9955fc5ee24b worker: handle worker failures more aggressively
Bryan O'Sullivan <bryano@fb.com>
parents: 18708
diff changeset
315 killworkers()
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
316 cleanup()
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
317 raise
40472
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
318 status = cleanup()
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
319 if status:
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
320 if status < 0:
03f7d0822ec1 worker: do not swallow exception occurred in main process
Yuya Nishihara <yuya@tcha.org>
parents: 38740
diff changeset
321 os.kill(os.getpid(), -status)
45844
8f07f5a9c3de worker: raise exception instead of calling sys.exit() with child's code
Martin von Zweigbergk <martinvonz@google.com>
parents: 45409
diff changeset
322 raise error.WorkerError(status)
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
323 if hasretval:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
324 yield True, retval
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
325
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
326
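The pipe-per-worker design used by the posix worker above can be sketched on a small scale. This is an illustrative simplification under stated assumptions, not Mercurial's implementation: run_forked is a hypothetical name, one child is forked per chunk, no signal handlers are installed, and the parent buffers each pipe until EOF and only then unpickles, whereas the code above streams results through _blockingreader as they arrive.

```python
import io
import os
import pickle
import selectors


def run_forked(func, chunks):
    """Run the generator function func(chunk) in one forked child per chunk."""
    selector = selectors.DefaultSelector()
    pids = []
    for chunk in chunks:
        rfd, wfd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: send pickled results, then leave through os._exit so
            # that no cleanup inherited from the parent runs.
            status = 1
            try:
                os.close(rfd)
                for result in func(chunk):
                    data = pickle.dumps(result)
                    while data:
                        # os.write may accept only part of the buffer.
                        data = data[os.write(wfd, data):]
                status = 0
            finally:
                os._exit(status)
        # Parent: keep only the read end, with a per-pipe byte buffer.
        os.close(wfd)
        selector.register(rfd, selectors.EVENT_READ, data=bytearray())
        pids.append(pid)

    results = []
    open_pipes = len(pids)
    while open_pipes:
        for key, _events in selector.select():
            block = os.read(key.fd, 65536)
            if block:
                key.data.extend(block)
                continue
            # An empty read means the child closed its write end.
            selector.unregister(key.fd)
            os.close(key.fd)
            open_pipes -= 1
            stream = io.BytesIO(bytes(key.data))
            while stream.tell() < len(key.data):
                results.append(pickle.load(stream))
    selector.close()
    for pid in pids:
        os.waitpid(pid, 0)
    return results
```

Because every worker owns its pipe, writes from different workers never interleave, so no write has to fit within PIPE_BUF. Results arrive in whatever order the pipes reach EOF, so callers should not depend on ordering.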
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
327 def _posixexitstatus(code):
45957
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
328 """convert a posix exit status into the same form returned by
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
329 os.spawnv
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
330
45957
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
331 returns None if the process was stopped instead of exiting"""
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
332 if os.WIFEXITED(code):
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
333 return os.WEXITSTATUS(code)
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
334 elif os.WIFSIGNALED(code):
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
335 return -(os.WTERMSIG(code))
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
336
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
337
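The decoding done by _posixexitstatus() can be checked against a real child process. The sketch below is a stand-alone restatement for experimentation; the names exit_status and child_status are assumptions and do not exist in worker.py.

```python
import os


def exit_status(code):
    """Decode a raw wait status: exit code, negated signal, or None."""
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    if os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)
    # Neither exited nor killed (for example, stopped).
    return None


def child_status(exit_code):
    """Fork a child that exits with exit_code; return its raw wait status."""
    pid = os.fork()
    if pid == 0:
        os._exit(exit_code)
    _pid, status = os.waitpid(pid, 0)
    return status
```

A negative return value is what lets the caller above re-raise the same signal against itself with os.kill(os.getpid(), -status).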
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
338 def _windowsworker(ui, func, staticargs, args, hasretval):
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
339 class Worker(threading.Thread):
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
340 def __init__(
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
341 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
342 ):
40508
909c31805f54 py3: roll up threading.Thread constructor args into **kwargs
Matt Harbison <matt_harbison@yahoo.com>
parents: 38740
diff changeset
343 threading.Thread.__init__(self, *args, **kwargs)
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
344 self._taskqueue = taskqueue
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
345 self._resultqueue = resultqueue
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
346 self._func = func
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
347 self._staticargs = staticargs
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
348 self._interrupted = False
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
349 self.daemon = True
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
350 self.exception = None
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
351
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
352 def interrupt(self):
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
353 self._interrupted = True
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
354
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
355 def run(self):
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
356 try:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
357 while not self._taskqueue.empty():
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
358 try:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
359 args = self._taskqueue.get_nowait()
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
360 for res in self._func(*self._staticargs + (args,)):
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
361 self._resultqueue.put(res)
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
362 # threading doesn't provide a native way to
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
363 # interrupt execution. handle it manually at every
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
364 # iteration.
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
365 if self._interrupted:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
366 return
37890
8fb9985382be pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 36843
diff changeset
367 except pycompat.queue.Empty:
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
368 break
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
369 except Exception as e:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
370 # store the exception such that the main thread can resurface
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
371 # it as if the func was running without workers.
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
372 self.exception = e
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
373 raise
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
374
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
375 threads = []
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
376
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
377 def trykillworkers():
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
378 # Allow up to 1 second to clean worker threads nicely
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
379 cleanupend = time.time() + 1
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
380 for t in threads:
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
381 t.interrupt()
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
382 for t in threads:
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
383 remainingtime = cleanupend - time.time()
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
384 t.join(remainingtime)
35434
71427ff1dff8 workers: handling exceptions in windows workers
Wojciech Lis <wlis@fb.com>
parents: 35433
diff changeset
385 if t.is_alive():
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
386 # pass over the workers joining failure. it is more
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
387 # important to surface the initial exception than the
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
388 # fact that one of workers may be processing a large
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
389 # task and does not get to handle the interruption.
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
390 ui.warn(
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
391 _(
43077
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
392 b"failed to kill worker threads while "
687b865b95ad formatting: byteify all mercurial/ and hgext/ string literals
Augie Fackler <augie@google.com>
parents: 43076
diff changeset
393 b"handling an exception\n"
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
394 )
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
395 )
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
396 return
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
397
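The cooperative interruption used by trykillworkers() can be sketched on its own. This is a minimal illustration under assumptions: Interruptible and stop_all are hypothetical names, the thread body is a placeholder polling loop, and the remaining time is clamped at zero and measured with time.monotonic(), which differs from the code above.

```python
import threading
import time


class Interruptible(threading.Thread):
    """Daemon thread that checks a flag between units of work."""

    def __init__(self):
        super().__init__(daemon=True)
        self._interrupted = False

    def interrupt(self):
        self._interrupted = True

    def run(self):
        # Placeholder work loop: the flag is only seen between iterations,
        # so a long-running iteration delays the shutdown.
        while not self._interrupted:
            time.sleep(0.01)


def stop_all(threads, budget=1.0):
    """Interrupt all threads, then join them within a shared time budget.

    Returns the threads that are still alive once the budget is used up.
    """
    deadline = time.monotonic() + budget
    for t in threads:
        t.interrupt()
    for t in threads:
        t.join(max(0.0, deadline - time.monotonic()))
    return [t for t in threads if t.is_alive()]
```

Signalling every thread before joining any of them lets the threads wind down in parallel, so the budget covers the slowest thread rather than the sum of all of them.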
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
398 workers = _numworkers(ui)
37890
8fb9985382be pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 36843
diff changeset
399 resultqueue = pycompat.queue.Queue()
8fb9985382be pycompat: export queue module instead of symbols in module (API)
Gregory Szorc <gregory.szorc@gmail.com>
parents: 36843
diff changeset
400 taskqueue = pycompat.queue.Queue()
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
401 retval = {}
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
402 # partition work to more pieces than workers to minimize the chance
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
403 # of uneven distribution of large tasks between the workers
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
404 for pargs in partition(args, workers * 20):
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
405 taskqueue.put(pargs)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
406 for _i in range(workers):
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
407 t = Worker(taskqueue, resultqueue, func, staticargs)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
408 threads.append(t)
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
409 t.start()
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
410 try:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
411 while len(threads) > 0:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
412 while not resultqueue.empty():
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
413 res = resultqueue.get()
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
414 if hasretval and res[0]:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
415 retval.update(res[1])
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
416 else:
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
417 yield res
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
418 threads[0].join(0.05)
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
419 finishedthreads = [_t for _t in threads if not _t.is_alive()]
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
420 for t in finishedthreads:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
421 if t.exception is not None:
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
422 raise t.exception
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
423 threads.remove(t)
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
424 except (Exception, KeyboardInterrupt): # re-raises
35436
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
425 trykillworkers()
86b8cc1f244e worker: make windows workers daemons
Wojciech Lis <wlis@fb.com>
parents: 35435
diff changeset
426 raise
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
427 while not resultqueue.empty():
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
428 res = resultqueue.get()
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
429 if hasretval and res[0]:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
430 retval.update(res[1])
42471
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
431 else:
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
432 yield res
5ca136bbd3f6 worker: support parallelization of functions with return values
Valentin Gatien-Baron <vgatien-baron@janestreet.com>
parents: 40999
diff changeset
433 if hasretval:
42529
d29db0a0c4eb update: fix spurious unclean status bug shown by previous commit
Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
parents: 42471
diff changeset
434 yield True, retval
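The polling loop above (lines 398-434) can be hard to follow through the annotation noise, so here is a minimal standalone sketch of the same pattern: fill a task queue, start daemon threads, drain results while the threads are alive, re-raise the first worker exception, then drain once more. The names `SketchWorker`, `runworkers` and `double` are hypothetical and are not part of Mercurial. The real `Worker` class is defined outside this excerpt and may differ, and `hasretval` handling and `trykillworkers()` are left out.

```python
import queue
import threading


class SketchWorker(threading.Thread):
    """Hypothetical stand-in for the Worker class, which is not shown here."""

    def __init__(self, taskqueue, resultqueue, func):
        super().__init__(daemon=True)
        self._taskqueue = taskqueue
        self._resultqueue = resultqueue
        self._func = func
        self.exception = None

    def run(self):
        try:
            while True:
                try:
                    chunk = self._taskqueue.get_nowait()
                except queue.Empty:
                    return  # no work left for this thread
                for res in self._func(chunk):
                    self._resultqueue.put(res)
        except Exception as e:
            # Remember the failure so the main thread can re-raise it.
            self.exception = e


def runworkers(func, chunks, nworkers):
    """Yield results from func(chunk) computed by nworkers threads."""
    taskqueue = queue.Queue()
    resultqueue = queue.Queue()
    for chunk in chunks:
        taskqueue.put(chunk)
    threads = [
        SketchWorker(taskqueue, resultqueue, func) for _ in range(nworkers)
    ]
    for t in threads:
        t.start()
    while threads:
        # Only this thread consumes results, so get() cannot block here.
        while not resultqueue.empty():
            yield resultqueue.get()
        threads[0].join(0.05)  # short timeout keeps the loop responsive
        for t in [t for t in threads if not t.is_alive()]:
            if t.exception is not None:
                raise t.exception
            threads.remove(t)
    # Results queued just before the last thread exited are drained here.
    while not resultqueue.empty():
        yield resultqueue.get()


def double(chunk):
    for x in chunk:
        yield x * 2


results = sorted(runworkers(double, [[1, 2], [3], [4, 5, 6]], 2))
```

The second drain after the loop matters: a thread may put its last results on the queue and exit between the inner drain and the liveness check, so without it those results would be lost.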
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
435
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
436
35433
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
437 if pycompat.iswindows:
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
438 _platformworker = _windowsworker
02b36e860e0b workers: implemented worker on windows
Wojciech Lis <wlis@fb.com>
parents: 34646
diff changeset
439 else:
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
440 _platformworker = _posixworker
18707
d1a2b086d058 worker: on error, exit similarly to the first failing worker
Bryan O'Sullivan <bryano@fb.com>
parents: 18638
diff changeset
441 _exitstatus = _posixexitstatus
18638
047110c0e2a8 worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan <bryano@fb.com>
parents: 18637
diff changeset
442
43076
2372284d9457 formatting: blacken the codebase
Augie Fackler <augie@google.com>
parents: 42529
diff changeset
443
18637
ac4dbceeb14a worker: partition a list (of tasks) into equal-sized chunks
Bryan O'Sullivan <bryano@fb.com>
parents: 18636
diff changeset
444 def partition(lst, nslices):
45957
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
445 """partition a list into N slices of roughly equal size
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
446
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
447 The current strategy takes every Nth element from the input. If
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
448 we ever write workers that need to preserve grouping in input
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
449 we should consider allowing callers to specify a partition strategy.
28292
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
450
46819
d4ba4d51f85f contributor: change mentions of mpm to olivia
Raphaël Gomès <rgomes@octobus.net>
parents: 46405
diff changeset
451 olivia is not a fan of this partitioning strategy when files are involved.
28292
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
452 In his words:
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
453
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
454 Single-threaded Mercurial makes a point of creating and visiting
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
455 files in a fixed order (alphabetical). When creating files in order,
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
456 a typical filesystem is likely to allocate them on nearby regions on
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
457 disk. Thus, when revisiting in the same order, locality is maximized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
458 and various forms of OS and disk-level caching and read-ahead get a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
459 chance to work.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
460
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
461 This effect can be quite significant on spinning disks. I discovered it
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
462 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
463 Tarring a repo and copying it to another disk effectively randomized
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
464 the revlog ordering on disk by sorting the revlogs by hash and suddenly
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
465 performance of my kernel checkout benchmark dropped by ~10x because the
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
466 "working set" of sectors visited no longer fit in the drive's cache and
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
467 the workload switched from streaming to random I/O.
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
468
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
469 What we should really be doing is have workers read filenames from a
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
470 ordered queue. This preserves locality and also keeps any worker from
3eb7faf6d958 worker: document poor partitioning scheme impact
Gregory Szorc <gregory.szorc@gmail.com>
parents: 28181
diff changeset
471 getting more than one file out of balance.
45957
89a2afe31e82 formating: upgrade to black 20.8b1
Augie Fackler <raf@durin42.com>
parents: 45844
diff changeset
472 """
28181
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
473 for i in range(nslices):
f8efc8a3a991 worker: change partition strategy to every Nth element
Gregory Szorc <gregory.szorc@gmail.com>
parents: 26587
diff changeset
474 yield lst[i::nslices]
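To make the "every Nth element" strategy concrete, the following small sketch restates the two-line body of `partition` shown above and runs it on a made-up, alphabetically sorted file list. The sample data is an assumption for illustration only.

```python
def partition(lst, nslices):
    """Yield nslices slices, each taking every nslices-th element of lst."""
    for i in range(nslices):
        yield lst[i::nslices]


# Hypothetical input: seven filenames in sorted order.
files = [b"a", b"b", b"c", b"d", b"e", b"f", b"g"]
slices = list(partition(files, 3))
# slices == [[b"a", b"d", b"g"], [b"b", b"e"], [b"c", b"f"]]
```

Slice sizes differ by at most one, which is what keeps the work balanced. Neighbouring files such as `b"a"` and `b"b"` always land in different slices, though, which is the loss of on-disk locality that the docstring's quoted comment describes.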