--- a/rust/hg-core/src/update.rs Mon Oct 21 12:58:40 2024 +0200
+++ b/rust/hg-core/src/update.rs Thu Nov 14 16:45:23 2024 +0100
@@ -5,6 +5,7 @@
io::Write,
os::unix::fs::{MetadataExt, PermissionsExt},
path::Path,
+ sync::atomic::Ordering,
time::Duration,
};
@@ -24,12 +25,13 @@
revlog::RevlogError,
sparse,
utils::{
+ cap_default_rayon_threads,
files::{filesystem_now, get_path_from_bytes},
hg_path::{hg_path_to_path_buf, HgPath, HgPathError},
path_auditor::PathAuditor,
},
vfs::{is_on_nfs_mount, VfsImpl},
- DirstateParents, UncheckedRevision,
+ DirstateParents, UncheckedRevision, INTERRUPT_RECEIVED,
};
use crossbeam_channel::{Receiver, Sender};
use rayon::prelude::*;
@@ -50,6 +52,7 @@
repo: &Repo,
to: UncheckedRevision,
progress: &dyn Progress,
+ workers: Option<usize>,
) -> Result<usize, HgError> {
// Ignore the warnings, they've been displayed by Python already
// TODO non-Python clients: display narrow warnings
@@ -103,6 +106,15 @@
let chunks = chunk_tracked_files(tracked_files);
progress.update(0, Some(files_count as u64));
+ // TODO find a way (with `nix` or `signal-hook`?) of resetting the
+ // previous signal handler directly after. Currently, this is Python's
+ // job, but:
+ // - it introduces a (small) race between catching and resetting
+ // - it would break signal handlers in other contexts like `rhg`
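+ // `Relaxed` is enough here: the flag is a lone boolean signal and no
+ // other memory is synchronized through it. A failure to install the
+ // handler is deliberately ignored.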
+ let _ = ctrlc::set_handler(|| {
+ INTERRUPT_RECEIVED.store(true, Ordering::Relaxed)
+ });
+
create_working_copy(
chunks,
working_directory_path,
@@ -111,8 +123,15 @@
files_sender,
errors_sender,
progress,
+ workers,
);
+ // Reset the global interrupt now that we're done
+ if INTERRUPT_RECEIVED.swap(false, Ordering::Relaxed) {
+ // The threads have all exited early; let's re-raise
+ return Err(HgError::InterruptReceived);
+ }
+
let errors: Vec<HgError> = errors_receiver.iter().collect();
if !errors.is_empty() {
log::debug!("{} errors during update (see trace logs)", errors.len());
@@ -182,6 +201,7 @@
}
#[logging_timer::time("trace")]
+#[allow(clippy::too_many_arguments)]
fn create_working_copy<'a: 'b, 'b>(
chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>,
working_directory_path: &Path,
@@ -190,9 +210,11 @@
files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>,
error_sender: Sender<HgError>,
progress: &dyn Progress,
+ workers: Option<usize>,
) {
let auditor = PathAuditor::new(working_directory_path);
- chunks.into_par_iter().for_each(|(dir_path, chunk)| {
+
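+ // Any error from a chunk is forwarded on the channel rather than
+ // propagated, so one failing chunk does not stop the other workers.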
+ let work_closure = |(dir_path, chunk)| -> Result<(), HgError> {
if let Err(e) = working_copy_worker(
dir_path,
chunk,
@@ -207,7 +229,37 @@
.send(e)
.expect("channel should not be disconnected")
}
- });
+ Ok(())
+ };
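+ // Dispatch: an explicit worker count above 1 gets a dedicated pool, an
+ // explicit count of 1 (or 0) runs sequentially, and no explicit count
+ // uses the global rayon pool.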
+ if let Some(workers) = workers {
+ if workers > 1 {
+ // Work in parallel, potentially restricting the number of threads
+ match rayon::ThreadPoolBuilder::new().num_threads(workers).build()
+ {
+ Err(error) => error_sender
+ .send(HgError::abort(
+ error.to_string(),
+ exit_codes::ABORT,
+ None,
+ ))
+ .expect("channel should not be disconnected"),
+ Ok(pool) => {
+ log::trace!("restricting update to {} threads", workers);
+ pool.install(|| {
+ let _ =
+ chunks.into_par_iter().try_for_each(work_closure);
+ });
+ }
+ }
+ } else {
+ // Work sequentially; don't even invoke rayon
+ let _ = chunks.into_iter().try_for_each(work_closure);
+ }
+ } else {
+ // Work in parallel by default in the global threadpool
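+ // Capping the default thread count is best-effort, hence the
+ // ignored `Result`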
+ let _ = cap_default_rayon_threads();
+ let _ = chunks.into_par_iter().try_for_each(work_closure);
+ }
}
/// Represents a work unit for a single thread, responsible for this set of
@@ -228,6 +280,11 @@
let dir_path = working_directory_path.join(dir_path);
std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?;
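+ // The interrupt flag is checked once per chunk, so a chunk that has
+ // already passed this point will finish writing its files before the
+ // update stops.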
+ if INTERRUPT_RECEIVED.load(Ordering::Relaxed) {
+ // Stop working; the user has requested that we stop
+ return Err(HgError::InterruptReceived);
+ }
+
for (file, file_node, flags) in chunk {
auditor.audit_path(file)?;
let flags = flags.map(|f| f.into());