|
1 # Copyright 2009 Brian Quinlan. All Rights Reserved. |
|
2 # Licensed to PSF under a Contributor Agreement. |
|
3 |
|
4 import collections |
|
5 import logging |
|
6 import threading |
|
7 import itertools |
|
8 import time |
|
9 import types |
|
10 |
|
__author__ = 'Brian Quinlan (brian@sweetapp.com)'

# Values accepted for the ``return_when`` argument of wait().
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Internal-only condition used by as_completed().
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# CANCELLED: the user cancelled the future ...
CANCELLED = 'CANCELLED'
# ... CANCELLED_AND_NOTIFIED: and, in addition, a worker has already called
# _Waiter.add_cancelled() for it.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED
]

# Human-readable state names; both cancelled states share one description.
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
|
45 |
|
class Error(Exception):
    """Root of the exception hierarchy used by the futures package."""
|
49 |
|
class CancelledError(Error):
    """Raised when the Future being queried was cancelled."""
|
53 |
|
class TimeoutError(Error):
    """Raised when an operation did not complete before its deadline."""
|
57 |
|
class _Waiter(object):
    """Base waiter: owns the event that wait() and as_completed() block on.

    Every add_* hook appends the reported future to ``finished_futures``.
    This base class never sets the event itself.
    """

    def __init__(self):
        self.finished_futures = []
        self.event = threading.Event()

    def add_result(self, future):
        """Record *future*, which finished with a result."""
        completed = self.finished_futures
        completed.append(future)

    def add_exception(self, future):
        """Record *future*, which finished by raising."""
        completed = self.finished_futures
        completed.append(future)

    def add_cancelled(self, future):
        """Record *future*, which was cancelled."""
        completed = self.finished_futures
        completed.append(future)
|
72 |
|
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Each notification is recorded and the event is set while holding
    ``lock``, the same lock as_completed() takes when it drains
    ``finished_futures`` and clears the event.
    """

    def __init__(self):
        super(_AsCompletedWaiter, self).__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        base = super(_AsCompletedWaiter, self)
        with self.lock:
            base.add_result(future)
            self.event.set()

    def add_exception(self, future):
        base = super(_AsCompletedWaiter, self)
        with self.lock:
            base.add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        base = super(_AsCompletedWaiter, self)
        with self.lock:
            base.add_cancelled(future)
            self.event.set()
|
94 |
|
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    Any notification, whether result, exception or cancellation, records
    the future and sets the event.
    """

    def add_result(self, future):
        parent = super(_FirstCompletedWaiter, self)
        parent.add_result(future)
        self.event.set()

    def add_exception(self, future):
        parent = super(_FirstCompletedWaiter, self)
        parent.add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        parent = super(_FirstCompletedWaiter, self)
        parent.add_cancelled(future)
        self.event.set()
|
109 |
|
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    Counts down ``num_pending_calls`` and sets the event when it reaches
    zero. With ``stop_on_exception`` true, the first reported exception
    sets the event immediately.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        super(_AllCompletedWaiter, self).__init__()
        self.lock = threading.Lock()
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception

    def _decrement_pending_calls(self):
        """Count one completion; set the event once none remain."""
        with self.lock:
            remaining = self.num_pending_calls - 1
            self.num_pending_calls = remaining
            if not remaining:
                self.event.set()

    def add_result(self, future):
        super(_AllCompletedWaiter, self).add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super(_AllCompletedWaiter, self).add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        # FIRST_EXCEPTION mode: wake the waiter without counting down.
        self.event.set()

    def add_cancelled(self, future):
        super(_AllCompletedWaiter, self).add_cancelled(future)
        self._decrement_pending_calls()
|
139 |
|
class _AcquireFutures(object):
    """Context manager that acquires the conditions of several futures.

    The futures are sorted by ``id()`` so the conditions are always taken
    in the same order; they are released in that order on exit.
    """

    def __init__(self, futures):
        ordered = list(futures)
        ordered.sort(key=id)
        self.futures = ordered

    def __enter__(self):
        for fut in self.futures:
            fut._condition.acquire()

    def __exit__(self, *args):
        for fut in self.futures:
            fut._condition.release()
|
153 |
|
def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching *return_when* and attach it to each future.

    Args:
        fs: The futures whose ``_waiters`` lists receive the new waiter.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter.

    Raises:
        ValueError: If *return_when* is none of the values above.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Only futures that have not already completed will notify the
        # waiter, so only those are counted.
        pending_count = len(
            [f for f in fs
             if f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]])

        stop_on_exception = (return_when == FIRST_EXCEPTION)
        if not (stop_on_exception or return_when == ALL_COMPLETED):
            raise ValueError("Invalid return condition: %r" % return_when)
        waiter = _AllCompletedWaiter(pending_count,
                                     stop_on_exception=stop_on_exception)

    for future in fs:
        future._waiters.append(waiter)

    return waiter
