Mercurial > public > mercurial-scm > hg
comparison mercurial/thirdparty/concurrent/futures/_base.py @ 37623:eb687c28a915
thirdparty: vendor futures 3.2.0
Python 3 has a concurrent.futures package in the standard library
for representing futures. The "futures" package on PyPI is a backport
of this package to work with Python 2.
The wire protocol code today has its own future concept for handling
of "batch" requests. The frame-based protocol will also want to
use futures.
I've heavily used the "futures" package on Python 2 in other projects
and it is pretty nice. It even has a built-in thread and process pool
for running functions in parallel. I've used this heavily for concurrent
I/O and other GIL-less activities.
The existing futures API in the wire protocol code is not as nice as
concurrent.futures. Since concurrent.futures is in the Python standard
library and will presumably be the long-term future for futures in our
code base, let's vendor the backport so we can use proper futures today.
# no-check-commit because of style violations
Differential Revision: https://phab.mercurial-scm.org/D3261
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 11 Apr 2018 14:48:24 -0700 |
parents | |
children | 0a9c0d3480b2 |
comparison
equal
deleted
inserted
replaced
37622:bfdd20d22a86 | 37623:eb687c28a915 |
---|---|
1 # Copyright 2009 Brian Quinlan. All Rights Reserved. | |
2 # Licensed to PSF under a Contributor Agreement. | |
3 | |
4 import collections | |
5 import logging | |
6 import threading | |
7 import itertools | |
8 import time | |
9 import types | |
10 | |
__author__ = 'Brian Quinlan (brian@sweetapp.com)'

# Values accepted for the ``return_when`` argument of wait().
# _AS_COMPLETED is private: as_completed() passes it when installing waiters.
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

# Life-cycle states of a Future (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# cancel() was called by the user ...
CANCELLED = 'CANCELLED'
# ... and a worker has since called _Waiter.add_cancelled().
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED,
]

# Human-readable state names; used when building Future.__repr__ output.
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished",
}

# Package-wide logger (for internal use by the futures package).
LOGGER = logging.getLogger("concurrent.futures")
45 | |
class Error(Exception):
    """Root of the exception hierarchy raised for future-related failures."""
49 | |
class CancelledError(Error):
    """Raised when a result is requested from a Future that was cancelled."""
53 | |
class TimeoutError(Error):
    """Raised when an operation did not complete before its deadline."""
57 | |
class _Waiter(object):
    """Holds the event that wait() and as_completed() block on.

    The base class only records the futures it is told about in
    ``finished_futures``; it never sets ``event`` itself.
    """

    def __init__(self):
        self.finished_futures = []
        self.event = threading.Event()

    def add_result(self, future):
        """Record *future* after it finished with a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record *future* after it finished with an exception."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record *future* after its cancellation was notified."""
        self.finished_futures.append(future)
