Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/worker.py @ 49975:3eef8baf6b92
worker: avoid reading 1 byte at a time from the OS pipe
Apparently `pickle.load` performs a lot of small reads, many of them
literally a single byte, so it benefits greatly from buffering.
This change enables the buffering, at the cost of more complicated
interaction with the `selector` API.
On one repository with ~400k files, this reduces the time by ~30s,
from ~60s to ~30s. The difference is so large because the actual updating
work is parallelized, while these small reads bottleneck the
central hg process.
author | Arseniy Alekseyev <aalekseyev@janestreet.com> |
---|---|
date | Fri, 06 Jan 2023 15:17:14 +0000 |
parents | 3556f0392808 |
children | d0b8bbf603d7 |
comparison
equal
deleted
inserted
replaced
49974:024e0580b853 | 49975:3eef8baf6b92 |
---|---|
57 return min(max(countcpus(), 4), 32) | 57 return min(max(countcpus(), 4), 32) |
58 | 58 |
59 | 59 |
60 def ismainthread(): | 60 def ismainthread(): |
61 return threading.current_thread() == threading.main_thread() | 61 return threading.current_thread() == threading.main_thread() |
62 | |
63 | |
64 class _blockingreader: | |
65 """Wrap unbuffered stream such that pickle.load() works with it. | |
66 | |
67 pickle.load() expects that calls to read() and readinto() read as many | |
68 bytes as requested. On EOF, it is fine to read fewer bytes. In this case, | |
69 pickle.load() raises an EOFError. | |
70 """ | |
71 | |
72 def __init__(self, wrapped): | |
73 self._wrapped = wrapped | |
74 | |
75 def readline(self): | |
76 return self._wrapped.readline() | |
77 | |
78 def readinto(self, buf): | |
79 pos = 0 | |
80 size = len(buf) | |
81 | |
82 with memoryview(buf) as view: | |
83 while pos < size: | |
84 with view[pos:] as subview: | |
85 ret = self._wrapped.readinto(subview) | |
86 if not ret: | |
87 break | |
88 pos += ret | |
89 | |
90 return pos | |
91 | |
92 # issue multiple reads until size is fulfilled (or EOF is encountered) | |
93 def read(self, size=-1): | |
94 if size < 0: | |
95 return self._wrapped.readall() | |
96 | |
97 buf = bytearray(size) | |
98 n_read = self.readinto(buf) | |
99 del buf[n_read:] | |
100 return bytes(buf) | |
101 | 62 |
102 | 63 |
103 if pycompat.isposix or pycompat.iswindows: | 64 if pycompat.isposix or pycompat.iswindows: |
104 _STARTUP_COST = 0.01 | 65 _STARTUP_COST = 0.01 |
105 # The Windows worker is thread based. If tasks are CPU bound, threads | 66 # The Windows worker is thread based. If tasks are CPU bound, threads |
274 os._exit(ret & 255) | 235 os._exit(ret & 255) |
275 pids.add(pid) | 236 pids.add(pid) |
276 selector = selectors.DefaultSelector() | 237 selector = selectors.DefaultSelector() |
277 for rfd, wfd in pipes: | 238 for rfd, wfd in pipes: |
278 os.close(wfd) | 239 os.close(wfd) |
279 # The stream has to be unbuffered. Otherwise, if all data is read from | 240 # Buffering is needed for performance, but it also presents a problem: |
280 # the raw file into the buffer, the selector thinks that the FD is not | 241 # selector doesn't take the buffered data into account, |
281 # ready to read while pickle.load() could read from the buffer. This | 242 # so we have to arrange it so that the buffers are empty when select is called |
282 # would delay the processing of readable items. | 243 # (see [peek_nonblock]) |
283 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ) | 244 selector.register(os.fdopen(rfd, 'rb', 4096), selectors.EVENT_READ) |
245 | |
246 def peek_nonblock(f): | |
247 os.set_blocking(f.fileno(), False) | |
248 res = f.peek() | |
249 os.set_blocking(f.fileno(), True) | |
250 return res | |
251 | |
252 def load_all(f): | |
253 # The pytype error likely goes away on a modern version of | |
254 # pytype having a modern typeshed snapshot. | |
255 # pytype: disable=wrong-arg-types | |
256 yield pickle.load(f) | |
257 while len(peek_nonblock(f)) > 0: | |
258 yield pickle.load(f) | |
259 # pytype: enable=wrong-arg-types | |
284 | 260 |
285 def cleanup(): | 261 def cleanup(): |
286 signal.signal(signal.SIGINT, oldhandler) | 262 signal.signal(signal.SIGINT, oldhandler) |
287 waitforworkers() | 263 waitforworkers() |
288 signal.signal(signal.SIGCHLD, oldchldhandler) | 264 signal.signal(signal.SIGCHLD, oldchldhandler) |
292 try: | 268 try: |
293 openpipes = len(pipes) | 269 openpipes = len(pipes) |
294 while openpipes > 0: | 270 while openpipes > 0: |
295 for key, events in selector.select(): | 271 for key, events in selector.select(): |
296 try: | 272 try: |
297 # The pytype error likely goes away on a modern version of | 273 for res in load_all(key.fileobj): |
298 # pytype having a modern typeshed snapshot. | 274 if hasretval and res[0]: |
299 # pytype: disable=wrong-arg-types | 275 retval.update(res[1]) |
300 res = pickle.load(_blockingreader(key.fileobj)) | 276 else: |
301 # pytype: enable=wrong-arg-types | 277 yield res |
302 if hasretval and res[0]: | |
303 retval.update(res[1]) | |
304 else: | |
305 yield res | |
306 except EOFError: | 278 except EOFError: |
307 selector.unregister(key.fileobj) | 279 selector.unregister(key.fileobj) |
308 # pytype: disable=attribute-error | 280 # pytype: disable=attribute-error |
309 key.fileobj.close() | 281 key.fileobj.close() |
310 # pytype: enable=attribute-error | 282 # pytype: enable=attribute-error |