mercurial/thirdparty/concurrent/futures/process.py
author Jordi Guti?rrez Hermoso <jordigh@octave.org>
Thu, 12 Dec 2019 11:41:28 -0500
changeset 43886 fe0daceb51d0
parent 37626 0a9c0d3480b2
permissions -rw-r--r--
hgweb: fix error in docstring Despite the subtle semantic difference, this sentence really meant to say "overridden", not "overwritten".
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
37623
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     1
# Copyright 2009 Brian Quinlan. All Rights Reserved.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     2
# Licensed to PSF under a Contributor Agreement.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     3
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     4
"""Implements ProcessPoolExecutor.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     5
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     6
The follow diagram and text describe the data-flow through the system:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     7
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     8
|======================= In-process =====================|== Out-of-process ==|
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     9
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    10
+----------+     +----------+       +--------+     +-----------+    +---------+
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    11
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    12
|          |     +----------+       |        |     +-----------+    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    13
|          |     | ...      |       |        |     | ...       |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    14
|          |     | 6        |       |        |     | 5, call() |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    15
|          |     | 7        |       |        |     | ...       |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    16
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    17
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    18
| Executor |                        | Thread |                      |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    19
|          |     +----------- +     |        |     +-----------+    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    20
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    21
|          |     +------------+     |        |     +-----------+    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    22
|          |     | 6: call()  |     |        |     | ...       |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    23
|          |     |    future  |     |        |     | 4, result |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    24
|          |     | ...        |     |        |     | 3, except |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    25
+----------+     +------------+     +--------+     +-----------+    +---------+
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    26
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    27
Executor.submit() called:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    28
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    29
- adds the id of the _WorkItem to the "Work Ids" queue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    30
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    31
Local worker thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    32
- reads work ids from the "Work Ids" queue and looks up the corresponding
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    33
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    34
  it is simply removed from the dict, otherwise it is repackaged as a
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    35
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    36
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    37
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    38
- reads _ResultItems from "Result Q", updates the future stored in the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    39
  "Work Items" dict and deletes the dict entry
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    40
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    41
Process #1..n:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    42
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    43
  _ResultItems in "Request Q"
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    44
"""
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    45
37626
0a9c0d3480b2 futures: switch to absolute and relative imports
Gregory Szorc <gregory.szorc@gmail.com>
parents: 37623
diff changeset
    46
from __future__ import absolute_import
0a9c0d3480b2 futures: switch to absolute and relative imports
Gregory Szorc <gregory.szorc@gmail.com>
parents: 37623
diff changeset
    47
37623
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    48
import atexit
37626
0a9c0d3480b2 futures: switch to absolute and relative imports
Gregory Szorc <gregory.szorc@gmail.com>
parents: 37623
diff changeset
    49
