rust/hg-core/src/update.rs
branchstable
changeset 52186 e6a44bc91bc2
parent 52056 8b7123c8947b
child 52213 96b113d22b34
--- a/rust/hg-core/src/update.rs	Tue Nov 05 15:18:32 2024 +0100
+++ b/rust/hg-core/src/update.rs	Tue Nov 05 15:21:09 2024 +0100
@@ -23,6 +23,7 @@
     repo::Repo,
     sparse,
     utils::{
+        cap_default_rayon_threads,
         files::{filesystem_now, get_path_from_bytes},
         hg_path::{hg_path_to_path_buf, HgPath, HgPathError},
         path_auditor::PathAuditor,
@@ -49,6 +50,7 @@
     repo: &Repo,
     to: UncheckedRevision,
     progress: &dyn Progress,
+    workers: Option<usize>,
 ) -> Result<usize, HgError> {
     // Ignore the warnings, they've been displayed by Python already
     // TODO non-Python clients: display narrow warnings
@@ -106,6 +108,7 @@
         files_sender,
         errors_sender,
         progress,
+        workers,
     );
 
     let errors: Vec<HgError> = errors_receiver.iter().collect();
@@ -177,6 +180,7 @@
 }
 
 #[logging_timer::time("trace")]
+#[allow(clippy::too_many_arguments)]
 fn create_working_copy<'a: 'b, 'b>(
     chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>,
     working_directory_path: &Path,
@@ -185,9 +189,10 @@
     files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>,
     error_sender: Sender<HgError>,
     progress: &dyn Progress,
+    workers: Option<usize>,
 ) {
     let auditor = PathAuditor::new(working_directory_path);
-    chunks.into_par_iter().for_each(|(dir_path, chunk)| {
+    let work_closure = |(dir_path, chunk)| {
         if let Err(e) = working_copy_worker(
             dir_path,
             chunk,
@@ -202,7 +207,35 @@
                 .send(e)
                 .expect("channel should not be disconnected")
         }
-    });
+    };
+    if let Some(workers) = workers {
+        if workers > 1 {
+            // Work in parallel, potentially restricting the number of threads
+            match rayon::ThreadPoolBuilder::new().num_threads(workers).build()
+            {
+                Err(error) => error_sender
+                    .send(HgError::abort(
+                        error.to_string(),
+                        exit_codes::ABORT,
+                        None,
+                    ))
+                    .expect("channel should not be disconnected"),
+                Ok(pool) => {
+                    log::trace!("restricting update to {} threads", workers);
+                    pool.install(|| {
+                        chunks.into_par_iter().for_each(work_closure);
+                    });
+                }
+            }
+        } else {
+            // Work sequentially, don't even invoke rayon
+            chunks.into_iter().for_each(work_closure);
+        }
+    } else {
+        // Work in parallel by default in the global threadpool
+        let _ = cap_default_rayon_threads();
+        chunks.into_par_iter().for_each(work_closure);
+    }
 }
 
 /// Represents a work unit for a single thread, responsible for this set of