annotate mercurial/thirdparty/concurrent/futures/process.py @ 37623:eb687c28a915

thirdparty: vendor futures 3.2.0 Python 3 has a concurrent.futures package in the standard library for representing futures. The "futures" package on PyPI is a backport of this package to work with Python 2. The wire protocol code today has its own future concept for handling of "batch" requests. The frame-based protocol will also want to use futures. I've heavily used the "futures" package on Python 2 in other projects and it is pretty nice. It even has a built-in thread and process pool for running functions in parallel. I've used this heavily for concurrent I/O and other GIL-less activities. The existing futures API in the wire protocol code is not as nice as concurrent.futures. Since concurrent.futures is in the Python standard library and will presumably be the long-term future for futures in our code base, let's vendor the backport so we can use proper futures today. # no-check-commit because of style violations Differential Revision: https://phab.mercurial-scm.org/D3261
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 11 Apr 2018 14:48:24 -0700
parents
children 0a9c0d3480b2
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
37623
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
2 # Licensed to PSF under a Contributor Agreement.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
3
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
4 """Implements ProcessPoolExecutor.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
5
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
6 The follow diagram and text describe the data-flow through the system:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
7
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
8 |======================= In-process =====================|== Out-of-process ==|
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
9
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
10 +----------+ +----------+ +--------+ +-----------+ +---------+
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
11 | | => | Work Ids | => | | => | Call Q | => | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
12 | | +----------+ | | +-----------+ | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
13 | | | ... | | | | ... | | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
14 | | | 6 | | | | 5, call() | | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
15 | | | 7 | | | | ... | | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
16 | Process | | ... | | Local | +-----------+ | Process |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
17 | Pool | +----------+ | Worker | | #1..n |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
18 | Executor | | Thread | | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
19 | | +----------- + | | +-----------+ | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
20 | | <=> | Work Items | <=> | | <= | Result Q | <= | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
21 | | +------------+ | | +-----------+ | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
22 | | | 6: call() | | | | ... | | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
23 | | | future | | | | 4, result | | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
24 | | | ... | | | | 3, except | | |
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
25 +----------+ +------------+ +--------+ +-----------+ +---------+
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
26
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
27 Executor.submit() called:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
28 - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
29 - adds the id of the _WorkItem to the "Work Ids" queue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
30
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
31 Local worker thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
32 - reads work ids from the "Work Ids" queue and looks up the corresponding
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
34 it is simply removed from the dict, otherwise it is repackaged as a
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
38 - reads _ResultItems from "Result Q", updates the future stored in the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
39 "Work Items" dict and deletes the dict entry
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
40
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
41 Process #1..n:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
42 - reads _CallItems from "Call Q", executes the calls, and puts the resulting
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
43 _ResultItems in "Request Q"
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
44 """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
45
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
46 import atexit
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
47 from concurrent.futures import _base
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
48 import Queue as queue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
49 import multiprocessing
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
50 import threading
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
51 import weakref
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
52 import sys
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
53
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
54 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
55
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
56 # Workers are created as daemon threads and processes. This is done to allow the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
57 # interpreter to exit when there are still idle processes in a
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
58 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
59 # allowing workers to die with the interpreter has two undesirable properties:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
60 # - The workers would still be running during interpretor shutdown,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
61 # meaning that they would fail in unpredictable ways.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
62 # - The workers could be killed while evaluating a work item, which could
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
63 # be bad if the callable being evaluated has external side-effects e.g.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
64 # writing to a file.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
65 #
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
66 # To work around this problem, an exit handler is installed which tells the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
67 # workers to exit when their work queues are empty and then waits until the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
68 # threads/processes finish.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
69
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
70 _threads_queues = weakref.WeakKeyDictionary()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
71 _shutdown = False
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
72
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
73 def _python_exit():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
74 global _shutdown
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
75 _shutdown = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
76 items = list(_threads_queues.items()) if _threads_queues else ()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
77 for t, q in items:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
78 q.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
79 for t, q in items:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
80 t.join(sys.maxint)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
81
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
82 # Controls how many more calls than processes will be queued in the call queue.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
83 # A smaller number will mean that processes spend more time idle waiting for
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
84 # work while a larger number will make Future.cancel() succeed less frequently
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
85 # (Futures in the call queue cannot be cancelled).
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
86 EXTRA_QUEUED_CALLS = 1
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
87
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
88 class _WorkItem(object):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
89 def __init__(self, future, fn, args, kwargs):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
90 self.future = future
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
91 self.fn = fn
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
92 self.args = args
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
93 self.kwargs = kwargs
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
94
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
95 class _ResultItem(object):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
96 def __init__(self, work_id, exception=None, result=None):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
97 self.work_id = work_id
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
98 self.exception = exception
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
99 self.result = result
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
100
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
101 class _CallItem(object):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
102 def __init__(self, work_id, fn, args, kwargs):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
103 self.work_id = work_id
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
104 self.fn = fn
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
105 self.args = args
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
106 self.kwargs = kwargs
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
107
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
108 def _process_worker(call_queue, result_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
109 """Evaluates calls from call_queue and places the results in result_queue.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
110
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
111 This worker is run in a separate process.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
112
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
113 Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
114 call_queue: A multiprocessing.Queue of _CallItems that will be read and
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
115 evaluated by the worker.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
116 result_queue: A multiprocessing.Queue of _ResultItems that will written
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
117 to by the worker.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
118 shutdown: A multiprocessing.Event that will be set as a signal to the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
119 worker that it should exit when call_queue is empty.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
120 """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
121 while True:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
122 call_item = call_queue.get(block=True)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
123 if call_item is None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
124 # Wake up queue management thread
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
125 result_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
126 return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
127 try:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
128 r = call_item.fn(*call_item.args, **call_item.kwargs)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
129 except:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
130 e = sys.exc_info()[1]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
131 result_queue.put(_ResultItem(call_item.work_id,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
132 exception=e))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
133 else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
134 result_queue.put(_ResultItem(call_item.work_id,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
135 result=r))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
136
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
137 def _add_call_item_to_queue(pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
138 work_ids,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
139 call_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
140 """Fills call_queue with _WorkItems from pending_work_items.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
141
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
142 This function never blocks.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
143
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
144 Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
145 pending_work_items: A dict mapping work ids to _WorkItems e.g.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
146 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
147 work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
148 are consumed and the corresponding _WorkItems from
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
149 pending_work_items are transformed into _CallItems and put in
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
150 call_queue.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
151 call_queue: A multiprocessing.Queue that will be filled with _CallItems
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
152 derived from _WorkItems.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
153 """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
154 while True:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
155 if call_queue.full():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
156 return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
157 try:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
158 work_id = work_ids.get(block=False)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
159 except queue.Empty:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
160 return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
161 else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
162 work_item = pending_work_items[work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
163
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
164 if work_item.future.set_running_or_notify_cancel():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
165 call_queue.put(_CallItem(work_id,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
166 work_item.fn,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
167 work_item.args,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
168 work_item.kwargs),
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
169 block=True)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
170 else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
171 del pending_work_items[work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
172 continue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
173
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
174 def _queue_management_worker(executor_reference,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
175 processes,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
176 pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
177 work_ids_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
178 call_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
179 result_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
180 """Manages the communication between this process and the worker processes.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
181
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
182 This function is run in a local thread.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
183
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
184 Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
185 executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
186 this thread. Used to determine if the ProcessPoolExecutor has been
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
187 garbage collected and that this function can exit.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
188 process: A list of the multiprocessing.Process instances used as
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
189 workers.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
190 pending_work_items: A dict mapping work ids to _WorkItems e.g.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
191 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
192 work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
193 call_queue: A multiprocessing.Queue that will be filled with _CallItems
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
194 derived from _WorkItems for processing by the process workers.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
195 result_queue: A multiprocessing.Queue of _ResultItems generated by the
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
196 process workers.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
197 """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
198 nb_shutdown_processes = [0]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
199 def shutdown_one_process():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
200 """Tell a worker to terminate, which will in turn wake us again"""
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
201 call_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
202 nb_shutdown_processes[0] += 1
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
203 while True:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
204 _add_call_item_to_queue(pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
205 work_ids_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
206 call_queue)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
207
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
208 result_item = result_queue.get(block=True)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
209 if result_item is not None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
210 work_item = pending_work_items[result_item.work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
211 del pending_work_items[result_item.work_id]
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
212
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
213 if result_item.exception:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
214 work_item.future.set_exception(result_item.exception)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
215 else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
216 work_item.future.set_result(result_item.result)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
217 # Delete references to object. See issue16284
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
218 del work_item
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
219 # Check whether we should start shutting down.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
220 executor = executor_reference()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
221 # No more work items can be added if:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
222 # - The interpreter is shutting down OR
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
223 # - The executor that owns this worker has been collected OR
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
224 # - The executor that owns this worker has been shutdown.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
225 if _shutdown or executor is None or executor._shutdown_thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
226 # Since no new work items can be added, it is safe to shutdown
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
227 # this thread if there are no pending work items.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
228 if not pending_work_items:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
229 while nb_shutdown_processes[0] < len(processes):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
230 shutdown_one_process()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
231 # If .join() is not called on the created processes then
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
232 # some multiprocessing.Queue methods may deadlock on Mac OS
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
233 # X.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
234 for p in processes:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
235 p.join()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
236 call_queue.close()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
237 return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
238 del executor
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
239
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
240 _system_limits_checked = False
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
241 _system_limited = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
242 def _check_system_limits():
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
243 global _system_limits_checked, _system_limited
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
244 if _system_limits_checked:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
245 if _system_limited:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
246 raise NotImplementedError(_system_limited)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
247 _system_limits_checked = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
248 try:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
249 import os
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
250 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
251 except (AttributeError, ValueError):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
252 # sysconf not available or setting not available
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
253 return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
254 if nsems_max == -1:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
255 # indetermine limit, assume that limit is determined
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
256 # by available memory only
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
257 return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
258 if nsems_max >= 256:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
259 # minimum number of semaphores available
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
260 # according to POSIX
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
261 return
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
262 _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
263 raise NotImplementedError(_system_limited)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
264
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
265
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
266 class ProcessPoolExecutor(_base.Executor):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
267 def __init__(self, max_workers=None):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
268 """Initializes a new ProcessPoolExecutor instance.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
269
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
270 Args:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
271 max_workers: The maximum number of processes that can be used to
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
272 execute the given calls. If None or not given then as many
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
273 worker processes will be created as the machine has processors.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
274 """
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
275 _check_system_limits()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
276
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
277 if max_workers is None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
278 self._max_workers = multiprocessing.cpu_count()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
279 else:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
280 if max_workers <= 0:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
281 raise ValueError("max_workers must be greater than 0")
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
282
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
283 self._max_workers = max_workers
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
284
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
285 # Make the call queue slightly larger than the number of processes to
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
286 # prevent the worker processes from idling. But don't make it too big
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
287 # because futures in the call queue cannot be cancelled.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
288 self._call_queue = multiprocessing.Queue(self._max_workers +
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
289 EXTRA_QUEUED_CALLS)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
290 self._result_queue = multiprocessing.Queue()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
291 self._work_ids = queue.Queue()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
292 self._queue_management_thread = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
293 self._processes = set()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
294
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
295 # Shutdown is a two-step process.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
296 self._shutdown_thread = False
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
297 self._shutdown_lock = threading.Lock()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
298 self._queue_count = 0
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
299 self._pending_work_items = {}
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
300
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
301 def _start_queue_management_thread(self):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
302 # When the executor gets lost, the weakref callback will wake up
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
303 # the queue management thread.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
304 def weakref_cb(_, q=self._result_queue):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
305 q.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
306 if self._queue_management_thread is None:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
307 self._queue_management_thread = threading.Thread(
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
308 target=_queue_management_worker,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
309 args=(weakref.ref(self, weakref_cb),
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
310 self._processes,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
311 self._pending_work_items,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
312 self._work_ids,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
313 self._call_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
314 self._result_queue))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
315 self._queue_management_thread.daemon = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
316 self._queue_management_thread.start()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
317 _threads_queues[self._queue_management_thread] = self._result_queue
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
318
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
319 def _adjust_process_count(self):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
320 for _ in range(len(self._processes), self._max_workers):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
321 p = multiprocessing.Process(
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
322 target=_process_worker,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
323 args=(self._call_queue,
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
324 self._result_queue))
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
325 p.start()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
326 self._processes.add(p)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
327
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
328 def submit(self, fn, *args, **kwargs):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
329 with self._shutdown_lock:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
330 if self._shutdown_thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
331 raise RuntimeError('cannot schedule new futures after shutdown')
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
332
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
333 f = _base.Future()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
334 w = _WorkItem(f, fn, args, kwargs)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
335
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
336 self._pending_work_items[self._queue_count] = w
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
337 self._work_ids.put(self._queue_count)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
338 self._queue_count += 1
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
339 # Wake up queue management thread
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
340 self._result_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
341
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
342 self._start_queue_management_thread()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
343 self._adjust_process_count()
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
344 return f
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
345 submit.__doc__ = _base.Executor.submit.__doc__
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
346
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
347 def shutdown(self, wait=True):
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
348 with self._shutdown_lock:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
349 self._shutdown_thread = True
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
350 if self._queue_management_thread:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
351 # Wake up queue management thread
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
352 self._result_queue.put(None)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
353 if wait:
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
354 self._queue_management_thread.join(sys.maxint)
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
355 # To reduce the risk of openning too many files, remove references to
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
356 # objects that use file descriptors.
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
357 self._queue_management_thread = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
358 self._call_queue = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
359 self._result_queue = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
360 self._processes = None
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
361 shutdown.__doc__ = _base.Executor.shutdown.__doc__
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
362
eb687c28a915 thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff changeset
363 atexit.register(_python_exit)