from . import _base
37623
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    50
import Queue as queue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    51
import multiprocessing
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    52
import threading
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    53
import weakref
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    54
import sys
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    55
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    56
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    57
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    58
# Workers are created as daemon threads and processes. This is done to allow the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    59
# interpreter to exit when there are still idle processes in a
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    60
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    61
# allowing workers to die with the interpreter has two undesirable properties:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    62
#   - The workers would still be running during interpretor shutdown,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    63
#     meaning that they would fail in unpredictable ways.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    64
#   - The workers could be killed while evaluating a work item, which could
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    65
#     be bad if the callable being evaluated has external side-effects e.g.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    66
#     writing to a file.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    67
#
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    68
# To work around this problem, an exit handler is installed which tells the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    69
# workers to exit when their work queues are empty and then waits until the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    70
# threads/processes finish.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    71
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    72
_threads_queues = weakref.WeakKeyDictionary()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    73
_shutdown = False
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    74
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    75
def _python_exit():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    76
    global _shutdown
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    77
    _shutdown = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    78
    items = list(_threads_queues.items()) if _threads_queues else ()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    79
    for t, q in items:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    80
        q.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    81
    for t, q in items:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    82
        t.join(sys.maxint)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    83
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    84
# Controls how many more calls than processes will be queued in the call queue.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    85
# A smaller number will mean that processes spend more time idle waiting for
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    86
# work while a larger number will make Future.cancel() succeed less frequently
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    87
# (Futures in the call queue cannot be cancelled).
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    88
EXTRA_QUEUED_CALLS = 1
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    89
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    90
class _WorkItem(object):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    91
    def __init__(self, future, fn, args, kwargs):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    92
        self.future = future
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    93
        self.fn = fn
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    94
        self.args = args
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    95
        self.kwargs = kwargs
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    96
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    97
class _ResultItem(object):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    98
    def __init__(self, work_id, exception=None, result=None):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    99
        self.work_id = work_id
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   100
        self.exception = exception
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   101
        self.result = result
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   102
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   103
class _CallItem(object):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   104
    def __init__(self, work_id, fn, args, kwargs):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   105
        self.work_id = work_id
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   106
        self.fn = fn
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   107
        self.args = args
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   108
        self.kwargs = kwargs
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   109
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   110
def _process_worker(call_queue, result_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   111
    """Evaluates calls from call_queue and places the results in result_queue.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   112
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   113
    This worker is run in a separate process.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   114
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   115
    Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   116
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   117
            evaluated by the worker.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   118
        result_queue: A multiprocessing.Queue of _ResultItems that will written
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   119
            to by the worker.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   120
        shutdown: A multiprocessing.Event that will be set as a signal to the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   121
            worker that it should exit when call_queue is empty.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   122
    """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   123
    while True:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   124
        call_item = call_queue.get(block=True)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   125
        if call_item is None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   126
            # Wake up queue management thread
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   127
            result_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   128
            return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   129
        try:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   130
            r = call_item.fn(*call_item.args, **call_item.kwargs)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   131
        except:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   132
            e = sys.exc_info()[1]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   133
            result_queue.put(_ResultItem(call_item.work_id,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   134
                                         exception=e))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   135
        else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   136
            result_queue.put(_ResultItem(call_item.work_id,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   137
                                         result=r))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   138
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   139
def _add_call_item_to_queue(pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   140
                            work_ids,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   141
                            call_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   142
    """Fills call_queue with _WorkItems from pending_work_items.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   143
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   144
    This function never blocks.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   145
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   146
    Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   147
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   148
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   149
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   150
            are consumed and the corresponding _WorkItems from
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   151
            pending_work_items are transformed into _CallItems and put in
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   152
            call_queue.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   153
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   154
            derived from _WorkItems.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   155
    """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   156
    while True:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   157
        if call_queue.full():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   158
            return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   159
        try:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   160
            work_id = work_ids.get(block=False)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   161
        except queue.Empty:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   162
            return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   163
        else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   164
            work_item = pending_work_items[work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   165
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   166
            if work_item.future.set_running_or_notify_cancel():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   167
                call_queue.put(_CallItem(work_id,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   168
                                         work_item.fn,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   169
                                         work_item.args,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   170
                                         work_item.kwargs),
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   171
                               block=True)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   172
            else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   173
                del pending_work_items[work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   174
                continue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   175
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   176
def _queue_management_worker(executor_reference,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   177
                             processes,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   178
                             pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   179
                             work_ids_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   180
                             call_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   181
                             result_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   182
    """Manages the communication between this process and the worker processes.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   183
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   184
    This function is run in a local thread.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   185
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   186
    Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   187
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   188
            this thread. Used to determine if the ProcessPoolExecutor has been
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   189
            garbage collected and that this function can exit.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   190
        process: A list of the multiprocessing.Process instances used as
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   191
            workers.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   192
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   193
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   194
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   195
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   196
            derived from _WorkItems for processing by the process workers.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   197
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   198
            process workers.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   199
    """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   200
    nb_shutdown_processes = [0]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   201
    def shutdown_one_process():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   202
        """Tell a worker to terminate, which will in turn wake us again"""
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   203
        call_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   204
        nb_shutdown_processes[0] += 1
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   205
    while True:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   206
        _add_call_item_to_queue(pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   207
                                work_ids_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   208
                                call_queue)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   209
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   210
        result_item = result_queue.get(block=True)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   211
        if result_item is not None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   212
            work_item = pending_work_items[result_item.work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   213
            del pending_work_items[result_item.work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   214
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   215
            if result_item.exception:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   216
                work_item.future.set_exception(result_item.exception)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   217
            else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   218
                work_item.future.set_result(result_item.result)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   219
            # Delete references to object. See issue16284
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   220
            del work_item
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   221
        # Check whether we should start shutting down.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   222
        executor = executor_reference()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   223
        # No more work items can be added if:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   224
        #   - The interpreter is shutting down OR
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   225
        #   - The executor that owns this worker has been collected OR
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   226
        #   - The executor that owns this worker has been shutdown.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   227
        if _shutdown or executor is None or executor._shutdown_thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   228
            # Since no new work items can be added, it is safe to shutdown
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   229
            # this thread if there are no pending work items.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   230
            if not pending_work_items:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   231
                while nb_shutdown_processes[0] < len(processes):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   232
                    shutdown_one_process()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   233
                # If .join() is not called on the created processes then
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   234
                # some multiprocessing.Queue methods may deadlock on Mac OS
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   235
                # X.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   236
                for p in processes:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   237
                    p.join()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   238
                call_queue.close()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   239
                return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   240
        del executor
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   241
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   242
_system_limits_checked = False
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   243
_system_limited = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   244
def _check_system_limits():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   245
    global _system_limits_checked, _system_limited
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   246
    if _system_limits_checked:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   247
        if _system_limited:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   248
            raise NotImplementedError(_system_limited)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   249
    _system_limits_checked = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   250
    try:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   251
        import os
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   252
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   253
    except (AttributeError, ValueError):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   254
        # sysconf not available or setting not available
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   255
        return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   256
    if nsems_max == -1:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   257
        # indetermine limit, assume that limit is determined
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   258
        # by available memory only
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   259
        return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   260
    if nsems_max >= 256:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   261
        # minimum number of semaphores available
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   262
        # according to POSIX
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   263
        return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   264
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   265
    raise NotImplementedError(_system_limited)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   266
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   267
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   268
class ProcessPoolExecutor(_base.Executor):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   269
    def __init__(self, max_workers=None):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   270
        """Initializes a new ProcessPoolExecutor instance.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   271
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   272
        Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   273
            max_workers: The maximum number of processes that can be used to
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   274
                execute the given calls. If None or not given then as many
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   275
                worker processes will be created as the machine has processors.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   276
        """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   277
        _check_system_limits()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   278
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   279
        if max_workers is None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   280
            self._max_workers = multiprocessing.cpu_count()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   281
        else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   282
            if max_workers <= 0:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   283
                raise ValueError("max_workers must be greater than 0")
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   284
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   285
            self._max_workers = max_workers
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   286
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   287
        # Make the call queue slightly larger than the number of processes to
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   288
        # prevent the worker processes from idling. But don't make it too big
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   289
        # because futures in the call queue cannot be cancelled.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   290
        self._call_queue = multiprocessing.Queue(self._max_workers +
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   291
                                                 EXTRA_QUEUED_CALLS)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   292
        self._result_queue = multiprocessing.Queue()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   293
        self._work_ids = queue.Queue()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   294
        self._queue_management_thread = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   295
        self._processes = set()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   296
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   297
        # Shutdown is a two-step process.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   298
        self._shutdown_thread = False
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   299
        self._shutdown_lock = threading.Lock()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   300
        self._queue_count = 0
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   301
        self._pending_work_items = {}
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   302
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   303
    def _start_queue_management_thread(self):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   304
        # When the executor gets lost, the weakref callback will wake up
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   305
        # the queue management thread.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   306
        def weakref_cb(_, q=self._result_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   307
            q.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   308
        if self._queue_management_thread is None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   309
            self._queue_management_thread = threading.Thread(
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   310
                    target=_queue_management_worker,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   311
                    args=(weakref.ref(self, weakref_cb),
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   312
                          self._processes,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   313
                          self._pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   314
                          self._work_ids,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   315
                          self._call_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   316
                          self._result_queue))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   317
            self._queue_management_thread.daemon = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   318
            self._queue_management_thread.start()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   319
            _threads_queues[self._queue_management_thread] = self._result_queue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   320
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   321
    def _adjust_process_count(self):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   322
        for _ in range(len(self._processes), self._max_workers):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   323
            p = multiprocessing.Process(
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   324
                    target=_process_worker,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   325
                    args=(self._call_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   326
                          self._result_queue))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   327
            p.start()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   328
            self._processes.add(p)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   329
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   330
    def submit(self, fn, *args, **kwargs):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   331
        with self._shutdown_lock:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   332
            if self._shutdown_thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   333
                raise RuntimeError('cannot schedule new futures after shutdown')
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   334
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   335
            f = _base.Future()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   336
            w = _WorkItem(f, fn, args, kwargs)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   337
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   338
            self._pending_work_items[self._queue_count] = w
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   339
            self._work_ids.put(self._queue_count)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   340
            self._queue_count += 1
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   341
            # Wake up queue management thread
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   342
            self._result_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   343
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   344
            self._start_queue_management_thread()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   345
            self._adjust_process_count()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   346
            return f
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   347
    submit.__doc__ = _base.Executor.submit.__doc__
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   348
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   349
    def shutdown(self, wait=True):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   350
        with self._shutdown_lock:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   351
            self._shutdown_thread = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   352
        if self._queue_management_thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   353
            # Wake up queue management thread
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   354
            self._result_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   355
            if wait:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   356
                self._queue_management_thread.join(sys.maxint)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   357
        # To reduce the risk of openning too many files, remove references to
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   358
        # objects that use file descriptors.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   359
        self._queue_management_thread = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   360
        self._call_queue = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   361
        self._result_queue = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   362
        self._processes = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   363
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   364
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   365
atexit.register(_python_exit)