72 | |
class _AsCompletedWaiter(_Waiter):
    """Waiter installed by as_completed().

    Each notification is recorded while holding ``lock`` and then sets the
    event; as_completed() takes the same lock when it swaps out
    ``finished_futures`` and clears the event.
    """

    def __init__(self):
        super(_AsCompletedWaiter, self).__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        """Record a finished future and wake the consumer."""
        with self.lock:
            super(_AsCompletedWaiter, self).add_result(future)
            self.event.set()

    def add_exception(self, future):
        """Record a failed future and wake the consumer."""
        with self.lock:
            super(_AsCompletedWaiter, self).add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and wake the consumer."""
        with self.lock:
            super(_AsCompletedWaiter, self).add_cancelled(future)
            self.event.set()
94 | |
class _FirstCompletedWaiter(_Waiter):
    """Waiter for wait(return_when=FIRST_COMPLETED).

    Any notification (result, exception or cancellation) is recorded and
    immediately sets the event.
    """

    def add_result(self, future):
        """Record *future* and release the waiting thread."""
        super(_FirstCompletedWaiter, self).add_result(future)
        self.event.set()

    def add_exception(self, future):
        """Record *future* and release the waiting thread."""
        super(_FirstCompletedWaiter, self).add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        """Record *future* and release the waiting thread."""
        super(_FirstCompletedWaiter, self).add_cancelled(future)
        self.event.set()
109 | |
class _AllCompletedWaiter(_Waiter):
    """Waiter for wait(return_when=FIRST_EXCEPTION) and ALL_COMPLETED.

    Args:
        num_pending_calls: Number of notifications still expected before the
            event is set.
        stop_on_exception: If true, the first exception notification sets
            the event straight away instead of being counted.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        super(_AllCompletedWaiter, self).__init__()
        self.lock = threading.Lock()
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception

    def _decrement_pending_calls(self):
        """Count one notification; set the event when none remain."""
        with self.lock:
            self.num_pending_calls -= 1
            nothing_left = not self.num_pending_calls
            if nothing_left:
                self.event.set()

    def add_result(self, future):
        """Record a finished future and count it."""
        super(_AllCompletedWaiter, self).add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        """Record a failed future; stop waiting now or just count it."""
        super(_AllCompletedWaiter, self).add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and count it."""
        super(_AllCompletedWaiter, self).add_cancelled(future)
        self._decrement_pending_calls()
139 | |
class _AcquireFutures(object):
    """Context manager that acquires the condition of every given future.

    The futures are sorted by id() so that the conditions are always taken
    in the same order (presumably to avoid lock-order deadlocks between
    concurrent callers).
    """

    def __init__(self, futures):
        ordered = list(futures)
        ordered.sort(key=id)
        self.futures = ordered

    def __enter__(self):
        for item in self.futures:
            item._condition.acquire()

    def __exit__(self, *args):
        for item in self.futures:
            item._condition.release()
153 | |
def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching *return_when* and attach it to every future.

    Args:
        fs: The futures to watch. Their ``_waiters`` lists are appended to.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The waiter that was appended to each future's ``_waiters`` list.

    Raises:
        ValueError: If *return_when* is not one of the values above.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Futures that are neither finished nor cancelled-and-notified will
        # each still deliver one notification to the waiter.
        settled_states = [CANCELLED_AND_NOTIFIED, FINISHED]
        pending_count = 0
        for future in fs:
            if future._state not in settled_states:
                pending_count += 1

        if return_when == FIRST_EXCEPTION:
            stop = True
        elif return_when == ALL_COMPLETED:
            stop = False
        else:
            raise ValueError("Invalid return condition: %r" % return_when)
        waiter = _AllCompletedWaiter(pending_count, stop_on_exception=stop)

    for future in fs:
        future._waiters.append(waiter)

    return waiter
174 | |
175 | |
def _yield_finished_futures(fs, waiter, ref_collect):
    """Drain the list *fs* from its end, yielding each finished future.

    Just before a future is yielded it is removed from every set in
    *ref_collect* and *waiter* is removed from its ``_waiters`` list, so
    that no stale reference is kept while the consumer holds the future
    and before this iterator is resumed.

    Args:
        fs: List of finished futures; it is consumed (popped) in place.
        waiter: The waiter to detach from each future.
        ref_collect: Collection of sets from which each future is removed.
    """
    while fs:
        last = fs[-1]
        for holder in ref_collect:
            holder.remove(last)
        with last._condition:
            last._waiters.remove(waiter)
        # Drop the local name first so this frame holds no reference to the
        # future while suspended at the yield.
        del last
        yield fs.pop()
196 | |
197 | |
def as_completed(fs, timeout=None):
    """Iterate over the given futures, yielding each one as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors)
            to iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished
        or cancelled). If any given Futures are duplicated, they will be
        returned once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    # Rebinding ``fs`` deduplicates the input and drops this frame's
    # reference to the caller's original sequence.
    fs = set(fs)
    total_futures = len(fs)
    done_states = [CANCELLED_AND_NOTIFIED, FINISHED]
    with _AcquireFutures(fs):
        finished = set(f for f in fs if f._state in done_states)
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        # Futures that were already complete when we were called.
        for item in _yield_finished_futures(finished, waiter,
                                            ref_collect=(fs,)):
            # Park the future in a throwaway list so that no local name
            # refers to it while this generator is suspended.
            item = [item]
            yield item.pop()

        while pending:
            wait_timeout = None
            if timeout is not None:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                        '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch collected so far and reset the waiter.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            for item in _yield_finished_futures(finished, waiter,
                                                ref_collect=(fs, pending)):
                item = [item]
                yield item.pop()

    finally:
        # Remove waiter from unfinished futures
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
263 | |
# Result type of wait(): a (done, not_done) pair of sets of futures.
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', ['done', 'not_done'])
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors)
            to wait upon. Any iterable is accepted; duplicate futures are
            collapsed and reported only once.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If *return_when* is not one of the options above and
            waiting is actually required.
    """
    # Materialise and deduplicate once.  The input is iterated several times
    # below, so a one-shot iterator would otherwise be exhausted after the
    # first pass, and a duplicated future would make len(done) != len(fs)
    # even when everything is complete, leaving the call blocked on a waiter
    # that is never notified.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        # Installed while the conditions are still held so that no
        # completion can slip in between the state check and the install.
        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
314 | |
315 class Future(object): | |
316 """Represents the result of an asynchronous computation.""" | |
317 | |
318 def __init__(self): | |
319 """Initializes the future. Should not be called by clients.""" | |
320 self._condition = threading.Condition() | |
321 self._state = PENDING | |
322 self._result = None | |
323 self._exception = None | |
324 self._traceback = None | |
325 self._waiters = [] | |
326 self._done_callbacks = [] | |
327 | |
328 def _invoke_callbacks(self): | |
329 for callback in self._done_callbacks: | |
330 try: | |
331 callback(self) | |
332 except Exception: | |
333 LOGGER.exception('exception calling callback for %r', self) | |
334 except BaseException: | |
335 # Explicitly let all other new-style exceptions through so | |
336 # that we can catch all old-style exceptions with a simple | |
337 # "except:" clause below. | |
338 # | |
339 # All old-style exception objects are instances of | |
340 # types.InstanceType, but "except types.InstanceType:" does | |
341 # not catch old-style exceptions for some reason. Thus, the | |
342 # only way to catch all old-style exceptions without catching | |
343 # any new-style exceptions is to filter out the new-style | |
344 # exceptions, which all derive from BaseException. | |
345 raise | |
346 except: | |
347 # Because of the BaseException clause above, this handler only | |
348 # executes for old-style exception objects. | |
349 LOGGER.exception('exception calling callback for %r', self) | |
350 | |
351 def __repr__(self): | |
352 with self._condition: | |
353 if self._state == FINISHED: | |
354 if self._exception: | |
355 return '<%s at %#x state=%s raised %s>' % ( | |
356 self.__class__.__name__, | |
357 id(self), | |
358 _STATE_TO_DESCRIPTION_MAP[self._state], | |
359 self._exception.__class__.__name__) | |
360 else: | |
361 return '<%s at %#x state=%s returned %s>' % ( | |
362 self.__class__.__name__, | |
363 id(self), | |
364 _STATE_TO_DESCRIPTION_MAP[self._state], | |
365 self._result.__class__.__name__) | |
366 return '<%s at %#x state=%s>' % ( | |
367 self.__class__.__name__, | |
368 id(self), | |
369 _STATE_TO_DESCRIPTION_MAP[self._state]) | |
370 | |
371 def cancel(self): | |
372 """Cancel the future if possible. | |
373 | |
374 Returns True if the future was cancelled, False otherwise. A future | |
375 cannot be cancelled if it is running or has already completed. | |
376 """ | |
377 with self._condition: | |
378 if self._state in [RUNNING, FINISHED]: | |
379 return False | |
380 | |
381 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |
382 return True | |
383 | |
384 self._state = CANCELLED | |
385 self._condition.notify_all() | |
386 | |
387 self._invoke_callbacks() | |
388 return True | |
389 | |
390 def cancelled(self): | |
391 """Return True if the future was cancelled.""" | |
392 with self._condition: | |
393 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] | |
394 | |
395 def running(self): | |
396 """Return True if the future is currently executing.""" | |
397 with self._condition: | |
398 return self._state == RUNNING | |
399 | |
400 def done(self): | |
401 """Return True of the future was cancelled or finished executing.""" | |
402 with self._condition: | |
403 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] | |
404 | |
405 def __get_result(self): | |
406 if self._exception: | |
407 if isinstance(self._exception, types.InstanceType): | |
408 # The exception is an instance of an old-style class, which | |
409 # means type(self._exception) returns types.ClassType instead | |
410 # of the exception's actual class type. | |
411 exception_type = self._exception.__class__ | |
412 else: | |
413 exception_type = type(self._exception) | |
414 raise exception_type, self._exception, self._traceback | |
415 else: | |
416 return self._result | |
417 | |
418 def add_done_callback(self, fn): | |
419 """Attaches a callable that will be called when the future finishes. | |
420 | |
421 Args: | |
422 fn: A callable that will be called with this future as its only | |
423 argument when the future completes or is cancelled. The callable | |
424 will always be called by a thread in the same process in which | |
425 it was added. If the future has already completed or been | |
426 cancelled then the callable will be called immediately. These | |
427 callables are called in the order that they were added. | |
428 """ | |
429 with self._condition: | |
430 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: | |
431 self._done_callbacks.append(fn) | |
432 return | |
433 fn(self) | |
434 | |
435 def result(self, timeout=None): | |
436 """Return the result of the call that the future represents. | |
437 | |
438 Args: | |
439 timeout: The number of seconds to wait for the result if the future | |
440 isn't done. If None, then there is no limit on the wait time. | |
441 | |
442 Returns: | |
443 The result of the call that the future represents. | |
444 | |
445 Raises: | |
446 CancelledError: If the future was cancelled. | |
447 TimeoutError: If the future didn't finish executing before the given | |
448 timeout. | |
449 Exception: If the call raised then that exception will be raised. | |
450 """ | |
451 with self._condition: | |
452 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |
453 raise CancelledError() | |
454 elif self._state == FINISHED: | |
455 return self.__get_result() | |
456 | |
457 self._condition.wait(timeout) | |
458 | |
459 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |
460 raise CancelledError() | |
461 elif self._state == FINISHED: | |
462 return self.__get_result() | |
463 else: | |
464 raise TimeoutError() | |
465 | |
466 def exception_info(self, timeout=None): | |
467 """Return a tuple of (exception, traceback) raised by the call that the | |
468 future represents. | |
469 | |
470 Args: | |
471 timeout: The number of seconds to wait for the exception if the | |
472 future isn't done. If None, then there is no limit on the wait | |
473 time. | |
474 | |
475 Returns: | |
476 The exception raised by the call that the future represents or None | |
477 if the call completed without raising. | |
478 | |
479 Raises: | |
480 CancelledError: If the future was cancelled. | |
481 TimeoutError: If the future didn't finish executing before the given | |
482 timeout. | |
483 """ | |
484 with self._condition: | |
485 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |
486 raise CancelledError() | |
487 elif self._state == FINISHED: | |
488 return self._exception, self._traceback | |
489 | |
490 self._condition.wait(timeout) | |
491 | |
492 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |
493 raise CancelledError() | |
494 elif self._state == FINISHED: | |
495 return self._exception, self._traceback | |
496 else: | |
497 raise TimeoutError() | |
498 | |
499 def exception(self, timeout=None): | |
500 """Return the exception raised by the call that the future represents. | |
501 | |
502 Args: | |
503 timeout: The number of seconds to wait for the exception if the | |
504 future isn't done. If None, then there is no limit on the wait | |
505 time. | |
506 | |
507 Returns: | |
508 The exception raised by the call that the future represents or None | |
509 if the call completed without raising. | |
510 | |
511 Raises: | |
512 CancelledError: If the future was cancelled. | |
513 TimeoutError: If the future didn't finish executing before the given | |
514 timeout. | |
515 """ | |
516 return self.exception_info(timeout)[0] | |
517 | |
518 # The following methods should only be used by Executors and in tests. | |
519 def set_running_or_notify_cancel(self): | |
520 """Mark the future as running or process any cancel notifications. | |
521 | |
522 Should only be used by Executor implementations and unit tests. | |
523 | |
524 If the future has been cancelled (cancel() was called and returned | |
525 True) then any threads waiting on the future completing (though calls | |
526 to as_completed() or wait()) are notified and False is returned. | |
527 | |
528 If the future was not cancelled then it is put in the running state | |
529 (future calls to running() will return True) and True is returned. | |
530 | |
531 This method should be called by Executor implementations before | |
532 executing the work associated with this future. If this method returns | |
533 False then the work should not be executed. | |
534 | |
535 Returns: | |
536 False if the Future was cancelled, True otherwise. | |
537 | |
538 Raises: | |
539 RuntimeError: if this method was already called or if set_result() | |
540 or set_exception() was called. | |
541 """ | |
542 with self._condition: | |
543 if self._state == CANCELLED: | |
544 self._state = CANCELLED_AND_NOTIFIED | |
545 for waiter in self._waiters: | |
546 waiter.add_cancelled(self) | |
547 # self._condition.notify_all() is not necessary because | |
548 # self.cancel() triggers a notification. | |
549 return False | |
550 elif self._state == PENDING: | |
551 self._state = RUNNING | |
552 return True | |
553 else: | |
554 LOGGER.critical('Future %s in unexpected state: %s', | |
555 id(self), | |
556 self._state) | |
557 raise RuntimeError('Future in unexpected state') | |
558 | |
559 def set_result(self, result): | |
560 """Sets the return value of work associated with the future. | |
561 | |
562 Should only be used by Executor implementations and unit tests. | |
563 """ | |
564 with self._condition: | |
565 self._result = result | |
566 self._state = FINISHED | |
567 for waiter in self._waiters: | |
568 waiter.add_result(self) | |
569 self._condition.notify_all() | |
570 self._invoke_callbacks() | |
571 | |
572 def set_exception_info(self, exception, traceback): | |
573 """Sets the result of the future as being the given exception | |
574 and traceback. | |
575 | |
576 Should only be used by Executor implementations and unit tests. | |
577 """ | |
578 with self._condition: | |
579 self._exception = exception | |
580 self._traceback = traceback | |
581 self._state = FINISHED | |
582 for waiter in self._waiters: | |
583 waiter.add_exception(self) | |
584 self._condition.notify_all() | |
585 self._invoke_callbacks() | |
586 | |
587 def set_exception(self, exception): | |
588 """Sets the result of the future as being the given exception. | |
589 | |
590 Should only be used by Executor implementations and unit tests. | |
591 """ | |
592 self.set_exception_info(exception, None) | |
593 | |
class Executor(object):
    """Abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submit a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and
        returns a Future instance representing the execution of the callable.
        Subclasses must override this method.

        Returns:
            A Future representing the given call.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
        """Return an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then
                there is no limit on the wait time. Read from **kwargs.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls
            may be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be
                generated before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get('timeout')
        if timeout is not None:
            end_time = timeout + time.time()

        # Submit everything eagerly, before the caller asks for any result.
        fs = []
        for call_args in itertools.izip(*iterables):
            fs.append(self.submit(fn, *call_args))

        # The yield must live in a nested function: otherwise map() itself
        # would be a generator and nothing would be submitted until the
        # first value is requested.
        def result_iterator():
            try:
                # Reversed so that pop() hands back futures in submission
                # order.
                fs.reverse()
                while fs:
                    # Pop inside the expression so no local name keeps a
                    # reference to the future.
                    if timeout is not None:
                        yield fs.pop().result(end_time - time.time())
                    else:
                        yield fs.pop().result()
            finally:
                # Whatever was not consumed gets cancelled.
                for leftover in fs:
                    leftover.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one. The base implementation does
        nothing.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by
                the executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always wait for completion; returning False lets any exception
        # raised inside the with-block propagate.
        self.shutdown(wait=True)
        return False