mercurial/thirdparty/concurrent/futures/_base.py
changeset 37623 eb687c28a915
child 37626 0a9c0d3480b2
equal deleted inserted replaced
37622:bfdd20d22a86 37623:eb687c28a915
       
     1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
       
     2 # Licensed to PSF under a Contributor Agreement.
       
     3 
       
     4 import collections
       
     5 import logging
       
     6 import threading
       
     7 import itertools
       
     8 import time
       
     9 import types
       
    10 
       
    __author__ = 'Brian Quinlan (brian@sweetapp.com)'

    # Completion policies understood by _create_and_install_waiters(); the
    # underscore-prefixed one is the mode used by as_completed().
    FIRST_COMPLETED = 'FIRST_COMPLETED'
    FIRST_EXCEPTION = 'FIRST_EXCEPTION'
    ALL_COMPLETED = 'ALL_COMPLETED'
    _AS_COMPLETED = '_AS_COMPLETED'

    # Life-cycle states of a future (internal to the futures package).
    PENDING = 'PENDING'
    RUNNING = 'RUNNING'
    # The user cancelled the future ...
    CANCELLED = 'CANCELLED'
    # ... and a worker has since called _Waiter.add_cancelled() for it.
    CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
    FINISHED = 'FINISHED'

    _FUTURE_STATES = [
        PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED,
    ]

    # Human-readable state names used by Future.__repr__(); both cancelled
    # states are rendered as plain "cancelled".
    _STATE_TO_DESCRIPTION_MAP = {
        PENDING: "pending",
        RUNNING: "running",
        CANCELLED: "cancelled",
        CANCELLED_AND_NOTIFIED: "cancelled",
        FINISHED: "finished",
    }

    # Logger for internal use by the futures package.
    LOGGER = logging.getLogger("concurrent.futures")
       
    45 
       
    class Error(Exception):
        """Common ancestor of every exception defined by the futures package."""
       
    49 
       
    class CancelledError(Error):
        """Raised when a result is requested from a cancelled Future."""
       
    53 
       
    class TimeoutError(Error):
        """Raised when an operation did not complete before its deadline."""
       
    57 
       
    class _Waiter(object):
        """Base waiter: owns the event that wait() and as_completed() block on.

        Futures report completion through the three add_* hooks. The base
        implementation only records the future; it never sets the event, so
        subclasses decide when the blocked caller is woken.
        """

        def __init__(self):
            self.finished_futures = []
            self.event = threading.Event()

        def _record(self, future):
            # Shared bookkeeping for all three completion hooks.
            self.finished_futures.append(future)

        def add_result(self, future):
            """Record a future that finished with a result."""
            self._record(future)

        def add_exception(self, future):
            """Record a future that finished by raising."""
            self._record(future)

        def add_cancelled(self, future):
            """Record a future whose cancellation has been acknowledged."""
            self._record(future)
       
    72 
       
    class _AsCompletedWaiter(_Waiter):
        """Waiter installed by as_completed().

        Every completion is recorded and the event is set, both under
        ``self.lock``; as_completed() takes the same lock while it swaps out
        ``finished_futures`` and clears the event.
        """

        def __init__(self):
            super(_AsCompletedWaiter, self).__init__()
            self.lock = threading.Lock()

        def add_result(self, future):
            """Record a successful future and wake the consumer."""
            with self.lock:
                super(_AsCompletedWaiter, self).add_result(future)
                self.event.set()

        def add_exception(self, future):
            """Record a failed future and wake the consumer."""
            with self.lock:
                super(_AsCompletedWaiter, self).add_exception(future)
                self.event.set()

        def add_cancelled(self, future):
            """Record a cancelled future and wake the consumer."""
            with self.lock:
                super(_AsCompletedWaiter, self).add_cancelled(future)
                self.event.set()
       
    94 
       
    class _FirstCompletedWaiter(_Waiter):
        """Waiter for wait(return_when=FIRST_COMPLETED).

        Any completion, whether result, exception or cancellation, is
        recorded and then sets the event.
        """

        def add_result(self, future):
            """Record the future, then signal the event."""
            super(_FirstCompletedWaiter, self).add_result(future)
            self.event.set()

        def add_exception(self, future):
            """Record the future, then signal the event."""
            super(_FirstCompletedWaiter, self).add_exception(future)
            self.event.set()

        def add_cancelled(self, future):
            """Record the future, then signal the event."""
            super(_FirstCompletedWaiter, self).add_cancelled(future)
            self.event.set()
       
   109 
       
   class _AllCompletedWaiter(_Waiter):
       """Waiter for wait(return_when=FIRST_EXCEPTION or ALL_COMPLETED).

       Counts down from ``num_pending_calls`` and sets the event when the
       count reaches zero. With ``stop_on_exception`` set, the first
       exception sets the event immediately instead of decrementing.
       """

       def __init__(self, num_pending_calls, stop_on_exception):
           self.num_pending_calls = num_pending_calls
           self.stop_on_exception = stop_on_exception
           self.lock = threading.Lock()
           super(_AllCompletedWaiter, self).__init__()

       def _decrement_pending_calls(self):
           # The counter is shared by every future this waiter is attached
           # to, so the update and the zero check happen under the lock.
           with self.lock:
               self.num_pending_calls -= 1
               if self.num_pending_calls:
                   return
               self.event.set()

       def add_result(self, future):
           """Record a successful future and count it as complete."""
           super(_AllCompletedWaiter, self).add_result(future)
           self._decrement_pending_calls()

       def add_exception(self, future):
           """Record a failed future; wake at once if stopping on exception."""
           super(_AllCompletedWaiter, self).add_exception(future)
           if not self.stop_on_exception:
               self._decrement_pending_calls()
           else:
               self.event.set()

       def add_cancelled(self, future):
           """Record a cancelled future and count it as complete."""
           super(_AllCompletedWaiter, self).add_cancelled(future)
           self._decrement_pending_calls()
       
   139 
       
   class _AcquireFutures(object):
       """Context manager that acquires the ``_condition`` of several futures.

       The futures are ordered by ``id()`` once at construction time, and the
       conditions are always acquired (and released) in that order.
       """

       def __init__(self, futures):
           ordered = list(futures)
           ordered.sort(key=id)
           self.futures = ordered

       def __enter__(self):
           for fut in self.futures:
               fut._condition.acquire()

       def __exit__(self, *args):
           for fut in self.futures:
               fut._condition.release()
       
   153 
       
   def _create_and_install_waiters(fs, return_when):
       """Build the waiter matching *return_when* and attach it to each future.

       Args:
           fs: The futures to attach the waiter to; each must expose
               ``_state`` and ``_waiters``.
           return_when: One of _AS_COMPLETED, FIRST_COMPLETED,
               FIRST_EXCEPTION or ALL_COMPLETED.

       Returns:
           The newly created waiter, already appended to every future's
           ``_waiters`` list.

       Raises:
           ValueError: If *return_when* is not one of the values above.
       """
       if return_when == _AS_COMPLETED:
           waiter = _AsCompletedWaiter()
       elif return_when == FIRST_COMPLETED:
           waiter = _FirstCompletedWaiter()
       else:
           # Only futures that have not completed yet will call back into
           # the waiter, so only those are counted.
           settled = (CANCELLED_AND_NOTIFIED, FINISHED)
           pending_count = len([f for f in fs if f._state not in settled])

           if return_when not in (FIRST_EXCEPTION, ALL_COMPLETED):
               raise ValueError("Invalid return condition: %r" % return_when)
           waiter = _AllCompletedWaiter(
               pending_count,
               stop_on_exception=(return_when == FIRST_EXCEPTION))

       for future in fs:
           future._waiters.append(waiter)

       return waiter
       
   174 
       
   175 
       
   def _yield_finished_futures(fs, waiter, ref_collect):
       """Drain the list *fs* from its tail, yielding one future at a time.

       Before a future is yielded, it is removed from every collection in
       *ref_collect* and *waiter* is removed from its ``_waiters`` list (under
       the future's condition).

       The local name is deleted and the value is popped directly inside the
       ``yield`` so that this generator holds no reference to the future
       while it is suspended.
       """
       while fs:
           last = fs[-1]
           for holder in ref_collect:
               holder.remove(last)
           with last._condition:
               last._waiters.remove(waiter)
           # Drop our own reference before handing the future out.
           del last
           yield fs.pop()
       
   196 
       
   197 
       
   def as_completed(fs, timeout=None):
       """Yield each of the given futures as it completes.

       Args:
           fs: The sequence of Futures (possibly created by different
               Executors) to iterate over. Duplicates are collapsed, so each
               future is yielded once.
           timeout: The maximum number of seconds to wait. If None, then
               there is no limit on the wait time.

       Returns:
           An iterator that yields the given Futures as they complete
           (finished or cancelled).

       Raises:
           TimeoutError: If the entire result iterator could not be generated
               before the given timeout.
       """
       if timeout is not None:
           deadline = timeout + time.time()

       fs = set(fs)
       total = len(fs)
       with _AcquireFutures(fs):
           ready = set(
               f for f in fs
               if f._state in (CANCELLED_AND_NOTIFIED, FINISHED))
           pending = fs - ready
           waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
       ready = list(ready)
       try:
           # Futures that were already complete when we were called.
           for f in _yield_finished_futures(ready, waiter,
                                            ref_collect=(fs,)):
               # Move the future into a throwaway list and pop it inside the
               # yield so this frame keeps no reference while suspended.
               f = [f]
               yield f.pop()

           while pending:
               remaining = None
               if timeout is not None:
                   remaining = deadline - time.time()
                   if remaining < 0:
                       raise TimeoutError(
                               '%d (of %d) futures unfinished' % (
                               len(pending), total))

               waiter.event.wait(remaining)

               # Take the batch collected so far and reset the waiter in one
               # step, under the lock the waiter's add_* hooks also take.
               with waiter.lock:
                   ready, waiter.finished_futures = waiter.finished_futures, []
                   waiter.event.clear()

               # The helper pops from the tail, so reverse the batch first to
               # hand futures out in the order they finished.
               ready.reverse()
               for f in _yield_finished_futures(ready, waiter,
                                                ref_collect=(fs, pending)):
                   f = [f]
                   yield f.pop()

       finally:
           # Whatever is still in fs was never yielded; detach the waiter.
           for f in fs:
               with f._condition:
                   f._waiters.remove(waiter)
       
   263 
       
   # Result type of wait(): two fields, ``done`` and ``not_done``.
   DoneAndNotDoneFutures = collections.namedtuple(
       'DoneAndNotDoneFutures', ['done', 'not_done'])
       
   def wait(fs, timeout=None, return_when=ALL_COMPLETED):
       """Wait for the futures in the given sequence to complete.

       Args:
           fs: The iterable of Futures (possibly created by different
               Executors) to wait upon. Duplicates are collapsed; one-shot
               iterators are accepted.
           timeout: The maximum number of seconds to wait. If None, then
               there is no limit on the wait time.
           return_when: Indicates when this function should return. The
               options are:

               FIRST_COMPLETED - Return when any future finishes or is
                                 cancelled.
               FIRST_EXCEPTION - Return when any future finishes by raising
                                 an exception. If no future raises an
                                 exception then it is equivalent to
                                 ALL_COMPLETED.
               ALL_COMPLETED -   Return when all futures finish or are
                                 cancelled.

       Returns:
           A named 2-tuple of sets. The first set, named 'done', contains the
           futures that completed (finished or cancelled) before the wait
           completed. The second set, named 'not_done', contains uncompleted
           futures.

       Raises:
           ValueError: If a waiter has to be installed and *return_when* is
               not one of the options above.
       """
       # Materialize and de-duplicate the input once. Without this, a
       # duplicated future makes len(fs) exceed len(done) even when every
       # future has completed, so the early return below is skipped and the
       # call blocks on a waiter nothing will ever signal. It also keeps a
       # one-shot iterator from being exhausted by the first pass over it.
       fs = set(fs)
       with _AcquireFutures(fs):
           done = set(f for f in fs
                      if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
           not_done = fs - done

           if (return_when == FIRST_COMPLETED) and done:
               return DoneAndNotDoneFutures(done, not_done)
           elif (return_when == FIRST_EXCEPTION) and done:
               if any(f for f in done
                      if not f.cancelled() and f.exception() is not None):
                   return DoneAndNotDoneFutures(done, not_done)

           # Nothing left to wait for.
           if len(done) == len(fs):
               return DoneAndNotDoneFutures(done, not_done)

           waiter = _create_and_install_waiters(fs, return_when)

       waiter.event.wait(timeout)
       # Detach the waiter whether we were signalled or timed out.
       for f in fs:
           with f._condition:
               f._waiters.remove(waiter)

       done.update(waiter.finished_futures)
       return DoneAndNotDoneFutures(done, fs - done)
       
   314 
       
   315 class Future(object):
       
   316     """Represents the result of an asynchronous computation."""
       
   317 
       
   318     def __init__(self):
       
   319         """Initializes the future. Should not be called by clients."""
       
   320         self._condition = threading.Condition()
       
   321         self._state = PENDING
       
   322         self._result = None
       
   323         self._exception = None
       
   324         self._traceback = None
       
   325         self._waiters = []
       
   326         self._done_callbacks = []
       
   327 
       
   328     def _invoke_callbacks(self):
       
   329         for callback in self._done_callbacks:
       
   330             try:
       
   331                 callback(self)
       
   332             except Exception:
       
   333                 LOGGER.exception('exception calling callback for %r', self)
       
   334             except BaseException:
       
   335                 # Explicitly let all other new-style exceptions through so
       
   336                 # that we can catch all old-style exceptions with a simple
       
   337                 # "except:" clause below.
       
   338                 #
       
   339                 # All old-style exception objects are instances of
       
   340                 # types.InstanceType, but "except types.InstanceType:" does
       
   341                 # not catch old-style exceptions for some reason.  Thus, the
       
   342                 # only way to catch all old-style exceptions without catching
       
   343                 # any new-style exceptions is to filter out the new-style
       
   344                 # exceptions, which all derive from BaseException.
       
   345                 raise
       
   346             except:
       
   347                 # Because of the BaseException clause above, this handler only
       
   348                 # executes for old-style exception objects.
       
   349                 LOGGER.exception('exception calling callback for %r', self)
       
   350 
       
   351     def __repr__(self):
       
   352         with self._condition:
       
   353             if self._state == FINISHED:
       
   354                 if self._exception:
       
   355                     return '<%s at %#x state=%s raised %s>' % (
       
   356                         self.__class__.__name__,
       
   357                         id(self),
       
   358                         _STATE_TO_DESCRIPTION_MAP[self._state],
       
   359                         self._exception.__class__.__name__)
       
   360                 else:
       
   361                     return '<%s at %#x state=%s returned %s>' % (
       
   362                         self.__class__.__name__,
       
   363                         id(self),
       
   364                         _STATE_TO_DESCRIPTION_MAP[self._state],
       
   365                         self._result.__class__.__name__)
       
   366             return '<%s at %#x state=%s>' % (
       
   367                     self.__class__.__name__,
       
   368                     id(self),
       
   369                    _STATE_TO_DESCRIPTION_MAP[self._state])
       
   370 
       
   371     def cancel(self):
       
   372         """Cancel the future if possible.
       
   373 
       
   374         Returns True if the future was cancelled, False otherwise. A future
       
   375         cannot be cancelled if it is running or has already completed.
       
   376         """
       
   377         with self._condition:
       
   378             if self._state in [RUNNING, FINISHED]:
       
   379                 return False
       
   380 
       
   381             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
       
   382                 return True
       
   383 
       
   384             self._state = CANCELLED
       
   385             self._condition.notify_all()
       
   386 
       
   387         self._invoke_callbacks()
       
   388         return True
       
   389 
       
   390     def cancelled(self):
       
   391         """Return True if the future was cancelled."""
       
   392         with self._condition:
       
   393             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
       
   394 
       
   395     def running(self):
       
   396         """Return True if the future is currently executing."""
       
   397         with self._condition:
       
   398             return self._state == RUNNING
       
   399 
       
   400     def done(self):
       
   401         """Return True of the future was cancelled or finished executing."""
       
   402         with self._condition:
       
   403             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
       
   404 
       
   405     def __get_result(self):
       
   406         if self._exception:
       
   407             if isinstance(self._exception, types.InstanceType):
       
   408                 # The exception is an instance of an old-style class, which
       
   409                 # means type(self._exception) returns types.ClassType instead
       
   410                 # of the exception's actual class type.
       
   411                 exception_type = self._exception.__class__
       
   412             else:
       
   413                 exception_type = type(self._exception)
       
   414             raise exception_type, self._exception, self._traceback
       
   415         else:
       
   416             return self._result
       
   417 
       
   418     def add_done_callback(self, fn):
       
   419         """Attaches a callable that will be called when the future finishes.
       
   420 
       
   421         Args:
       
   422             fn: A callable that will be called with this future as its only
       
   423                 argument when the future completes or is cancelled. The callable
       
   424                 will always be called by a thread in the same process in which
       
   425                 it was added. If the future has already completed or been
       
   426                 cancelled then the callable will be called immediately. These
       
   427                 callables are called in the order that they were added.
       
   428         """
       
   429         with self._condition:
       
   430             if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
       
   431                 self._done_callbacks.append(fn)
       
   432                 return
       
   433         fn(self)
       
   434 
       
   435     def result(self, timeout=None):
       
   436         """Return the result of the call that the future represents.
       
   437 
       
   438         Args:
       
   439             timeout: The number of seconds to wait for the result if the future
       
   440                 isn't done. If None, then there is no limit on the wait time.
       
   441 
       
   442         Returns:
       
   443             The result of the call that the future represents.
       
   444 
       
   445         Raises:
       
   446             CancelledError: If the future was cancelled.
       
   447             TimeoutError: If the future didn't finish executing before the given
       
   448                 timeout.
       
   449             Exception: If the call raised then that exception will be raised.
       
   450         """
       
   451         with self._condition:
       
   452             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
       
   453                 raise CancelledError()
       
   454             elif self._state == FINISHED:
       
   455                 return self.__get_result()
       
   456 
       
   457             self._condition.wait(timeout)
       
   458 
       
   459             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
       
   460                 raise CancelledError()
       
   461             elif self._state == FINISHED:
       
   462                 return self.__get_result()
       
   463             else:
       
   464                 raise TimeoutError()
       
   465 
       
   466     def exception_info(self, timeout=None):
       
   467         """Return a tuple of (exception, traceback) raised by the call that the
       
   468         future represents.
       
   469 
       
   470         Args:
       
   471             timeout: The number of seconds to wait for the exception if the
       
   472                 future isn't done. If None, then there is no limit on the wait
       
   473                 time.
       
   474 
       
   475         Returns:
       
   476             The exception raised by the call that the future represents or None
       
   477             if the call completed without raising.
       
   478 
       
   479         Raises:
       
   480             CancelledError: If the future was cancelled.
       
   481             TimeoutError: If the future didn't finish executing before the given
       
   482                 timeout.
       
   483         """
       
   484         with self._condition:
       
   485             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
       
   486                 raise CancelledError()
       
   487             elif self._state == FINISHED:
       
   488                 return self._exception, self._traceback
       
   489 
       
   490             self._condition.wait(timeout)
       
   491 
       
   492             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
       
   493                 raise CancelledError()
       
   494             elif self._state == FINISHED:
       
   495                 return self._exception, self._traceback
       
   496             else:
       
   497                 raise TimeoutError()
       
   498 
       
   499     def exception(self, timeout=None):
       
   500         """Return the exception raised by the call that the future represents.
       
   501 
       
   502         Args:
       
   503             timeout: The number of seconds to wait for the exception if the
       
   504                 future isn't done. If None, then there is no limit on the wait
       
   505                 time.
       
   506 
       
   507         Returns:
       
   508             The exception raised by the call that the future represents or None
       
   509             if the call completed without raising.
       
   510 
       
   511         Raises:
       
   512             CancelledError: If the future was cancelled.
       
   513             TimeoutError: If the future didn't finish executing before the given
       
   514                 timeout.
       
   515         """
       
   516         return self.exception_info(timeout)[0]
       
   517 
       
   518     # The following methods should only be used by Executors and in tests.
       
   519     def set_running_or_notify_cancel(self):
       
   520         """Mark the future as running or process any cancel notifications.
       
   521 
       
   522         Should only be used by Executor implementations and unit tests.
       
   523 
       
   524         If the future has been cancelled (cancel() was called and returned
       
   525         True) then any threads waiting on the future completing (though calls
       
   526         to as_completed() or wait()) are notified and False is returned.
       
   527 
       
   528         If the future was not cancelled then it is put in the running state
       
   529         (future calls to running() will return True) and True is returned.
       
   530 
       
   531         This method should be called by Executor implementations before
       
   532         executing the work associated with this future. If this method returns
       
   533         False then the work should not be executed.
       
   534 
       
   535         Returns:
       
   536             False if the Future was cancelled, True otherwise.
       
   537 
       
   538         Raises:
       
   539             RuntimeError: if this method was already called or if set_result()
       
   540                 or set_exception() was called.
       
   541         """
       
   542         with self._condition:
       
   543             if self._state == CANCELLED:
       
   544                 self._state = CANCELLED_AND_NOTIFIED
       
   545                 for waiter in self._waiters:
       
   546                     waiter.add_cancelled(self)
       
   547                 # self._condition.notify_all() is not necessary because
       
   548                 # self.cancel() triggers a notification.
       
   549                 return False
       
   550             elif self._state == PENDING:
       
   551                 self._state = RUNNING
       
   552                 return True
       
   553             else:
       
   554                 LOGGER.critical('Future %s in unexpected state: %s',
       
   555                                 id(self),
       
   556                                 self._state)
       
   557                 raise RuntimeError('Future in unexpected state')
       
   558 
       
   559     def set_result(self, result):
       
   560         """Sets the return value of work associated with the future.
       
   561 
       
   562         Should only be used by Executor implementations and unit tests.
       
   563         """
       
   564         with self._condition:
       
   565             self._result = result
       
   566             self._state = FINISHED
       
   567             for waiter in self._waiters:
       
   568                 waiter.add_result(self)
       
   569             self._condition.notify_all()
       
   570         self._invoke_callbacks()
       
   571 
       
   572     def set_exception_info(self, exception, traceback):
       
   573         """Sets the result of the future as being the given exception
       
   574         and traceback.
       
   575 
       
   576         Should only be used by Executor implementations and unit tests.
       
   577         """
       
   578         with self._condition:
       
   579             self._exception = exception
       
   580             self._traceback = traceback
       
   581             self._state = FINISHED
       
   582             for waiter in self._waiters:
       
   583                 waiter.add_exception(self)
       
   584             self._condition.notify_all()
       
   585         self._invoke_callbacks()
       
   586 
       
   587     def set_exception(self, exception):
       
   588         """Sets the result of the future as being the given exception.
       
   589 
       
   590         Should only be used by Executor implementations and unit tests.
       
   591         """
       
   592         self.set_exception_info(exception, None)
       
   593 
       
   class Executor(object):
       """This is an abstract base class for concrete asynchronous executors."""

       def submit(self, fn, *args, **kwargs):
           """Submits a callable to be executed with the given arguments.

           Schedules the callable to be executed as fn(*args, **kwargs) and
           returns a Future instance representing the execution of the
           callable. This base implementation always raises
           NotImplementedError.

           Returns:
               A Future representing the given call.
           """
           raise NotImplementedError()

       def map(self, fn, *iterables, **kwargs):
           """Returns an iterator equivalent to map(fn, iter).

           Args:
               fn: A callable that will take as many arguments as there are
                   passed iterables.
               timeout: The maximum number of seconds to wait. If None, then
                   there is no limit on the wait time. Read from **kwargs.

           Returns:
               An iterator equivalent to: map(func, *iterables) but the calls
               may be evaluated out-of-order. Results are yielded in
               submission order.

           Raises:
               TimeoutError: If the entire result iterator could not be
                   generated before the given timeout.
               Exception: If fn(*args) raises for any values.
           """
           timeout = kwargs.get('timeout')
           end_time = None if timeout is None else timeout + time.time()

           # Submit everything eagerly, before any result is requested.
           fs = []
           for args in itertools.izip(*iterables):
               fs.append(self.submit(fn, *args))

           # Yield must be hidden in closure so that the futures are submitted
           # before the first iterator value is required.
           def result_iterator():
               try:
                   # Popping from the tail of the reversed list hands results
                   # out in submission order.
                   fs.reverse()
                   while fs:
                       # Pop inside the expression so no local name keeps a
                       # reference to the popped future.
                       if timeout is not None:
                           yield fs.pop().result(end_time - time.time())
                       else:
                           yield fs.pop().result()
               finally:
                   # On error or early close, cancel what was not consumed.
                   for leftover in fs:
                       leftover.cancel()
           return result_iterator()

       def shutdown(self, wait=True):
           """Clean-up the resources associated with the Executor.

           It is safe to call this method several times. Otherwise, no other
           methods can be called after this one. This base implementation
           does nothing.

           Args:
               wait: If True then shutdown will not return until all running
                   futures have finished executing and the resources used by
                   the executor have been reclaimed.
           """
           pass

       def __enter__(self):
           return self

       def __exit__(self, exc_type, exc_val, exc_tb):
           # Always wait for shutdown; returning False lets any exception
           # from the with-body propagate.
           self.shutdown(wait=True)
           return False
       
   667         return False