mercurial/thirdparty/concurrent/futures/process.py
changeset 48836 e020dbbc41e5
parent 48835 a0da5075bca3
child 48837 12cba4886e90
--- a/mercurial/thirdparty/concurrent/futures/process.py	Wed Mar 02 10:24:49 2022 -0500
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,365 +0,0 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved.
-# Licensed to PSF under a Contributor Agreement.
-
-"""Implements ProcessPoolExecutor.
-
-The following diagram and text describe the data-flow through the system:
-
-|======================= In-process =====================|== Out-of-process ==|
-
-+----------+     +----------+       +--------+     +-----------+    +---------+
-|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
-|          |     +----------+       |        |     +-----------+    |         |
-|          |     | ...      |       |        |     | ...       |    |         |
-|          |     | 6        |       |        |     | 5, call() |    |         |
-|          |     | 7        |       |        |     | ...       |    |         |
-| Process  |     | ...      |       | Local  |     +-----------+    | Process |
-|  Pool    |     +----------+       | Worker |                      |  #1..n  |
-| Executor |                        | Thread |                      |         |
-|          |     +------------+     |        |     +-----------+    |         |
-|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
-|          |     +------------+     |        |     +-----------+    |         |
-|          |     | 6: call()  |     |        |     | ...       |    |         |
-|          |     |    future  |     |        |     | 4, result |    |         |
-|          |     | ...        |     |        |     | 3, except |    |         |
-+----------+     +------------+     +--------+     +-----------+    +---------+
-
-Executor.submit() called:
-- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
-- adds the id of the _WorkItem to the "Work Ids" queue
-
-Local worker thread:
-- reads work ids from the "Work Ids" queue and looks up the corresponding
-  _WorkItem from the "Work Items" dict: if the work item has been cancelled then
-  it is simply removed from the dict, otherwise it is repackaged as a
-  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
-  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
-  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
-- reads _ResultItems from "Result Q", updates the future stored in the
-  "Work Items" dict and deletes the dict entry
-
-Process #1..n:
-- reads _CallItems from "Call Q", executes the calls, and puts the resulting
-  _ResultItems in "Result Q"
-"""
-
-from __future__ import absolute_import
-
-import atexit
-from . import _base
-import Queue as queue
-import multiprocessing
-import threading
-import weakref
-import sys
-
-__author__ = 'Brian Quinlan (brian@sweetapp.com)'
-
-# Workers are created as daemon threads and processes. This is done to allow the
-# interpreter to exit when there are still idle processes in a
-# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
-# allowing workers to die with the interpreter has two undesirable properties:
-#   - The workers would still be running during interpreter shutdown,
-#     meaning that they would fail in unpredictable ways.
-#   - The workers could be killed while evaluating a work item, which could
-#     be bad if the callable being evaluated has external side-effects e.g.
-#     writing to a file.
-#
-# To work around this problem, an exit handler is installed which tells the
-# workers to exit when their work queues are empty and then waits until the
-# threads/processes finish.
-
-_threads_queues = weakref.WeakKeyDictionary()
-_shutdown = False
-
-def _python_exit():
-    global _shutdown
-    _shutdown = True
-    items = list(_threads_queues.items()) if _threads_queues else ()
-    for t, q in items:
-        q.put(None)
-    for t, q in items:
-        t.join(sys.maxint)
-
-# Controls how many more calls than processes will be queued in the call queue.
-# A smaller number will mean that processes spend more time idle waiting for
-# work while a larger number will make Future.cancel() succeed less frequently
-# (Futures in the call queue cannot be cancelled).
-EXTRA_QUEUED_CALLS = 1
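-# For example (illustrative): with max_workers=4, the call queue created in
-# ProcessPoolExecutor.__init__ holds at most 4 + EXTRA_QUEUED_CALLS = 5
-# _CallItems; submissions beyond that wait in the "Work Ids" queue, where
-# they can still be cancelled.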
-
-class _WorkItem(object):
-    def __init__(self, future, fn, args, kwargs):
-        self.future = future
-        self.fn = fn
-        self.args = args
-        self.kwargs = kwargs
-
-class _ResultItem(object):
-    def __init__(self, work_id, exception=None, result=None):
-        self.work_id = work_id
-        self.exception = exception
-        self.result = result
-
-class _CallItem(object):
-    def __init__(self, work_id, fn, args, kwargs):
-        self.work_id = work_id
-        self.fn = fn
-        self.args = args
-        self.kwargs = kwargs
-
-def _process_worker(call_queue, result_queue):
-    """Evaluates calls from call_queue and places the results in result_queue.
-
-    This worker is run in a separate process.
-
-    Args:
-        call_queue: A multiprocessing.Queue of _CallItems that will be read and
-            evaluated by the worker.
-        result_queue: A multiprocessing.Queue of _ResultItems that will be
-            written to by the worker.
-
-    A None item in call_queue is a sentinel telling the worker to exit.
-    """
-    while True:
-        call_item = call_queue.get(block=True)
-        if call_item is None:
-            # Wake up queue management thread
-            result_queue.put(None)
-            return
-        try:
-            r = call_item.fn(*call_item.args, **call_item.kwargs)
-        except BaseException as e:
-            result_queue.put(_ResultItem(call_item.work_id,
-                                         exception=e))
-        else:
-            result_queue.put(_ResultItem(call_item.work_id,
-                                         result=r))
-
-def _add_call_item_to_queue(pending_work_items,
-                            work_ids,
-                            call_queue):
-    """Fills call_queue with _WorkItems from pending_work_items.
-
-    This function never blocks.
-
-    Args:
-        pending_work_items: A dict mapping work ids to _WorkItems e.g.
-            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
-        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
-            are consumed and the corresponding _WorkItems from
-            pending_work_items are transformed into _CallItems and put in
-            call_queue.
-        call_queue: A multiprocessing.Queue that will be filled with _CallItems
-            derived from _WorkItems.
-    """
-    while True:
-        if call_queue.full():
-            return
-        try:
-            work_id = work_ids.get(block=False)
-        except queue.Empty:
-            return
-        else:
-            work_item = pending_work_items[work_id]
-
-            if work_item.future.set_running_or_notify_cancel():
-                call_queue.put(_CallItem(work_id,
-                                         work_item.fn,
-                                         work_item.args,
-                                         work_item.kwargs),
-                               block=True)
-            else:
-                del pending_work_items[work_id]
-                continue
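-
-# Illustrative consequence of the cancellation handling above (hypothetical
-# session; `executor` as in the usage sketch near the top of this file):
-#
-#     import time
-#     f = executor.submit(time.sleep, 60)
-#     f.cancel()  # True while f's work id still sits in the "Work Ids" queue;
-#                 # False once set_running_or_notify_cancel() has marked the
-#                 # future RUNNING and its _CallItem is in the call queue.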
-
-def _queue_management_worker(executor_reference,
-                             processes,
-                             pending_work_items,
-                             work_ids_queue,
-                             call_queue,
-                             result_queue):
-    """Manages the communication between this process and the worker processes.
-
-    This function is run in a local thread.
-
-    Args:
-        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
-            this thread. Used to determine if the ProcessPoolExecutor has been
-            garbage collected and that this function can exit.
-        processes: A list of the multiprocessing.Process instances used as
-            workers.
-        pending_work_items: A dict mapping work ids to _WorkItems e.g.
-            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
-        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
-        call_queue: A multiprocessing.Queue that will be filled with _CallItems
-            derived from _WorkItems for processing by the process workers.
-        result_queue: A multiprocessing.Queue of _ResultItems generated by the
-            process workers.
-    """
-    nb_shutdown_processes = [0]
-    def shutdown_one_process():
-        """Tell a worker to terminate, which will in turn wake us again"""
-        call_queue.put(None)
-        nb_shutdown_processes[0] += 1
-    while True:
-        _add_call_item_to_queue(pending_work_items,
-                                work_ids_queue,
-                                call_queue)
-
-        result_item = result_queue.get(block=True)
-        if result_item is not None:
-            work_item = pending_work_items[result_item.work_id]
-            del pending_work_items[result_item.work_id]
-
-            if result_item.exception:
-                work_item.future.set_exception(result_item.exception)
-            else:
-                work_item.future.set_result(result_item.result)
-            # Delete reference to the finished work item so it can be
-            # garbage collected; see CPython issue16284.
-            del work_item
-        # Check whether we should start shutting down.
-        executor = executor_reference()
-        # No more work items can be added if:
-        #   - The interpreter is shutting down OR
-        #   - The executor that owns this worker has been collected OR
-        #   - The executor that owns this worker has been shutdown.
-        if _shutdown or executor is None or executor._shutdown_thread:
-            # Since no new work items can be added, it is safe to shutdown
-            # this thread if there are no pending work items.
-            if not pending_work_items:
-                while nb_shutdown_processes[0] < len(processes):
-                    shutdown_one_process()
-                # If .join() is not called on the created processes then
-                # some multiprocessing.Queue methods may deadlock on Mac OS
-                # X.
-                for p in processes:
-                    p.join()
-                call_queue.close()
-                return
-        del executor
-
-_system_limits_checked = False
-_system_limited = None
-def _check_system_limits():
-    global _system_limits_checked, _system_limited
-    if _system_limits_checked:
-        if _system_limited:
-            raise NotImplementedError(_system_limited)
-        # Already checked once and no limit applies; don't probe again.
-        return
-    _system_limits_checked = True
-    try:
-        import os
-        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
-    except (AttributeError, ValueError):
-        # sysconf not available or setting not available
-        return
-    if nsems_max == -1:
-        # indeterminate limit; assume the limit is bounded only
-        # by available memory
-        return
-    if nsems_max >= 256:
-        # at least the minimum number of semaphores required
-        # by POSIX (256) is available
-        return
-    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
-    raise NotImplementedError(_system_limited)
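-
-# Illustrative: on a machine whose sysconf reports SC_SEM_NSEMS_MAX == 32
-# (a hypothetical value), constructing a ProcessPoolExecutor raises
-# NotImplementedError("system provides too few semaphores (32 available,
-# 256 necessary)").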
-
-
-class ProcessPoolExecutor(_base.Executor):
-    def __init__(self, max_workers=None):
-        """Initializes a new ProcessPoolExecutor instance.
-
-        Args:
-            max_workers: The maximum number of processes that can be used to
-                execute the given calls. If None or not given then as many
-                worker processes will be created as the machine has processors.
-        """
-        _check_system_limits()
-
-        if max_workers is None:
-            self._max_workers = multiprocessing.cpu_count()
-        else:
-            if max_workers <= 0:
-                raise ValueError("max_workers must be greater than 0")
-
-            self._max_workers = max_workers
-
-        # Make the call queue slightly larger than the number of processes to
-        # prevent the worker processes from idling. But don't make it too big
-        # because futures in the call queue cannot be cancelled.
-        self._call_queue = multiprocessing.Queue(self._max_workers +
-                                                 EXTRA_QUEUED_CALLS)
-        self._result_queue = multiprocessing.Queue()
-        self._work_ids = queue.Queue()
-        self._queue_management_thread = None
-        self._processes = set()
-
-        # Shutdown is a two-step process.
-        self._shutdown_thread = False
-        self._shutdown_lock = threading.Lock()
-        self._queue_count = 0
-        self._pending_work_items = {}
-
-    def _start_queue_management_thread(self):
-        # When the executor is garbage collected, the weakref callback will
-        # wake up the queue management thread.
-        def weakref_cb(_, q=self._result_queue):
-            q.put(None)
-        if self._queue_management_thread is None:
-            self._queue_management_thread = threading.Thread(
-                    target=_queue_management_worker,
-                    args=(weakref.ref(self, weakref_cb),
-                          self._processes,
-                          self._pending_work_items,
-                          self._work_ids,
-                          self._call_queue,
-                          self._result_queue))
-            self._queue_management_thread.daemon = True
-            self._queue_management_thread.start()
-            _threads_queues[self._queue_management_thread] = self._result_queue
-
-    def _adjust_process_count(self):
-        for _ in range(len(self._processes), self._max_workers):
-            p = multiprocessing.Process(
-                    target=_process_worker,
-                    args=(self._call_queue,
-                          self._result_queue))
-            p.start()
-            self._processes.add(p)
-
-    def submit(self, fn, *args, **kwargs):
-        with self._shutdown_lock:
-            if self._shutdown_thread:
-                raise RuntimeError('cannot schedule new futures after shutdown')
-
-            f = _base.Future()
-            w = _WorkItem(f, fn, args, kwargs)
-
-            self._pending_work_items[self._queue_count] = w
-            self._work_ids.put(self._queue_count)
-            self._queue_count += 1
-            # Wake up queue management thread
-            self._result_queue.put(None)
-
-            self._start_queue_management_thread()
-            self._adjust_process_count()
-            return f
-    submit.__doc__ = _base.Executor.submit.__doc__
-
-    def shutdown(self, wait=True):
-        with self._shutdown_lock:
-            self._shutdown_thread = True
-        if self._queue_management_thread:
-            # Wake up queue management thread
-            self._result_queue.put(None)
-            if wait:
-                self._queue_management_thread.join(sys.maxint)
-        # To reduce the risk of opening too many files, remove references to
-        # objects that use file descriptors.
-        self._queue_management_thread = None
-        self._call_queue = None
-        self._result_queue = None
-        self._processes = None
-    shutdown.__doc__ = _base.Executor.shutdown.__doc__
-
-atexit.register(_python_exit)
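-
-# Illustrative shutdown behavior (hypothetical session): shutdown(wait=True)
-# drains all pending work before returning, because the queue management
-# thread exits only once pending_work_items is empty:
-#
-#     executor = ProcessPoolExecutor()
-#     futures = [executor.submit(pow, i, 2) for i in range(10)]
-#     executor.shutdown(wait=True)  # blocks until all ten futures resolve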