diff mercurial/worker.py @ 49537:3556f0392808 stable

lfs: avoid closing connections when the worker doesn't fork Probably not much more than an minor optimization, but could be useful in the case of `hg verify` where missing blobs are fetched one at a time.
author Matt Harbison <matt_harbison@yahoo.com>
date Tue, 18 Oct 2022 13:56:45 -0400
parents d54b213c4380
children 3eef8baf6b92
line wrap: on
line diff
--- a/mercurial/worker.py	Tue Oct 18 13:36:33 2022 -0400
+++ b/mercurial/worker.py	Tue Oct 18 13:56:45 2022 -0400
@@ -125,7 +125,14 @@
 
 
 def worker(
-    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
+    ui,
+    costperarg,
+    func,
+    staticargs,
+    args,
+    hasretval=False,
+    threadsafe=True,
+    prefork=None,
 ):
     """run a function, possibly in parallel in multiple worker
     processes.
@@ -149,6 +156,10 @@
     threadsafe - whether work items are thread safe and can be executed using
     a thread-based worker. Should be disabled for CPU heavy tasks that don't
     release the GIL.
+
+    prefork - a parameterless Callable that is invoked prior to forking the
+    process.  fork() is only used on non-Windows platforms, but is also not
+    called on POSIX platforms if the work amount doesn't warrant a worker.
     """
     enabled = ui.configbool(b'worker', b'enabled')
     if enabled and _platformworker is _posixworker and not ismainthread():
@@ -157,11 +168,13 @@
         enabled = False
 
     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
-        return _platformworker(ui, func, staticargs, args, hasretval)
+        return _platformworker(
+            ui, func, staticargs, args, hasretval, prefork=prefork
+        )
     return func(*staticargs + (args,))
 
 
-def _posixworker(ui, func, staticargs, args, hasretval):
+def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -207,6 +220,10 @@
     parentpid = os.getpid()
     pipes = []
     retval = {}
+
+    if prefork:
+        prefork()
+
     for pargs in partition(args, min(workers, len(args))):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
@@ -316,7 +333,7 @@
         return -(os.WTERMSIG(code))
 
 
-def _windowsworker(ui, func, staticargs, args, hasretval):
+def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
     class Worker(threading.Thread):
         def __init__(
             self, taskqueue, resultqueue, func, staticargs, *args, **kwargs