annotate mercurial/thirdparty/concurrent/futures/process.py @ 37623:eb687c28a915
thirdparty: vendor futures 3.2.0
Python 3 has a concurrent.futures package in the standard library
for representing futures. The "futures" package on PyPI is a backport
of this package to work with Python 2.
The wire protocol code today has its own future concept for handling
"batch" requests. The frame-based protocol will also want to
use futures.
I've heavily used the "futures" package on Python 2 in other projects
and it is pretty nice. It even has a built-in thread and process pool
for running functions in parallel. I've used this heavily for concurrent
I/O and other GIL-less activities.
The existing futures API in the wire protocol code is not as nice as
concurrent.futures. Since concurrent.futures is in the Python standard
library and will presumably be the long-term future for futures in our
code base, let's vendor the backport so we can use proper futures today.
# no-check-commit because of style violations
Differential Revision: https://phab.mercurial-scm.org/D3261
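As a minimal sketch of the kind of API being vendored here: the example below uses the Python 3 standard-library `concurrent.futures`, which the "futures" backport mirrors. The `square` and `run_squares` helpers are hypothetical and exist only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n):
    """Hypothetical stand-in for an I/O-bound or otherwise parallelisable task."""
    return n * n


def run_squares(values, max_workers=4):
    """Submit one call per value and map each input to its result."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns immediately with a Future for each call.
        futures = {pool.submit(square, v): v for v in values}
        # as_completed() yields futures in completion order, not submission order.
        for fut in as_completed(futures):
            results[futures[fut]] = fut.result()
    return results
```

Swapping `ThreadPoolExecutor` for `ProcessPoolExecutor` should keep the same calling pattern, provided the submitted function and its arguments can be pickled.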
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 11 Apr 2018 14:48:24 -0700 |
parents | |
children | 0a9c0d3480b2 |
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

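The data flow described in the docstring can be sketched in a single thread. This is an illustrative simplification, not the module's implementation: `submit`, `fill_call_queue`, and `drain` are hypothetical names, plain tuples replace `_WorkItem`/`_CallItem`, `queue.Queue` stands in for the multiprocessing queues, and the calls run inline instead of in worker processes. It uses Python 3 module names.

```python
import queue
from concurrent.futures import Future


def submit(pending, work_ids, counter, fn, *args):
    """Mimic submit(): register a numbered work item and enqueue its id."""
    work_id = counter[0]
    counter[0] += 1
    future = Future()
    pending[work_id] = (future, fn, args)   # the "Work Items" dict
    work_ids.put(work_id)                   # the "Work Ids" queue
    return future


def fill_call_queue(pending, work_ids, call_q):
    """Move ids into the bounded "Call Q", dropping cancelled items."""
    while not call_q.full():
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        future, fn, args = pending[work_id]
        if future.set_running_or_notify_cancel():
            call_q.put((work_id, fn, args))
        else:
            del pending[work_id]            # cancelled before reaching "Call Q"


def drain(pending, call_q):
    """Stand-in for the worker processes plus result handling, run inline."""
    while not call_q.empty():
        work_id, fn, args = call_q.get()
        future = pending.pop(work_id)[0]
        future.set_result(fn(*args))


pending, work_ids, call_q = {}, queue.Queue(), queue.Queue(maxsize=2)
counter = [0]
futures = [submit(pending, work_ids, counter, pow, 2, n) for n in range(4)]
futures[3].cancel()     # id 3 is still only in "Work Ids", so this succeeds
while not work_ids.empty():
    fill_call_queue(pending, work_ids, call_q)
    drain(pending, call_q)
```

Because the call queue holds at most two entries here, items 2 and 3 stay cancellable while items 0 and 1 are already committed to run.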
import atexit
from concurrent.futures import _base
import Queue as queue
import multiprocessing
import threading
import weakref
import sys

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items()) if _threads_queues else ()
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join(sys.maxint)

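The exit-handler pattern above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the module's code: `_workers`, `_worker`, `start_worker`, and `stop_workers` are hypothetical names, the sentinel here ends each thread directly, and Python 3 module names are used.

```python
import atexit
import queue
import threading
import weakref

# Hypothetical registry: worker thread -> the queue it reads from.
# Weak keys let entries vanish once a finished thread is garbage collected.
_workers = weakref.WeakKeyDictionary()


def _worker(q, seen):
    """Consume items until the None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:
            return
        seen.append(item)


def start_worker(seen):
    """Start a daemon worker and register it for cleanup at exit."""
    q = queue.Queue()
    t = threading.Thread(target=_worker, args=(q, seen))
    t.daemon = True          # on its own, would not block interpreter exit
    t.start()
    _workers[t] = q
    return q


def stop_workers():
    """Exit handler: wake every worker with a sentinel, then wait for it."""
    items = list(_workers.items())
    for _, q in items:
        q.put(None)
    for t, _ in items:
        t.join()


atexit.register(stop_workers)
```

Sending all sentinels before joining any thread lets the workers wind down concurrently instead of one after another.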
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1

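The cancellation trade-off can be observed with a small experiment. The sketch below is an assumption-laden stand-in: it uses a one-worker `ThreadPoolExecutor` from the Python 3 standard library instead of a process pool, because that makes the timing deterministic. In the thread pool the point of no return is the start of execution, whereas in the process pool described here it is entry into the call queue. `cancel_demo` and `blocker` are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def cancel_demo():
    """Return (cancelled_running, cancelled_pending) for a one-worker pool."""
    started = threading.Event()
    release = threading.Event()

    def blocker():
        started.set()
        release.wait()
        return "done"

    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(blocker)
        pending = pool.submit(lambda: "never runs")
        started.wait()                        # first call is now executing
        cancelled_running = running.cancel()  # too late: already running
        cancelled_pending = pending.cancel()  # still queued: can be cancelled
        release.set()
    return cancelled_running, cancelled_pending
```

Calling `cancel_demo()` returns `(False, True)`: only the future that has not yet been committed to a worker can be cancelled.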
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process. It exits when it reads None
    from call_queue.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(None)
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except:
            e = sys.exc_info()[1]
            result_queue.put(_ResultItem(call_item.work_id,
                                         exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))

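The worker loop follows a common sentinel-and-report pattern, sketched below under simplifying assumptions: a thread and `queue.Queue` objects stand in for the worker process and the multiprocessing queues, plain tuples replace `_CallItem` and `_ResultItem`, and `worker` and `run` are hypothetical names. Python 3 module names are used.

```python
import queue
import threading


def worker(call_q, result_q):
    """Evaluate (work_id, fn, args) tuples until a None sentinel is read."""
    while True:
        item = call_q.get()
        if item is None:
            result_q.put(None)      # pass the wake-up on to the consumer
            return
        work_id, fn, args = item
        try:
            value = fn(*args)
        except Exception as exc:    # ship the exception back instead of dying
            result_q.put((work_id, exc, None))
        else:
            result_q.put((work_id, None, value))


def run(calls):
    """Feed calls to one worker thread and collect {work_id: (exc, value)}."""
    call_q, result_q = queue.Queue(), queue.Queue()
    t = threading.Thread(target=worker, args=(call_q, result_q))
    t.start()
    for work_id, (fn, args) in enumerate(calls):
        call_q.put((work_id, fn, args))
    call_q.put(None)
    results = {}
    # iter(callable, sentinel) keeps calling get() until it returns None.
    for item in iter(result_q.get, None):
        results[item[0]] = item[1:]
    t.join()
    return results
```

For example, `run([(divmod, (7, 2)), (divmod, (1, 0))])` yields a normal result for the first call and a captured `ZeroDivisionError` for the second, and the worker survives the failure to process later items.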
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue

def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A list of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    nb_shutdown_processes = [0]
    def shutdown_one_process():
        """Tell a worker to terminate, which will in turn wake us again"""
        call_queue.put(None)
        nb_shutdown_processes[0] += 1
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        result_item = result_queue.get(block=True)
        if result_item is not None:
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            # Delete references to object. See issue16284
            del work_item
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
219 # Check whether we should start shutting down. |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
220 executor = executor_reference() |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
221 # No more work items can be added if: |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
222 # - The interpreter is shutting down OR |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
223 # - The executor that owns this worker has been collected OR |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
224 # - The executor that owns this worker has been shutdown. |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
225 if _shutdown or executor is None or executor._shutdown_thread: |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
226 # Since no new work items can be added, it is safe to shutdown |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
227 # this thread if there are no pending work items. |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
228 if not pending_work_items: |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
229 while nb_shutdown_processes[0] < len(processes): |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
230 shutdown_one_process() |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
231 # If .join() is not called on the created processes then |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
232 # some multiprocessing.Queue methods may deadlock on Mac OS |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
233 # X. |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
234 for p in processes: |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
235 p.join() |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
236 call_queue.close() |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
237 return |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
238 del executor |
eb687c28a915
thirdparty: vendor futures 3.2.0
Gregory Szorc <gregory.szorc@gmail.com>
parents:
diff
changeset
|
239 |
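The loop above blocks on the result queue and treats a `None` item purely as a wake-up signal. A minimal sketch of that pattern follows, assuming Python 3's standard-library `queue`, `threading`, and `concurrent.futures.Future` rather than the vendored backport. The names `manage_results`, `stop_event`, and the `(work_id, result, exception)` tuple layout are hypothetical and chosen only for illustration.

```python
import queue
import threading
from concurrent.futures import Future


def manage_results(pending, result_queue, stop_event):
    """Resolve futures from (work_id, result, exception) tuples.

    A None item carries no result; it only wakes the loop so that it can
    re-check whether it should exit.
    """
    while True:
        item = result_queue.get(block=True)
        if item is not None:
            work_id, result, exception = item
            future = pending.pop(work_id)
            if exception is not None:
                future.set_exception(exception)
            else:
                future.set_result(result)
        # Exit only once a stop was requested and nothing is pending.
        if stop_event.is_set() and not pending:
            return


pending = {0: Future(), 1: Future()}
futures = dict(pending)  # keep handles; the thread pops from `pending`
results = queue.Queue()
stop = threading.Event()
manager = threading.Thread(target=manage_results,
                           args=(pending, results, stop))
manager.start()

results.put((0, "ok", None))
results.put((1, None, ValueError("boom")))
stop.set()
results.put(None)  # wake the thread so it notices the stop request
manager.join()
```

Because the thread only re-evaluates its exit condition after `get()` returns, every state change that should be noticed (new work, shutdown) has to be followed by a `put(None)`.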
_system_limits_checked = False
_system_limited = None
def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        import os
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
    raise NotImplementedError(_system_limited)


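The semaphore check above can be tried in isolation. The sketch below separates the query from the decision, assuming Python 3 and a platform where `os.sysconf` may or may not exist. The helper names `semaphore_limit` and `has_enough_semaphores` are hypothetical and not part of the vendored module, and catching `OSError` is an extra precaution that the original code does not take.

```python
import os


def semaphore_limit():
    """Return SC_SEM_NSEMS_MAX, or None when it cannot be determined."""
    try:
        limit = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError, OSError):
        # os.sysconf is missing (e.g. Windows) or the name is unknown.
        return None
    # -1 means the limit is indeterminate.
    return None if limit == -1 else limit


def has_enough_semaphores(limit, required=256):
    """Treat an unknown limit as sufficient, as the check above does."""
    return limit is None or limit >= required


print(has_enough_semaphores(semaphore_limit()))
```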
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue

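The weakref callback above binds the queue as a default argument instead of closing over `self`, so the callback itself does not keep the executor alive. A small sketch of the same idea, assuming Python 3's standard-library `weakref` and `queue`; the `Owner` class and the `watch` helper are hypothetical stand-ins for the executor and are not part of the vendored module.

```python
import gc
import queue
import weakref


class Owner(object):
    """Stand-in for an object whose collection should wake a waiter."""


def watch(owner, wakeups):
    # Bind the queue as a default argument so the callback holds no
    # reference to `owner`.
    def on_collect(_, q=wakeups):
        q.put(None)
    return weakref.ref(owner, on_collect)


wakeups = queue.Queue()
owner = Owner()
ref = watch(owner, wakeups)

del owner
gc.collect()  # only needed on implementations without reference counting
```

After the owner is collected, `ref()` returns `None` and one `None` wake-up item is waiting in the queue, which is the signal the management thread uses to re-check its exit condition.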
    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join(sys.maxint)
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)
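
The constructor and `submit` above reject two kinds of misuse: a non-positive `max_workers`, and scheduling work after `shutdown`. The sketch below exercises both, assuming Python 3's standard-library `concurrent.futures`, which this vendored module backports; the exact messages there may differ from this file. The helpers `try_create` and `submit_after_shutdown` are hypothetical. No worker process is started, because both errors are raised before any work is queued.

```python
from concurrent.futures import ProcessPoolExecutor


def try_create(max_workers):
    """Report whether an executor accepts the given max_workers value."""
    try:
        ProcessPoolExecutor(max_workers=max_workers).shutdown()
        return "ok"
    except ValueError as exc:
        return "rejected: %s" % exc


def submit_after_shutdown():
    """Return the error message raised when submitting after shutdown."""
    executor = ProcessPoolExecutor(max_workers=1)
    executor.shutdown(wait=True)
    try:
        executor.submit(pow, 2, 3)
    except RuntimeError as exc:
        return str(exc)
    return None


print(try_create(0))
print(try_create(1))
print(submit_after_shutdown())
```

When actually submitting work to a process pool, the callable has to be picklable (for example a module-level function), and on platforms that start workers by spawning a fresh interpreter the submitting code is normally placed under an `if __name__ == "__main__":` guard.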