Mercurial > public > mercurial-scm > hg-stable
diff rust/hg-core/src/update.rs @ 52313:65d516db7309
branching: merge stable into default
author | Rapha?l Gom?s <rgomes@octobus.net> |
---|---|
date | Thu, 14 Nov 2024 16:45:23 +0100 |
parents | bd8081e9fd62 96b113d22b34 |
children | db065b33fa56 |
line wrap: on
line diff
--- a/rust/hg-core/src/update.rs Mon Oct 21 12:58:40 2024 +0200 +++ b/rust/hg-core/src/update.rs Thu Nov 14 16:45:23 2024 +0100 @@ -5,6 +5,7 @@ io::Write, os::unix::fs::{MetadataExt, PermissionsExt}, path::Path, + sync::atomic::Ordering, time::Duration, }; @@ -24,12 +25,13 @@ revlog::RevlogError, sparse, utils::{ + cap_default_rayon_threads, files::{filesystem_now, get_path_from_bytes}, hg_path::{hg_path_to_path_buf, HgPath, HgPathError}, path_auditor::PathAuditor, }, vfs::{is_on_nfs_mount, VfsImpl}, - DirstateParents, UncheckedRevision, + DirstateParents, UncheckedRevision, INTERRUPT_RECEIVED, }; use crossbeam_channel::{Receiver, Sender}; use rayon::prelude::*; @@ -50,6 +52,7 @@ repo: &Repo, to: UncheckedRevision, progress: &dyn Progress, + workers: Option<usize>, ) -> Result<usize, HgError> { // Ignore the warnings, they've been displayed by Python already // TODO non-Python clients: display narrow warnings @@ -103,6 +106,15 @@ let chunks = chunk_tracked_files(tracked_files); progress.update(0, Some(files_count as u64)); + // TODO find a way (with `nix` or `signal-hook`?) of resetting the + // previous signal handler directly after. Currently, this is Python's + // job, but: + // - it introduces a (small) race between catching and resetting + // - it would break signal handlers in other contexts like `rhg`` + let _ = ctrlc::set_handler(|| { + INTERRUPT_RECEIVED.store(true, Ordering::Relaxed) + }); + create_working_copy( chunks, working_directory_path, @@ -111,8 +123,15 @@ files_sender, errors_sender, progress, + workers, ); + // Reset the global interrupt now that we're done + if INTERRUPT_RECEIVED.swap(false, Ordering::Relaxed) { + // The threads have all exited early, let's re-raise + return Err(HgError::InterruptReceived); + } + let errors: Vec<HgError> = errors_receiver.iter().collect(); if !errors.is_empty() { log::debug!("{} errors during update (see trace logs)", errors.len()); @@ -182,6 +201,7 @@ } #[logging_timer::time("trace")] +#[allow(clippy::too_many_arguments)] fn create_working_copy<'a: 'b, 'b>( chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>, working_directory_path: &Path, @@ -190,9 +210,11 @@ files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>, error_sender: Sender<HgError>, progress: &dyn Progress, + workers: Option<usize>, ) { let auditor = PathAuditor::new(working_directory_path); - chunks.into_par_iter().for_each(|(dir_path, chunk)| { + + let work_closure = |(dir_path, chunk)| -> Result<(), HgError> { if let Err(e) = working_copy_worker( dir_path, chunk, @@ -207,7 +229,37 @@ .send(e) .expect("channel should not be disconnected") } - }); + Ok(()) + }; + if let Some(workers) = workers { + if workers > 1 { + // Work in parallel, potentially restricting the number of threads + match rayon::ThreadPoolBuilder::new().num_threads(workers).build() + { + Err(error) => error_sender + .send(HgError::abort( + error.to_string(), + exit_codes::ABORT, + None, + )) + .expect("channel should not be disconnected"), + Ok(pool) => { + log::trace!("restricting update to {} threads", workers); + pool.install(|| { + let _ = + chunks.into_par_iter().try_for_each(work_closure); + }); + } + } + } else { + // Work sequentially, don't even invoke rayon + let _ = chunks.into_iter().try_for_each(work_closure); + } + } else { + // Work in parallel by default in the global threadpool + let _ = cap_default_rayon_threads(); + let _ = chunks.into_par_iter().try_for_each(work_closure); + } } /// Represents a work unit for a single thread, responsible for this set of @@ -228,6 +280,11 @@ let dir_path = working_directory_path.join(dir_path); std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?; + if INTERRUPT_RECEIVED.load(Ordering::Relaxed) { + // Stop working, the user has requested that we stop + return Err(HgError::InterruptReceived); + } + for (file, file_node, flags) in chunk { auditor.audit_path(file)?; let flags = flags.map(|f| f.into());