Mercurial > public > mercurial-scm > hg-stable
diff rust/hg-core/src/update.rs @ 52183:96b113d22b34 stable
rust-update: handle SIGINT from long-running update threads
The current code does not respond to ^C until after the Rust bit is finished
doing its work. This is expected, since Rust holds the GIL for the duration
of the call and does not call `PyErr_CheckSignals`. Freeing the GIL to do our
work does not really improve anything since the Rust threads are still going,
and the only way of cancelling a thread is by making it cooperate.
So we do the following:
- remember the SIGINT handler in hg-cpython and reset it after the call
into core (see inline comment in `update.rs` about this)
- make all update threads watch for a global `AtomicBool` being `true`,
and if so stop their work
- reset the global bool and exit early (i.e. before writing the dirstate)
- raise SIGINT from `hg-cpython` if update returns `InterruptReceived`
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Tue, 12 Nov 2024 12:52:13 +0100 |
parents | e6a44bc91bc2 |
children | 65d516db7309 |
line wrap: on
line diff
--- a/rust/hg-core/src/update.rs Fri Nov 08 17:08:11 2024 +0100 +++ b/rust/hg-core/src/update.rs Tue Nov 12 12:52:13 2024 +0100 @@ -5,6 +5,7 @@ io::Write, os::unix::fs::{MetadataExt, PermissionsExt}, path::Path, + sync::atomic::Ordering, time::Duration, }; @@ -30,6 +31,7 @@ }, vfs::{is_on_nfs_mount, VfsImpl}, DirstateParents, RevlogError, RevlogOpenOptions, UncheckedRevision, + INTERRUPT_RECEIVED, }; use crossbeam_channel::{Receiver, Sender}; use rayon::prelude::*; @@ -100,6 +102,15 @@ let chunks = chunk_tracked_files(tracked_files); progress.update(0, Some(files_count as u64)); + // TODO find a way (with `nix` or `signal-hook`?) of resetting the + // previous signal handler directly after. Currently, this is Python's + // job, but: + // - it introduces a (small) race between catching and resetting + // - it would break signal handlers in other contexts like `rhg`` + let _ = ctrlc::set_handler(|| { + INTERRUPT_RECEIVED.store(true, Ordering::Relaxed) + }); + create_working_copy( chunks, working_directory_path, @@ -111,6 +122,12 @@ workers, ); + // Reset the global interrupt now that we're done + if INTERRUPT_RECEIVED.swap(false, Ordering::Relaxed) { + // The threads have all exited early, let's re-raise + return Err(HgError::InterruptReceived); + } + let errors: Vec<HgError> = errors_receiver.iter().collect(); if !errors.is_empty() { log::debug!("{} errors during update (see trace logs)", errors.len()); @@ -192,7 +209,8 @@ workers: Option<usize>, ) { let auditor = PathAuditor::new(working_directory_path); - let work_closure = |(dir_path, chunk)| { + + let work_closure = |(dir_path, chunk)| -> Result<(), HgError> { if let Err(e) = working_copy_worker( dir_path, chunk, @@ -207,6 +225,7 @@ .send(e) .expect("channel should not be disconnected") } + Ok(()) }; if let Some(workers) = workers { if workers > 1 { @@ -223,18 +242,19 @@ Ok(pool) => { log::trace!("restricting update to {} threads", workers); pool.install(|| { - chunks.into_par_iter().for_each(work_closure); + 
let _ = + chunks.into_par_iter().try_for_each(work_closure); }); } } } else { // Work sequentially, don't even invoke rayon - chunks.into_iter().for_each(work_closure); + let _ = chunks.into_iter().try_for_each(work_closure); } } else { // Work in parallel by default in the global threadpool let _ = cap_default_rayon_threads(); - chunks.into_par_iter().for_each(work_closure); + let _ = chunks.into_par_iter().try_for_each(work_closure); } } @@ -256,6 +276,11 @@ let dir_path = working_directory_path.join(dir_path); std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?; + if INTERRUPT_RECEIVED.load(Ordering::Relaxed) { + // Stop working, the user has requested that we stop + return Err(HgError::InterruptReceived); + } + for (file, file_node, flags) in chunk { auditor.audit_path(file)?; let flags = flags.map(|f| f.into());