diff rust/hg-core/src/update.rs @ 52313:65d516db7309

branching: merge stable into default
author Rapha?l Gom?s <rgomes@octobus.net>
date Thu, 14 Nov 2024 16:45:23 +0100
parents bd8081e9fd62 96b113d22b34
children db065b33fa56
line wrap: on
line diff
--- a/rust/hg-core/src/update.rs	Mon Oct 21 12:58:40 2024 +0200
+++ b/rust/hg-core/src/update.rs	Thu Nov 14 16:45:23 2024 +0100
@@ -5,6 +5,7 @@
     io::Write,
     os::unix::fs::{MetadataExt, PermissionsExt},
     path::Path,
+    sync::atomic::Ordering,
     time::Duration,
 };
 
@@ -24,12 +25,13 @@
     revlog::RevlogError,
     sparse,
     utils::{
+        cap_default_rayon_threads,
         files::{filesystem_now, get_path_from_bytes},
         hg_path::{hg_path_to_path_buf, HgPath, HgPathError},
         path_auditor::PathAuditor,
     },
     vfs::{is_on_nfs_mount, VfsImpl},
-    DirstateParents, UncheckedRevision,
+    DirstateParents, UncheckedRevision, INTERRUPT_RECEIVED,
 };
 use crossbeam_channel::{Receiver, Sender};
 use rayon::prelude::*;
@@ -50,6 +52,7 @@
     repo: &Repo,
     to: UncheckedRevision,
     progress: &dyn Progress,
+    workers: Option<usize>,
 ) -> Result<usize, HgError> {
     // Ignore the warnings, they've been displayed by Python already
     // TODO non-Python clients: display narrow warnings
@@ -103,6 +106,15 @@
     let chunks = chunk_tracked_files(tracked_files);
     progress.update(0, Some(files_count as u64));
 
+    // TODO find a way (with `nix` or `signal-hook`?) of resetting the
+    // previous signal handler directly after. Currently, this is Python's
+    // job, but:
+    //     - it introduces a (small) race between catching and resetting
+    //     - it would break signal handlers in other contexts like `rhg``
+    let _ = ctrlc::set_handler(|| {
+        INTERRUPT_RECEIVED.store(true, Ordering::Relaxed)
+    });
+
     create_working_copy(
         chunks,
         working_directory_path,
@@ -111,8 +123,15 @@
         files_sender,
         errors_sender,
         progress,
+        workers,
     );
 
+    // Reset the global interrupt now that we're done
+    if INTERRUPT_RECEIVED.swap(false, Ordering::Relaxed) {
+        // The threads have all exited early, let's re-raise
+        return Err(HgError::InterruptReceived);
+    }
+
     let errors: Vec<HgError> = errors_receiver.iter().collect();
     if !errors.is_empty() {
         log::debug!("{} errors during update (see trace logs)", errors.len());
@@ -182,6 +201,7 @@
 }
 
 #[logging_timer::time("trace")]
+#[allow(clippy::too_many_arguments)]
 fn create_working_copy<'a: 'b, 'b>(
     chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>,
     working_directory_path: &Path,
@@ -190,9 +210,11 @@
     files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>,
     error_sender: Sender<HgError>,
     progress: &dyn Progress,
+    workers: Option<usize>,
 ) {
     let auditor = PathAuditor::new(working_directory_path);
-    chunks.into_par_iter().for_each(|(dir_path, chunk)| {
+
+    let work_closure = |(dir_path, chunk)| -> Result<(), HgError> {
         if let Err(e) = working_copy_worker(
             dir_path,
             chunk,
@@ -207,7 +229,37 @@
                 .send(e)
                 .expect("channel should not be disconnected")
         }
-    });
+        Ok(())
+    };
+    if let Some(workers) = workers {
+        if workers > 1 {
+            // Work in parallel, potentially restricting the number of threads
+            match rayon::ThreadPoolBuilder::new().num_threads(workers).build()
+            {
+                Err(error) => error_sender
+                    .send(HgError::abort(
+                        error.to_string(),
+                        exit_codes::ABORT,
+                        None,
+                    ))
+                    .expect("channel should not be disconnected"),
+                Ok(pool) => {
+                    log::trace!("restricting update to {} threads", workers);
+                    pool.install(|| {
+                        let _ =
+                            chunks.into_par_iter().try_for_each(work_closure);
+                    });
+                }
+            }
+        } else {
+            // Work sequentially, don't even invoke rayon
+            let _ = chunks.into_iter().try_for_each(work_closure);
+        }
+    } else {
+        // Work in parallel by default in the global threadpool
+        let _ = cap_default_rayon_threads();
+        let _ = chunks.into_par_iter().try_for_each(work_closure);
+    }
 }
 
 /// Represents a work unit for a single thread, responsible for this set of
@@ -228,6 +280,11 @@
     let dir_path = working_directory_path.join(dir_path);
     std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?;
 
+    if INTERRUPT_RECEIVED.load(Ordering::Relaxed) {
+        // Stop working, the user has requested that we stop
+        return Err(HgError::InterruptReceived);
+    }
+
     for (file, file_node, flags) in chunk {
         auditor.audit_path(file)?;
         let flags = flags.map(|f| f.into());