diff rust/hg-core/src/update.rs @ 52156:e6a44bc91bc2 stable
rust-update: make `update_from_null` respect `worker.numcpu` config option
This was overlooked in the original series.
This is important for tests (because we run many at once), and for the
occasional end user who wants to keep their CPU usage in check.
A future series should clean up this `worker` parameter tunnelling business by
rewriting the config in Rust, but doing so on stable would be a very bad
idea.
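
The mechanism relied on here is rayon's support for running a parallel iterator inside a locally built thread pool instead of the implicit global one. Below is a minimal, self-contained sketch of that dispatch pattern, assuming the rayon crate as a dependency; the `process_chunks` function and the `Vec<u32>` chunk type are illustrative stand-ins, not code from this patch.

// Sketch of the thread-capping dispatch pattern (illustrative only).
use rayon::prelude::*;

fn process_chunks(chunks: Vec<Vec<u32>>, workers: Option<usize>) {
    // Stand-in for the real per-chunk work (`working_copy_worker` in the patch).
    let work = |chunk: Vec<u32>| {
        let _total: u32 = chunk.iter().sum();
    };

    match workers {
        // An explicit cap above 1: build a dedicated pool of that size and
        // run the parallel iterator inside it via `install`.
        Some(n) if n > 1 => {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(n)
                .build()
                .expect("building a rayon pool should not fail here");
            pool.install(|| chunks.into_par_iter().for_each(work));
        }
        // A cap of 1 (or 0): stay sequential and never touch rayon.
        Some(_) => chunks.into_iter().for_each(work),
        // No cap configured: fall back to rayon's global thread pool.
        None => chunks.into_par_iter().for_each(work),
    }
}

The actual patch additionally reports pool-construction failures through its error channel and, in the uncapped case, calls the internal `cap_default_rayon_threads` helper before using the global pool.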
author   | Raphaël Gomès <rgomes@octobus.net>
date     | Tue, 05 Nov 2024 15:21:09 +0100
parents  | 8b7123c8947b
children | 96b113d22b34
--- a/rust/hg-core/src/update.rs	Tue Nov 05 15:18:32 2024 +0100
+++ b/rust/hg-core/src/update.rs	Tue Nov 05 15:21:09 2024 +0100
@@ -23,6 +23,7 @@
     repo::Repo,
     sparse,
     utils::{
+        cap_default_rayon_threads,
         files::{filesystem_now, get_path_from_bytes},
         hg_path::{hg_path_to_path_buf, HgPath, HgPathError},
         path_auditor::PathAuditor,
@@ -49,6 +50,7 @@
     repo: &Repo,
     to: UncheckedRevision,
     progress: &dyn Progress,
+    workers: Option<usize>,
 ) -> Result<usize, HgError> {
     // Ignore the warnings, they've been displayed by Python already
     // TODO non-Python clients: display narrow warnings
@@ -106,6 +108,7 @@
         files_sender,
         errors_sender,
         progress,
+        workers,
     );
 
     let errors: Vec<HgError> = errors_receiver.iter().collect();
@@ -177,6 +180,7 @@
 }
 
 #[logging_timer::time("trace")]
+#[allow(clippy::too_many_arguments)]
 fn create_working_copy<'a: 'b, 'b>(
     chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>,
     working_directory_path: &Path,
@@ -185,9 +189,10 @@
     files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>,
     error_sender: Sender<HgError>,
     progress: &dyn Progress,
+    workers: Option<usize>,
 ) {
     let auditor = PathAuditor::new(working_directory_path);
-    chunks.into_par_iter().for_each(|(dir_path, chunk)| {
+    let work_closure = |(dir_path, chunk)| {
         if let Err(e) = working_copy_worker(
             dir_path,
             chunk,
@@ -202,7 +207,35 @@
             .send(e)
             .expect("channel should not be disconnected")
         }
-    });
+    };
+    if let Some(workers) = workers {
+        if workers > 1 {
+            // Work in parallel, potentially restricting the number of threads
+            match rayon::ThreadPoolBuilder::new().num_threads(workers).build()
+            {
+                Err(error) => error_sender
+                    .send(HgError::abort(
+                        error.to_string(),
+                        exit_codes::ABORT,
+                        None,
+                    ))
+                    .expect("channel should not be disconnected"),
+                Ok(pool) => {
+                    log::trace!("restricting update to {} threads", workers);
+                    pool.install(|| {
+                        chunks.into_par_iter().for_each(work_closure);
+                    });
+                }
+            }
+        } else {
+            // Work sequentially, don't even invoke rayon
+            chunks.into_iter().for_each(work_closure);
+        }
+    } else {
+        // Work in parallel by default in the global threadpool
+        let _ = cap_default_rayon_threads();
+        chunks.into_par_iter().for_each(work_closure);
+    }
 }
 
 /// Represents a work unit for a single thread, responsible for this set of
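
As a standalone way of observing the capping behaviour outside the Mercurial test suite, rayon reports the size of whichever pool the current task runs in via `rayon::current_num_threads()`. This snippet is a demonstration of the mechanism only, not a test from the Mercurial tree:

// Minimal demonstration that `install` scopes work to an explicitly sized pool.
fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .expect("building a rayon pool should not fail here");
    pool.install(|| {
        // Inside `install`, rayon answers for the local 2-thread pool.
        assert_eq!(rayon::current_num_threads(), 2);
    });
    // Outside of it, the global pool (by default one thread per logical CPU)
    // answers instead.
    println!("global pool threads: {}", rayon::current_num_threads());
}

Within Mercurial itself, the cap comes from the `worker.numcpu` config option, e.g. `hg update --config worker.numcpu=2`.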