mercurial/thirdparty/concurrent/futures/process.py
author Gregory Szorc <gregory.szorc@gmail.com>
Wed, 11 Apr 2018 14:48:24 -0700
changeset 37623 eb687c28a915
child 37626 0a9c0d3480b2
permissions -rw-r--r--
thirdparty: vendor futures 3.2.0 Python 3 has a concurrent.futures package in the standard library for representing futures. The "futures" package on PyPI is a backport of this package to work with Python 2. The wire protocol code today has its own future concept for handling of "batch" requests. The frame-based protocol will also want to use futures. I've heavily used the "futures" package on Python 2 in other projects and it is pretty nice. It even has a built-in thread and process pool for running functions in parallel. I've used this heavily for concurrent I/O and other GIL-less activities. The existing futures API in the wire protocol code is not as nice as concurrent.futures. Since concurrent.futures is in the Python standard library and will presumably be the long-term future for futures in our code base, let's vendor the backport so we can use proper futures today. # no-check-commit because of style violations Differential Revision: https://phab.mercurial-scm.org/D3261
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
37623
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     1
# Copyright 2009 Brian Quinlan. All Rights Reserved.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     2
# Licensed to PSF under a Contributor Agreement.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     3
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     4
"""Implements ProcessPoolExecutor.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     5
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     6
The follow diagram and text describe the data-flow through the system:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     7
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     8
|======================= In-process =====================|== Out-of-process ==|
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
     9
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    10
+----------+     +----------+       +--------+     +-----------+    +---------+
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    11
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    12
|          |     +----------+       |        |     +-----------+    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    13
|          |     | ...      |       |        |     | ...       |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    14
|          |     | 6        |       |        |     | 5, call() |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    15
|          |     | 7        |       |        |     | ...       |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    16
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    17
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    18
| Executor |                        | Thread |                      |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    19
|          |     +----------- +     |        |     +-----------+    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    20
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    21
|          |     +------------+     |        |     +-----------+    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    22
|          |     | 6: call()  |     |        |     | ...       |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    23
|          |     |    future  |     |        |     | 4, result |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    24
|          |     | ...        |     |        |     | 3, except |    |         |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    25
+----------+     +------------+     +--------+     +-----------+    +---------+
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    26
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    27
Executor.submit() called:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    28
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    29
- adds the id of the _WorkItem to the "Work Ids" queue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    30
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    31
Local worker thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    32
- reads work ids from the "Work Ids" queue and looks up the corresponding
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    33
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    34
  it is simply removed from the dict, otherwise it is repackaged as a
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    35
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    36
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    37
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    38
- reads _ResultItems from "Result Q", updates the future stored in the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    39
  "Work Items" dict and deletes the dict entry
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    40
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    41
Process #1..n:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    42
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    43
  _ResultItems in "Request Q"
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    44
"""
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    45
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    46
import atexit
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    47
from concurrent.futures import _base
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    48
import Queue as queue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    49
import multiprocessing
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    50
import threading
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    51
import weakref
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    52
import sys
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    53
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    54
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    55
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    56
# Workers are created as daemon threads and processes. This is done to allow the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    57
# interpreter to exit when there are still idle processes in a
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    58
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    59
# allowing workers to die with the interpreter has two undesirable properties:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    60
#   - The workers would still be running during interpretor shutdown,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    61
#     meaning that they would fail in unpredictable ways.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    62
#   - The workers could be killed while evaluating a work item, which could
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    63
#     be bad if the callable being evaluated has external side-effects e.g.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    64
#     writing to a file.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    65
#
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    66
# To work around this problem, an exit handler is installed which tells the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    67
# workers to exit when their work queues are empty and then waits until the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    68
# threads/processes finish.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    69
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    70
_threads_queues = weakref.WeakKeyDictionary()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    71
_shutdown = False
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    72
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    73
def _python_exit():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    74
    global _shutdown
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    75
    _shutdown = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    76
    items = list(_threads_queues.items()) if _threads_queues else ()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    77
    for t, q in items:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    78
        q.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    79
    for t, q in items:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    80
        t.join(sys.maxint)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    81
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    82
# Controls how many more calls than processes will be queued in the call queue.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    83
# A smaller number will mean that processes spend more time idle waiting for
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    84
# work while a larger number will make Future.cancel() succeed less frequently
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    85
# (Futures in the call queue cannot be cancelled).
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    86
EXTRA_QUEUED_CALLS = 1
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    87
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    88
class _WorkItem(object):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    89
    def __init__(self, future, fn, args, kwargs):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    90
        self.future = future
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    91
        self.fn = fn
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    92
        self.args = args
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    93
        self.kwargs = kwargs
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    94
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    95
class _ResultItem(object):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    96
    def __init__(self, work_id, exception=None, result=None):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    97
        self.work_id = work_id
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    98
        self.exception = exception
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
    99
        self.result = result
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   100
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   101
class _CallItem(object):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   102
    def __init__(self, work_id, fn, args, kwargs):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   103
        self.work_id = work_id
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   104
        self.fn = fn
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   105
        self.args = args
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   106
        self.kwargs = kwargs
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   107
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   108
def _process_worker(call_queue, result_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   109
    """Evaluates calls from call_queue and places the results in result_queue.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   110
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   111
    This worker is run in a separate process.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   112
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   113
    Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   114
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   115
            evaluated by the worker.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   116
        result_queue: A multiprocessing.Queue of _ResultItems that will written
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   117
            to by the worker.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   118
        shutdown: A multiprocessing.Event that will be set as a signal to the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   119
            worker that it should exit when call_queue is empty.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   120
    """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   121
    while True:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   122
        call_item = call_queue.get(block=True)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   123
        if call_item is None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   124
            # Wake up queue management thread
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   125
            result_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   126
            return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   127
        try:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   128
            r = call_item.fn(*call_item.args, **call_item.kwargs)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   129
        except:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   130
            e = sys.exc_info()[1]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   131
            result_queue.put(_ResultItem(call_item.work_id,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   132
                                         exception=e))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   133
        else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   134
            result_queue.put(_ResultItem(call_item.work_id,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   135
                                         result=r))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   136
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   137
def _add_call_item_to_queue(pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   138
                            work_ids,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   139
                            call_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   140
    """Fills call_queue with _WorkItems from pending_work_items.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   141
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   142
    This function never blocks.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   143
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   144
    Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   145
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   146
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   147
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   148
            are consumed and the corresponding _WorkItems from
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   149
            pending_work_items are transformed into _CallItems and put in
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   150
            call_queue.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   151
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   152
            derived from _WorkItems.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   153
    """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   154
    while True:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   155
        if call_queue.full():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   156
            return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   157
        try:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   158
            work_id = work_ids.get(block=False)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   159
        except queue.Empty:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   160
            return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   161
        else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   162
            work_item = pending_work_items[work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   163
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   164
            if work_item.future.set_running_or_notify_cancel():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   165
                call_queue.put(_CallItem(work_id,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   166
                                         work_item.fn,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   167
                                         work_item.args,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   168
                                         work_item.kwargs),
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   169
                               block=True)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   170
            else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   171
                del pending_work_items[work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   172
                continue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   173
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   174
def _queue_management_worker(executor_reference,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   175
                             processes,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   176
                             pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   177
                             work_ids_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   178
                             call_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   179
                             result_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   180
    """Manages the communication between this process and the worker processes.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   181
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   182
    This function is run in a local thread.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   183
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   184
    Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   185
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   186
            this thread. Used to determine if the ProcessPoolExecutor has been
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   187
            garbage collected and that this function can exit.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   188
        process: A list of the multiprocessing.Process instances used as
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   189
            workers.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   190
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   191
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   192
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   193
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   194
            derived from _WorkItems for processing by the process workers.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   195
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   196
            process workers.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   197
    """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   198
    nb_shutdown_processes = [0]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   199
    def shutdown_one_process():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   200
        """Tell a worker to terminate, which will in turn wake us again"""
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   201
        call_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   202
        nb_shutdown_processes[0] += 1
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   203
    while True:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   204
        _add_call_item_to_queue(pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   205
                                work_ids_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   206
                                call_queue)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   207
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   208
        result_item = result_queue.get(block=True)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   209
        if result_item is not None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   210
            work_item = pending_work_items[result_item.work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   211
            del pending_work_items[result_item.work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   212
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   213
            if result_item.exception:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   214
                work_item.future.set_exception(result_item.exception)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   215
            else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   216
                work_item.future.set_result(result_item.result)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   217
            # Delete references to object. See issue16284
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   218
            del work_item
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   219
        # Check whether we should start shutting down.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   220
        executor = executor_reference()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   221
        # No more work items can be added if:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   222
        #   - The interpreter is shutting down OR
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   223
        #   - The executor that owns this worker has been collected OR
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   224
        #   - The executor that owns this worker has been shutdown.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   225
        if _shutdown or executor is None or executor._shutdown_thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   226
            # Since no new work items can be added, it is safe to shutdown
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   227
            # this thread if there are no pending work items.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   228
            if not pending_work_items:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   229
                while nb_shutdown_processes[0] < len(processes):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   230
                    shutdown_one_process()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   231
                # If .join() is not called on the created processes then
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   232
                # some multiprocessing.Queue methods may deadlock on Mac OS
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   233
                # X.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   234
                for p in processes:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   235
                    p.join()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   236
                call_queue.close()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   237
                return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   238
        del executor
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   239
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   240
_system_limits_checked = False
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   241
_system_limited = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   242
def _check_system_limits():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   243
    global _system_limits_checked, _system_limited
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   244
    if _system_limits_checked:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   245
        if _system_limited:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   246
            raise NotImplementedError(_system_limited)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   247
    _system_limits_checked = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   248
    try:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   249
        import os
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   250
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   251
    except (AttributeError, ValueError):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   252
        # sysconf not available or setting not available
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   253
        return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   254
    if nsems_max == -1:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   255
        # indetermine limit, assume that limit is determined
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   256
        # by available memory only
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   257
        return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   258
    if nsems_max >= 256:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   259
        # minimum number of semaphores available
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   260
        # according to POSIX
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   261
        return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   262
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   263
    raise NotImplementedError(_system_limited)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   264
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   265
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   266
class ProcessPoolExecutor(_base.Executor):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   267
    def __init__(self, max_workers=None):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   268
        """Initializes a new ProcessPoolExecutor instance.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   269
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   270
        Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   271
            max_workers: The maximum number of processes that can be used to
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   272
                execute the given calls. If None or not given then as many
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   273
                worker processes will be created as the machine has processors.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   274
        """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   275
        _check_system_limits()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   276
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   277
        if max_workers is None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   278
            self._max_workers = multiprocessing.cpu_count()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   279
        else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   280
            if max_workers <= 0:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   281
                raise ValueError("max_workers must be greater than 0")
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   282
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   283
            self._max_workers = max_workers
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   284
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   285
        # Make the call queue slightly larger than the number of processes to
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   286
        # prevent the worker processes from idling. But don't make it too big
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   287
        # because futures in the call queue cannot be cancelled.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   288
        self._call_queue = multiprocessing.Queue(self._max_workers +
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   289
                                                 EXTRA_QUEUED_CALLS)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   290
        self._result_queue = multiprocessing.Queue()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   291
        self._work_ids = queue.Queue()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   292
        self._queue_management_thread = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   293
        self._processes = set()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   294
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   295
        # Shutdown is a two-step process.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   296
        self._shutdown_thread = False
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   297
        self._shutdown_lock = threading.Lock()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   298
        self._queue_count = 0
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   299
        self._pending_work_items = {}
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   300
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   301
    def _start_queue_management_thread(self):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   302
        # When the executor gets lost, the weakref callback will wake up
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   303
        # the queue management thread.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   304
        def weakref_cb(_, q=self._result_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   305
            q.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   306
        if self._queue_management_thread is None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   307
            self._queue_management_thread = threading.Thread(
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   308
                    target=_queue_management_worker,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   309
                    args=(weakref.ref(self, weakref_cb),
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   310
                          self._processes,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   311
                          self._pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   312
                          self._work_ids,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   313
                          self._call_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   314
                          self._result_queue))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   315
            self._queue_management_thread.daemon = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   316
            self._queue_management_thread.start()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   317
            _threads_queues[self._queue_management_thread] = self._result_queue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   318
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   319
    def _adjust_process_count(self):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   320
        for _ in range(len(self._processes), self._max_workers):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   321
            p = multiprocessing.Process(
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   322
                    target=_process_worker,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   323
                    args=(self._call_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   324
                          self._result_queue))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   325
            p.start()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   326
            self._processes.add(p)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   327
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   328
    def submit(self, fn, *args, **kwargs):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   329
        with self._shutdown_lock:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   330
            if self._shutdown_thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   331
                raise RuntimeError('cannot schedule new futures after shutdown')
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   332
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   333
            f = _base.Future()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   334
            w = _WorkItem(f, fn, args, kwargs)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   335
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   336
            self._pending_work_items[self._queue_count] = w
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   337
            self._work_ids.put(self._queue_count)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   338
            self._queue_count += 1
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   339
            # Wake up queue management thread
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   340
            self._result_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   341
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   342
            self._start_queue_management_thread()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   343
            self._adjust_process_count()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   344
            return f
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   345
    submit.__doc__ = _base.Executor.submit.__doc__
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   346
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   347
    def shutdown(self, wait=True):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   348
        with self._shutdown_lock:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   349
            self._shutdown_thread = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   350
        if self._queue_management_thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   351
            # Wake up queue management thread
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   352
            self._result_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   353
            if wait:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   354
                self._queue_management_thread.join(sys.maxint)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   355
        # To reduce the risk of openning too many files, remove references to
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   356
        # objects that use file descriptors.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   357
        self._queue_management_thread = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   358
        self._call_queue = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   359
        self._result_queue = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   360
        self._processes = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   361
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   362
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
   363
atexit.register(_python_exit)