|
174 |
|
175 |
|
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order (from the last element to the first).

    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The list *fs* is consumed in place: it is empty once the generator is
    exhausted.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.
    """
    while fs:
        # Peek at the last element; it is only popped at the yield below.
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        with f._condition:
            f._waiters.remove(waiter)
        # Drop the local name so this frame holds no reference while the
        # generator is suspended.
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
|
196 |
|
197 |
|
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    # This is a generator function, so the deadline is computed when
    # iteration starts, not when as_completed() is called.
    if timeout is not None:
        end_time = timeout + time.time()

    # A set removes duplicates, so each future is yielded once.
    fs = set(fs)
    total_futures = len(fs)
    # Hold every future's condition while taking the snapshot and installing
    # the waiter, so no future can change state in between.
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        # First hand out the futures that were already done.
        for f in _yield_finished_futures(finished, waiter,
                                         ref_collect=(fs,)):
            # Wrap in a list and pop, so the loop variable does not keep a
            # reference to the future while this generator is suspended.
            f = [f]
            yield f.pop()

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch of newly finished futures and reset the event
            # under the waiter's lock, which the add_* hooks also hold.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            for f in _yield_finished_futures(finished, waiter,
                                             ref_collect=(fs, pending)):
                f = [f]
                yield f.pop()

    finally:
        # Remove waiter from unfinished futures
        # (yielded futures were already removed from fs and detached from
        # the waiter by _yield_finished_futures).
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
|
263 |
|
# Return type of wait(): the pair of sets (done, not_done).
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', ['done', 'not_done'])
|
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Any iterable is accepted; duplicated futures are
            treated as a single future.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If a waiter has to be installed and *return_when* is not
            one of the options above.
    """
    # Materialise and de-duplicate once. Without this, a duplicated future
    # that is already done makes len(done) differ from len(fs), so a waiter
    # expecting zero notifications is installed and its event is never set.
    # It also lets one-shot iterables (e.g. generators) be passed safely.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        # Installed while every condition is still held, so no completion
        # can slip in between the snapshot above and the registration.
        waiter = _create_and_install_waiters(fs, return_when)

    try:
        waiter.event.wait(timeout)
    finally:
        # Always detach the waiter, even if the wait is interrupted, so the
        # futures do not keep notifying an abandoned waiter.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
