comparison mercurial/thirdparty/concurrent/futures/_base.py @ 37623:eb687c28a915

thirdparty: vendor futures 3.2.0 Python 3 has a concurrent.futures package in the standard library for representing futures. The "futures" package on PyPI is a backport of this package to work with Python 2. The wire protocol code today has its own future concept for handling of "batch" requests. The frame-based protocol will also want to use futures. I've heavily used the "futures" package on Python 2 in other projects and it is pretty nice. It even has a built-in thread and process pool for running functions in parallel. I've used this heavily for concurrent I/O and other GIL-less activities. The existing futures API in the wire protocol code is not as nice as concurrent.futures. Since concurrent.futures is in the Python standard library and will presumably be the long-term future for futures in our code base, let's vendor the backport so we can use proper futures today. # no-check-commit because of style violations Differential Revision: https://phab.mercurial-scm.org/D3261
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 11 Apr 2018 14:48:24 -0700
parents
children 0a9c0d3480b2
comparison
equal deleted inserted replaced
37622:bfdd20d22a86 37623:eb687c28a915
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 import collections
5 import logging
6 import threading
7 import itertools
8 import time
9 import types
10
11 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
12
13 FIRST_COMPLETED = 'FIRST_COMPLETED'
14 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
15 ALL_COMPLETED = 'ALL_COMPLETED'
16 _AS_COMPLETED = '_AS_COMPLETED'
17
18 # Possible future states (for internal use by the futures package).
19 PENDING = 'PENDING'
20 RUNNING = 'RUNNING'
21 # The future was cancelled by the user...
22 CANCELLED = 'CANCELLED'
23 # ...and _Waiter.add_cancelled() was called by a worker.
24 CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
25 FINISHED = 'FINISHED'
26
27 _FUTURE_STATES = [
28 PENDING,
29 RUNNING,
30 CANCELLED,
31 CANCELLED_AND_NOTIFIED,
32 FINISHED
33 ]
34
35 _STATE_TO_DESCRIPTION_MAP = {
36 PENDING: "pending",
37 RUNNING: "running",
38 CANCELLED: "cancelled",
39 CANCELLED_AND_NOTIFIED: "cancelled",
40 FINISHED: "finished"
41 }
42
43 # Logger for internal use by the futures package.
44 LOGGER = logging.getLogger("concurrent.futures")
45
46 class Error(Exception):
47 """Base class for all future-related exceptions."""
48 pass
49
50 class CancelledError(Error):
51 """The Future was cancelled."""
52 pass
53
54 class TimeoutError(Error):
55 """The operation exceeded the given deadline."""
56 pass
57
58 class _Waiter(object):
59 """Provides the event that wait() and as_completed() block on."""
60 def __init__(self):
61 self.event = threading.Event()
62 self.finished_futures = []
63
64 def add_result(self, future):
65 self.finished_futures.append(future)
66
67 def add_exception(self, future):
68 self.finished_futures.append(future)
69
70 def add_cancelled(self, future):
71 self.finished_futures.append(future)
72
73 class _AsCompletedWaiter(_Waiter):
74 """Used by as_completed()."""
75
76 def __init__(self):
77 super(_AsCompletedWaiter, self).__init__()
78 self.lock = threading.Lock()
79
80 def add_result(self, future):
81 with self.lock:
82 super(_AsCompletedWaiter, self).add_result(future)
83 self.event.set()
84
85 def add_exception(self, future):
86 with self.lock:
87 super(_AsCompletedWaiter, self).add_exception(future)
88 self.event.set()
89
90 def add_cancelled(self, future):
91 with self.lock:
92 super(_AsCompletedWaiter, self).add_cancelled(future)
93 self.event.set()
94
95 class _FirstCompletedWaiter(_Waiter):
96 """Used by wait(return_when=FIRST_COMPLETED)."""
97
98 def add_result(self, future):
99 super(_FirstCompletedWaiter, self).add_result(future)
100 self.event.set()
101
102 def add_exception(self, future):
103 super(_FirstCompletedWaiter, self).add_exception(future)
104 self.event.set()
105
106 def add_cancelled(self, future):
107 super(_FirstCompletedWaiter, self).add_cancelled(future)
108 self.event.set()
109
110 class _AllCompletedWaiter(_Waiter):
111 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
112
113 def __init__(self, num_pending_calls, stop_on_exception):
114 self.num_pending_calls = num_pending_calls
115 self.stop_on_exception = stop_on_exception
116 self.lock = threading.Lock()
117 super(_AllCompletedWaiter, self).__init__()
118
119 def _decrement_pending_calls(self):
120 with self.lock:
121 self.num_pending_calls -= 1
122 if not self.num_pending_calls:
123 self.event.set()
124
125 def add_result(self, future):
126 super(_AllCompletedWaiter, self).add_result(future)
127 self._decrement_pending_calls()
128
129 def add_exception(self, future):
130 super(_AllCompletedWaiter, self).add_exception(future)
131 if self.stop_on_exception:
132 self.event.set()
133 else:
134 self._decrement_pending_calls()
135
136 def add_cancelled(self, future):
137 super(_AllCompletedWaiter, self).add_cancelled(future)
138 self._decrement_pending_calls()
139
140 class _AcquireFutures(object):
141 """A context manager that does an ordered acquire of Future conditions."""
142
143 def __init__(self, futures):
144 self.futures = sorted(futures, key=id)
145
146 def __enter__(self):
147 for future in self.futures:
148 future._condition.acquire()
149
150 def __exit__(self, *args):
151 for future in self.futures:
152 future._condition.release()
153
154 def _create_and_install_waiters(fs, return_when):
155 if return_when == _AS_COMPLETED:
156 waiter = _AsCompletedWaiter()
157 elif return_when == FIRST_COMPLETED:
158 waiter = _FirstCompletedWaiter()
159 else:
160 pending_count = sum(
161 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
162
163 if return_when == FIRST_EXCEPTION:
164 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
165 elif return_when == ALL_COMPLETED:
166 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
167 else:
168 raise ValueError("Invalid return condition: %r" % return_when)
169
170 for f in fs:
171 f._waiters.append(waiter)
172
173 return waiter
174
175
176 def _yield_finished_futures(fs, waiter, ref_collect):
177 """
178 Iterate on the list *fs*, yielding finished futures one by one in
179 reverse order.
180 Before yielding a future, *waiter* is removed from its waiters
181 and the future is removed from each set in the collection of sets
182 *ref_collect*.
183
184 The aim of this function is to avoid keeping stale references after
185 the future is yielded and before the iterator resumes.
186 """
187 while fs:
188 f = fs[-1]
189 for futures_set in ref_collect:
190 futures_set.remove(f)
191 with f._condition:
192 f._waiters.remove(waiter)
193 del f
194 # Careful not to keep a reference to the popped value
195 yield fs.pop()
196
197
198 def as_completed(fs, timeout=None):
199 """An iterator over the given futures that yields each as it completes.
200
201 Args:
202 fs: The sequence of Futures (possibly created by different Executors) to
203 iterate over.
204 timeout: The maximum number of seconds to wait. If None, then there
205 is no limit on the wait time.
206
207 Returns:
208 An iterator that yields the given Futures as they complete (finished or
209 cancelled). If any given Futures are duplicated, they will be returned
210 once.
211
212 Raises:
213 TimeoutError: If the entire result iterator could not be generated
214 before the given timeout.
215 """
216 if timeout is not None:
217 end_time = timeout + time.time()
218
219 fs = set(fs)
220 total_futures = len(fs)
221 with _AcquireFutures(fs):
222 finished = set(
223 f for f in fs
224 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
225 pending = fs - finished
226 waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
227 finished = list(finished)
228 try:
229 for f in _yield_finished_futures(finished, waiter,
230 ref_collect=(fs,)):
231 f = [f]
232 yield f.pop()
233
234 while pending:
235 if timeout is None:
236 wait_timeout = None
237 else:
238 wait_timeout = end_time - time.time()
239 if wait_timeout < 0:
240 raise TimeoutError(
241 '%d (of %d) futures unfinished' % (
242 len(pending), total_futures))
243
244 waiter.event.wait(wait_timeout)
245
246 with waiter.lock:
247 finished = waiter.finished_futures
248 waiter.finished_futures = []
249 waiter.event.clear()
250
251 # reverse to keep finishing order
252 finished.reverse()
253 for f in _yield_finished_futures(finished, waiter,
254 ref_collect=(fs, pending)):
255 f = [f]
256 yield f.pop()
257
258 finally:
259 # Remove waiter from unfinished futures
260 for f in fs:
261 with f._condition:
262 f._waiters.remove(waiter)
263
264 DoneAndNotDoneFutures = collections.namedtuple(
265 'DoneAndNotDoneFutures', 'done not_done')
266 def wait(fs, timeout=None, return_when=ALL_COMPLETED):
267 """Wait for the futures in the given sequence to complete.
268
269 Args:
270 fs: The sequence of Futures (possibly created by different Executors) to
271 wait upon.
272 timeout: The maximum number of seconds to wait. If None, then there
273 is no limit on the wait time.
274 return_when: Indicates when this function should return. The options
275 are:
276
277 FIRST_COMPLETED - Return when any future finishes or is
278 cancelled.
279 FIRST_EXCEPTION - Return when any future finishes by raising an
280 exception. If no future raises an exception
281 then it is equivalent to ALL_COMPLETED.
282 ALL_COMPLETED - Return when all futures finish or are cancelled.
283
284 Returns:
285 A named 2-tuple of sets. The first set, named 'done', contains the
286 futures that completed (is finished or cancelled) before the wait
287 completed. The second set, named 'not_done', contains uncompleted
288 futures.
289 """
290 with _AcquireFutures(fs):
291 done = set(f for f in fs
292 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
293 not_done = set(fs) - done
294
295 if (return_when == FIRST_COMPLETED) and done:
296 return DoneAndNotDoneFutures(done, not_done)
297 elif (return_when == FIRST_EXCEPTION) and done:
298 if any(f for f in done
299 if not f.cancelled() and f.exception() is not None):
300 return DoneAndNotDoneFutures(done, not_done)
301
302 if len(done) == len(fs):
303 return DoneAndNotDoneFutures(done, not_done)
304
305 waiter = _create_and_install_waiters(fs, return_when)
306
307 waiter.event.wait(timeout)
308 for f in fs:
309 with f._condition:
310 f._waiters.remove(waiter)
311
312 done.update(waiter.finished_futures)
313 return DoneAndNotDoneFutures(done, set(fs) - done)
314
315 class Future(object):
316 """Represents the result of an asynchronous computation."""
317
318 def __init__(self):
319 """Initializes the future. Should not be called by clients."""
320 self._condition = threading.Condition()
321 self._state = PENDING
322 self._result = None
323 self._exception = None
324 self._traceback = None
325 self._waiters = []
326 self._done_callbacks = []
327
328 def _invoke_callbacks(self):
329 for callback in self._done_callbacks:
330 try:
331 callback(self)
332 except Exception:
333 LOGGER.exception('exception calling callback for %r', self)
334 except BaseException:
335 # Explicitly let all other new-style exceptions through so
336 # that we can catch all old-style exceptions with a simple
337 # "except:" clause below.
338 #
339 # All old-style exception objects are instances of
340 # types.InstanceType, but "except types.InstanceType:" does
341 # not catch old-style exceptions for some reason. Thus, the
342 # only way to catch all old-style exceptions without catching
343 # any new-style exceptions is to filter out the new-style
344 # exceptions, which all derive from BaseException.
345 raise
346 except:
347 # Because of the BaseException clause above, this handler only
348 # executes for old-style exception objects.
349 LOGGER.exception('exception calling callback for %r', self)
350
351 def __repr__(self):
352 with self._condition:
353 if self._state == FINISHED:
354 if self._exception:
355 return '<%s at %#x state=%s raised %s>' % (
356 self.__class__.__name__,
357 id(self),
358 _STATE_TO_DESCRIPTION_MAP[self._state],
359 self._exception.__class__.__name__)
360 else:
361 return '<%s at %#x state=%s returned %s>' % (
362 self.__class__.__name__,
363 id(self),
364 _STATE_TO_DESCRIPTION_MAP[self._state],
365 self._result.__class__.__name__)
366 return '<%s at %#x state=%s>' % (
367 self.__class__.__name__,
368 id(self),
369 _STATE_TO_DESCRIPTION_MAP[self._state])
370
371 def cancel(self):
372 """Cancel the future if possible.
373
374 Returns True if the future was cancelled, False otherwise. A future
375 cannot be cancelled if it is running or has already completed.
376 """
377 with self._condition:
378 if self._state in [RUNNING, FINISHED]:
379 return False
380
381 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
382 return True
383
384 self._state = CANCELLED
385 self._condition.notify_all()
386
387 self._invoke_callbacks()
388 return True
389
390 def cancelled(self):
391 """Return True if the future was cancelled."""
392 with self._condition:
393 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
394
395 def running(self):
396 """Return True if the future is currently executing."""
397 with self._condition:
398 return self._state == RUNNING
399
400 def done(self):
401 """Return True of the future was cancelled or finished executing."""
402 with self._condition:
403 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
404
405 def __get_result(self):
406 if self._exception:
407 if isinstance(self._exception, types.InstanceType):
408 # The exception is an instance of an old-style class, which
409 # means type(self._exception) returns types.ClassType instead
410 # of the exception's actual class type.
411 exception_type = self._exception.__class__
412 else:
413 exception_type = type(self._exception)
414 raise exception_type, self._exception, self._traceback
415 else:
416 return self._result
417
418 def add_done_callback(self, fn):
419 """Attaches a callable that will be called when the future finishes.
420
421 Args:
422 fn: A callable that will be called with this future as its only
423 argument when the future completes or is cancelled. The callable
424 will always be called by a thread in the same process in which
425 it was added. If the future has already completed or been
426 cancelled then the callable will be called immediately. These
427 callables are called in the order that they were added.
428 """
429 with self._condition:
430 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
431 self._done_callbacks.append(fn)
432 return
433 fn(self)
434
435 def result(self, timeout=None):
436 """Return the result of the call that the future represents.
437
438 Args:
439 timeout: The number of seconds to wait for the result if the future
440 isn't done. If None, then there is no limit on the wait time.
441
442 Returns:
443 The result of the call that the future represents.
444
445 Raises:
446 CancelledError: If the future was cancelled.
447 TimeoutError: If the future didn't finish executing before the given
448 timeout.
449 Exception: If the call raised then that exception will be raised.
450 """
451 with self._condition:
452 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
453 raise CancelledError()
454 elif self._state == FINISHED:
455 return self.__get_result()
456
457 self._condition.wait(timeout)
458
459 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
460 raise CancelledError()
461 elif self._state == FINISHED:
462 return self.__get_result()
463 else:
464 raise TimeoutError()
465
466 def exception_info(self, timeout=None):
467 """Return a tuple of (exception, traceback) raised by the call that the
468 future represents.
469
470 Args:
471 timeout: The number of seconds to wait for the exception if the
472 future isn't done. If None, then there is no limit on the wait
473 time.
474
475 Returns:
476 The exception raised by the call that the future represents or None
477 if the call completed without raising.
478
479 Raises:
480 CancelledError: If the future was cancelled.
481 TimeoutError: If the future didn't finish executing before the given
482 timeout.
483 """
484 with self._condition:
485 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
486 raise CancelledError()
487 elif self._state == FINISHED:
488 return self._exception, self._traceback
489
490 self._condition.wait(timeout)
491
492 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
493 raise CancelledError()
494 elif self._state == FINISHED:
495 return self._exception, self._traceback
496 else:
497 raise TimeoutError()
498
499 def exception(self, timeout=None):
500 """Return the exception raised by the call that the future represents.
501
502 Args:
503 timeout: The number of seconds to wait for the exception if the
504 future isn't done. If None, then there is no limit on the wait
505 time.
506
507 Returns:
508 The exception raised by the call that the future represents or None
509 if the call completed without raising.
510
511 Raises:
512 CancelledError: If the future was cancelled.
513 TimeoutError: If the future didn't finish executing before the given
514 timeout.
515 """
516 return self.exception_info(timeout)[0]
517
518 # The following methods should only be used by Executors and in tests.
519 def set_running_or_notify_cancel(self):
520 """Mark the future as running or process any cancel notifications.
521
522 Should only be used by Executor implementations and unit tests.
523
524 If the future has been cancelled (cancel() was called and returned
525 True) then any threads waiting on the future completing (though calls
526 to as_completed() or wait()) are notified and False is returned.
527
528 If the future was not cancelled then it is put in the running state
529 (future calls to running() will return True) and True is returned.
530
531 This method should be called by Executor implementations before
532 executing the work associated with this future. If this method returns
533 False then the work should not be executed.
534
535 Returns:
536 False if the Future was cancelled, True otherwise.
537
538 Raises:
539 RuntimeError: if this method was already called or if set_result()
540 or set_exception() was called.
541 """
542 with self._condition:
543 if self._state == CANCELLED:
544 self._state = CANCELLED_AND_NOTIFIED
545 for waiter in self._waiters:
546 waiter.add_cancelled(self)
547 # self._condition.notify_all() is not necessary because
548 # self.cancel() triggers a notification.
549 return False
550 elif self._state == PENDING:
551 self._state = RUNNING
552 return True
553 else:
554 LOGGER.critical('Future %s in unexpected state: %s',
555 id(self),
556 self._state)
557 raise RuntimeError('Future in unexpected state')
558
559 def set_result(self, result):
560 """Sets the return value of work associated with the future.
561
562 Should only be used by Executor implementations and unit tests.
563 """
564 with self._condition:
565 self._result = result
566 self._state = FINISHED
567 for waiter in self._waiters:
568 waiter.add_result(self)
569 self._condition.notify_all()
570 self._invoke_callbacks()
571
572 def set_exception_info(self, exception, traceback):
573 """Sets the result of the future as being the given exception
574 and traceback.
575
576 Should only be used by Executor implementations and unit tests.
577 """
578 with self._condition:
579 self._exception = exception
580 self._traceback = traceback
581 self._state = FINISHED
582 for waiter in self._waiters:
583 waiter.add_exception(self)
584 self._condition.notify_all()
585 self._invoke_callbacks()
586
587 def set_exception(self, exception):
588 """Sets the result of the future as being the given exception.
589
590 Should only be used by Executor implementations and unit tests.
591 """
592 self.set_exception_info(exception, None)
593
594 class Executor(object):
595 """This is an abstract base class for concrete asynchronous executors."""
596
597 def submit(self, fn, *args, **kwargs):
598 """Submits a callable to be executed with the given arguments.
599
600 Schedules the callable to be executed as fn(*args, **kwargs) and returns
601 a Future instance representing the execution of the callable.
602
603 Returns:
604 A Future representing the given call.
605 """
606 raise NotImplementedError()
607
608 def map(self, fn, *iterables, **kwargs):
609 """Returns an iterator equivalent to map(fn, iter).
610
611 Args:
612 fn: A callable that will take as many arguments as there are
613 passed iterables.
614 timeout: The maximum number of seconds to wait. If None, then there
615 is no limit on the wait time.
616
617 Returns:
618 An iterator equivalent to: map(func, *iterables) but the calls may
619 be evaluated out-of-order.
620
621 Raises:
622 TimeoutError: If the entire result iterator could not be generated
623 before the given timeout.
624 Exception: If fn(*args) raises for any values.
625 """
626 timeout = kwargs.get('timeout')
627 if timeout is not None:
628 end_time = timeout + time.time()
629
630 fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]
631
632 # Yield must be hidden in closure so that the futures are submitted
633 # before the first iterator value is required.
634 def result_iterator():
635 try:
636 # reverse to keep finishing order
637 fs.reverse()
638 while fs:
639 # Careful not to keep a reference to the popped future
640 if timeout is None:
641 yield fs.pop().result()
642 else:
643 yield fs.pop().result(end_time - time.time())
644 finally:
645 for future in fs:
646 future.cancel()
647 return result_iterator()
648
649 def shutdown(self, wait=True):
650 """Clean-up the resources associated with the Executor.
651
652 It is safe to call this method several times. Otherwise, no other
653 methods can be called after this one.
654
655 Args:
656 wait: If True then shutdown will not return until all running
657 futures have finished executing and the resources used by the
658 executor have been reclaimed.
659 """
660 pass
661
662 def __enter__(self):
663 return self
664
665 def __exit__(self, exc_type, exc_val, exc_tb):
666 self.shutdown(wait=True)
667 return False