diff rust/hg-core/src/update.rs @ 52183:96b113d22b34 stable

rust-update: handle SIGINT from long-running update threads The current code does not respond to ^C until after the Rust bit is finished doing its work. This is expected, since Rust holds the GIL for the duration of the call and does not call `PyErr_CheckSignals`. Freeing the GIL to do our work does not really improve anything since the Rust threads are still going, and the only way of cancelling a thread is by making it cooperate. So we do the following: - remember the SIGINT handler in hg-cpython and reset it after the call into core (see inline comment in `update.rs` about this) - make all update threads watch for a global `AtomicBool` being `true`, and if so stop their work - reset the global bool and exit early (i.e. before writing the dirstate) - raise SIGINT from `hg-cpython` if update returns `InterruptReceived`
author Rapha?l Gom?s <rgomes@octobus.net>
date Tue, 12 Nov 2024 12:52:13 +0100
parents e6a44bc91bc2
children 65d516db7309
line wrap: on
line diff
--- a/rust/hg-core/src/update.rs	Fri Nov 08 17:08:11 2024 +0100
+++ b/rust/hg-core/src/update.rs	Tue Nov 12 12:52:13 2024 +0100
@@ -5,6 +5,7 @@
     io::Write,
     os::unix::fs::{MetadataExt, PermissionsExt},
     path::Path,
+    sync::atomic::Ordering,
     time::Duration,
 };
 
@@ -30,6 +31,7 @@
     },
     vfs::{is_on_nfs_mount, VfsImpl},
     DirstateParents, RevlogError, RevlogOpenOptions, UncheckedRevision,
+    INTERRUPT_RECEIVED,
 };
 use crossbeam_channel::{Receiver, Sender};
 use rayon::prelude::*;
@@ -100,6 +102,15 @@
     let chunks = chunk_tracked_files(tracked_files);
     progress.update(0, Some(files_count as u64));
 
+    // TODO find a way (with `nix` or `signal-hook`?) of resetting the
+    // previous signal handler directly after. Currently, this is Python's
+    // job, but:
+    //     - it introduces a (small) race between catching and resetting
+    //     - it would break signal handlers in other contexts like `rhg``
+    let _ = ctrlc::set_handler(|| {
+        INTERRUPT_RECEIVED.store(true, Ordering::Relaxed)
+    });
+
     create_working_copy(
         chunks,
         working_directory_path,
@@ -111,6 +122,12 @@
         workers,
     );
 
+    // Reset the global interrupt now that we're done
+    if INTERRUPT_RECEIVED.swap(false, Ordering::Relaxed) {
+        // The threads have all exited early, let's re-raise
+        return Err(HgError::InterruptReceived);
+    }
+
     let errors: Vec<HgError> = errors_receiver.iter().collect();
     if !errors.is_empty() {
         log::debug!("{} errors during update (see trace logs)", errors.len());
@@ -192,7 +209,8 @@
     workers: Option<usize>,
 ) {
     let auditor = PathAuditor::new(working_directory_path);
-    let work_closure = |(dir_path, chunk)| {
+
+    let work_closure = |(dir_path, chunk)| -> Result<(), HgError> {
         if let Err(e) = working_copy_worker(
             dir_path,
             chunk,
@@ -207,6 +225,7 @@
                 .send(e)
                 .expect("channel should not be disconnected")
         }
+        Ok(())
     };
     if let Some(workers) = workers {
         if workers > 1 {
@@ -223,18 +242,19 @@
                 Ok(pool) => {
                     log::trace!("restricting update to {} threads", workers);
                     pool.install(|| {
-                        chunks.into_par_iter().for_each(work_closure);
+                        let _ =
+                            chunks.into_par_iter().try_for_each(work_closure);
                     });
                 }
             }
         } else {
             // Work sequentially, don't even invoke rayon
-            chunks.into_iter().for_each(work_closure);
+            let _ = chunks.into_iter().try_for_each(work_closure);
         }
     } else {
         // Work in parallel by default in the global threadpool
         let _ = cap_default_rayon_threads();
-        chunks.into_par_iter().for_each(work_closure);
+        let _ = chunks.into_par_iter().try_for_each(work_closure);
     }
 }
 
@@ -256,6 +276,11 @@
     let dir_path = working_directory_path.join(dir_path);
     std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?;
 
+    if INTERRUPT_RECEIVED.load(Ordering::Relaxed) {
+        // Stop working, the user has requested that we stop
+        return Err(HgError::InterruptReceived);
+    }
+
     for (file, file_node, flags) in chunk {
         auditor.audit_path(file)?;
         let flags = flags.map(|f| f.into());