|
314 |
|
315 class Future(object): |
|
316 """Represents the result of an asynchronous computation.""" |
|
317 |
|
318 def __init__(self): |
|
319 """Initializes the future. Should not be called by clients.""" |
|
320 self._condition = threading.Condition() |
|
321 self._state = PENDING |
|
322 self._result = None |
|
323 self._exception = None |
|
324 self._traceback = None |
|
325 self._waiters = [] |
|
326 self._done_callbacks = [] |
|
327 |
|
328 def _invoke_callbacks(self): |
|
329 for callback in self._done_callbacks: |
|
330 try: |
|
331 callback(self) |
|
332 except Exception: |
|
333 LOGGER.exception('exception calling callback for %r', self) |
|
334 except BaseException: |
|
335 # Explicitly let all other new-style exceptions through so |
|
336 # that we can catch all old-style exceptions with a simple |
|
337 # "except:" clause below. |
|
338 # |
|
339 # All old-style exception objects are instances of |
|
340 # types.InstanceType, but "except types.InstanceType:" does |
|
341 # not catch old-style exceptions for some reason. Thus, the |
|
342 # only way to catch all old-style exceptions without catching |
|
343 # any new-style exceptions is to filter out the new-style |
|
344 # exceptions, which all derive from BaseException. |
|
345 raise |
|
346 except: |
|
347 # Because of the BaseException clause above, this handler only |
|
348 # executes for old-style exception objects. |
|
349 LOGGER.exception('exception calling callback for %r', self) |
|
350 |
|
351 def __repr__(self): |
|
352 with self._condition: |
|
353 if self._state == FINISHED: |
|
354 if self._exception: |
|
355 return '<%s at %#x state=%s raised %s>' % ( |
|
356 self.__class__.__name__, |
|
357 id(self), |
|
358 _STATE_TO_DESCRIPTION_MAP[self._state], |
|
359 self._exception.__class__.__name__) |
|
360 else: |
|
361 return '<%s at %#x state=%s returned %s>' % ( |
|
362 self.__class__.__name__, |
|
363 id(self), |
|
364 _STATE_TO_DESCRIPTION_MAP[self._state], |
|
365 self._result.__class__.__name__) |
|
366 return '<%s at %#x state=%s>' % ( |
|
367 self.__class__.__name__, |
|
368 id(self), |
|
369 _STATE_TO_DESCRIPTION_MAP[self._state]) |
|
370 |
|
371 def cancel(self): |
|
372 """Cancel the future if possible. |
|
373 |
|
374 Returns True if the future was cancelled, False otherwise. A future |
|
375 cannot be cancelled if it is running or has already completed. |
|
376 """ |
|
377 with self._condition: |
|
378 if self._state in [RUNNING, FINISHED]: |
|
379 return False |
|
380 |
|
381 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
|
382 return True |
|
383 |
|
384 self._state = CANCELLED |
|
385 self._condition.notify_all() |
|
386 |
|
387 self._invoke_callbacks() |
|
388 return True |
|
389 |
|
390 def cancelled(self): |
|
391 """Return True if the future was cancelled.""" |
|
392 with self._condition: |
|
393 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] |
|
394 |
|
395 def running(self): |
|
396 """Return True if the future is currently executing.""" |
|
397 with self._condition: |
|
398 return self._state == RUNNING |
|
399 |
|
400 def done(self): |
|
401 """Return True of the future was cancelled or finished executing.""" |
|
402 with self._condition: |
|
403 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] |
|
404 |
|
405 def __get_result(self): |
|
406 if self._exception: |
|
407 if isinstance(self._exception, types.InstanceType): |
|
408 # The exception is an instance of an old-style class, which |
|
409 # means type(self._exception) returns types.ClassType instead |
|
410 # of the exception's actual class type. |
|
411 exception_type = self._exception.__class__ |
|
412 else: |
|
413 exception_type = type(self._exception) |
|
414 raise exception_type, self._exception, self._traceback |
|
415 else: |
|
416 return self._result |
|
417 |
|
418 def add_done_callback(self, fn): |
|
419 """Attaches a callable that will be called when the future finishes. |
|
420 |
|
421 Args: |
|
422 fn: A callable that will be called with this future as its only |
|
423 argument when the future completes or is cancelled. The callable |
|
424 will always be called by a thread in the same process in which |
|
425 it was added. If the future has already completed or been |
|
426 cancelled then the callable will be called immediately. These |
|
427 callables are called in the order that they were added. |
|
428 """ |
|
429 with self._condition: |
|
430 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: |
|
431 self._done_callbacks.append(fn) |
|
432 return |
|
433 fn(self) |
|
434 |
|
435 def result(self, timeout=None): |
|
436 """Return the result of the call that the future represents. |
|
437 |
|
438 Args: |
|
439 timeout: The number of seconds to wait for the result if the future |
|
440 isn't done. If None, then there is no limit on the wait time. |
|
441 |
|
442 Returns: |
|
443 The result of the call that the future represents. |
|
444 |
|
445 Raises: |
|
446 CancelledError: If the future was cancelled. |
|
447 TimeoutError: If the future didn't finish executing before the given |
|
448 timeout. |
|
449 Exception: If the call raised then that exception will be raised. |
|
450 """ |
|
451 with self._condition: |
|
452 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
|
453 raise CancelledError() |
|
454 elif self._state == FINISHED: |
|
455 return self.__get_result() |
|
456 |
|
457 self._condition.wait(timeout) |
|
458 |
|
459 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
|
460 raise CancelledError() |
|
461 elif self._state == FINISHED: |
|
462 return self.__get_result() |
|
463 else: |
|
464 raise TimeoutError() |
|
465 |
|
466 def exception_info(self, timeout=None): |
|
467 """Return a tuple of (exception, traceback) raised by the call that the |
|
468 future represents. |
|
469 |
|
470 Args: |
|
471 timeout: The number of seconds to wait for the exception if the |
|
472 future isn't done. If None, then there is no limit on the wait |
|
473 time. |
|
474 |
|
475 Returns: |
|
476 The exception raised by the call that the future represents or None |
|
477 if the call completed without raising. |
|
478 |
|
479 Raises: |
|
480 CancelledError: If the future was cancelled. |
|
481 TimeoutError: If the future didn't finish executing before the given |
|
482 timeout. |
|
483 """ |
|
484 with self._condition: |
|
485 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
|
486 raise CancelledError() |
|
487 elif self._state == FINISHED: |
|
488 return self._exception, self._traceback |
|
489 |
|
490 self._condition.wait(timeout) |
|
491 |
|
492 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
|
493 raise CancelledError() |
|
494 elif self._state == FINISHED: |
|
495 return self._exception, self._traceback |
|
496 else: |
|
497 raise TimeoutError() |
|
498 |
|
499 def exception(self, timeout=None): |
|
500 """Return the exception raised by the call that the future represents. |
|
501 |
|
502 Args: |
|
503 timeout: The number of seconds to wait for the exception if the |
|
504 future isn't done. If None, then there is no limit on the wait |
|
505 time. |
|
506 |
|
507 Returns: |
|
508 The exception raised by the call that the future represents or None |
|
509 if the call completed without raising. |
|
510 |
|
511 Raises: |
|
512 CancelledError: If the future was cancelled. |
|
513 TimeoutError: If the future didn't finish executing before the given |
|
514 timeout. |
|
515 """ |
|
516 return self.exception_info(timeout)[0] |
|
517 |
|
518 # The following methods should only be used by Executors and in tests. |
|
519 def set_running_or_notify_cancel(self): |
|
520 """Mark the future as running or process any cancel notifications. |
|
521 |
|
522 Should only be used by Executor implementations and unit tests. |
|
523 |
|
524 If the future has been cancelled (cancel() was called and returned |
|
525 True) then any threads waiting on the future completing (though calls |
|
526 to as_completed() or wait()) are notified and False is returned. |
|
527 |
|
528 If the future was not cancelled then it is put in the running state |
|
529 (future calls to running() will return True) and True is returned. |
|
530 |
|
531 This method should be called by Executor implementations before |
|
532 executing the work associated with this future. If this method returns |
|
533 False then the work should not be executed. |
|
534 |
|
535 Returns: |
|
536 False if the Future was cancelled, True otherwise. |
|
537 |
|
538 Raises: |
|
539 RuntimeError: if this method was already called or if set_result() |
|
540 or set_exception() was called. |
|
541 """ |
|
542 with self._condition: |
|
543 if self._state == CANCELLED: |
|
544 self._state = CANCELLED_AND_NOTIFIED |
|
545 for waiter in self._waiters: |
|
546 waiter.add_cancelled(self) |
|
547 # self._condition.notify_all() is not necessary because |
|
548 # self.cancel() triggers a notification. |
|
549 return False |
|
550 elif self._state == PENDING: |
|
551 self._state = RUNNING |
|
552 return True |
|
553 else: |
|
554 LOGGER.critical('Future %s in unexpected state: %s', |
|
555 id(self), |
|
556 self._state) |
|
557 raise RuntimeError('Future in unexpected state') |
|
558 |
|
559 def set_result(self, result): |
|
560 """Sets the return value of work associated with the future. |
|
561 |
|
562 Should only be used by Executor implementations and unit tests. |
|
563 """ |
|
564 with self._condition: |
|
565 self._result = result |
|
566 self._state = FINISHED |
|
567 for waiter in self._waiters: |
|
568 waiter.add_result(self) |
|
569 self._condition.notify_all() |
|
570 self._invoke_callbacks() |
|
571 |
|
572 def set_exception_info(self, exception, traceback): |
|
573 """Sets the result of the future as being the given exception |
|
574 and traceback. |
|
575 |
|
576 Should only be used by Executor implementations and unit tests. |
|
577 """ |
|
578 with self._condition: |
|
579 self._exception = exception |
|
580 self._traceback = traceback |
|
581 self._state = FINISHED |
|
582 for waiter in self._waiters: |
|
583 waiter.add_exception(self) |
|
584 self._condition.notify_all() |
|
585 self._invoke_callbacks() |
|
586 |
|
587 def set_exception(self, exception): |
|
588 """Sets the result of the future as being the given exception. |
|
589 |
|
590 Should only be used by Executor implementations and unit tests. |
|
591 """ |
|
592 self.set_exception_info(exception, None) |
|
593 |
|
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` for execution.

        Concrete executors must override this method; the base
        implementation always raises NotImplementedError.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
        """Returns an iterator equivalent to map(fn, iter).

        All calls are submitted before this method returns; the returned
        iterator then yields their results in submission order.

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time. Passed by keyword; no other
                keyword argument is read.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get('timeout')
        # The deadline is fixed before anything is submitted.
        deadline = None if timeout is None else timeout + time.time()

        futures = [self.submit(fn, *call_args)
                   for call_args in itertools.izip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # Reversed so that pop() hands the futures back in
                # submission order.
                futures.reverse()
                while futures:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield futures.pop().result()
                    else:
                        yield futures.pop().result(deadline - time.time())
            finally:
                # Reached on exhaustion, error or early close: cancel
                # whatever has not been consumed.
                for leftover in futures:
                    leftover.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        The base implementation does nothing.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always wait for shutdown; returning False never suppresses an
        # exception raised inside the ``with`` body.
        self.shutdown(wait=True)
        return False