diff rust/hg-core/src/update.rs @ 52084:8b7123c8947b

update: add a Rust fast-path when updating from null (and clean) This case is easy to detect and we have all we need to generate a valid working copy and dirstate entirely in Rust, which speeds things up considerably: On my machine updating a repo of ~300k files goes from 10.00s down to 4.2s, all while consuming 50% less system time, with all caches hot. Something to note is that further improvements will probably happen with the upcoming `InnerRevlog` series that does smarter mmap hanlding, especially for filelogs. Here are benchmark numbers on a machine with only 4 cores (and no SMT enabled) ``` ### data-env-vars.name = heptapod-public-2024-03-25-ds2-pnm # benchmark.name = hg.command.update # bin-env-vars.hg.py-re2-module = default # bin-env-vars.hg.changeset.node = <this change> # benchmark.variants.atomic-update = no # benchmark.variants.scenario = null-to-tip # benchmark.variants.worker = default default: 5.328762 ~~~~~ rust: 1.308654 (-75.44%, -4.02) ### data-env-vars.name = mercurial-devel-2024-03-22-ds2-pnm # benchmark.name = hg.command.update # bin-env-vars.hg.py-re2-module = default # bin-env-vars.hg.changeset.node = <this change> # benchmark.variants.atomic-update = no # benchmark.variants.scenario = null-to-tip # benchmark.variants.worker = default default: 1.693271 ~~~~~ rust: 1.151053 (-32.02%, -0.54) ### data-env-vars.name = mozilla-unified-2024-03-22-ds2-pnm # benchmark.name = hg.command.update # bin-env-vars.hg.py-re2-module = default # bin-env-vars.hg.changeset.node = <this change> # benchmark.variants.atomic-update = no # benchmark.variants.scenario = null-to-tip # benchmark.variants.worker = default default: 38.901613 ~~~~~ rust: 11.637880 (-70.08%, -27.26) ### data-env-vars.name = netbsd-xsrc-public-2024-09-19-ds2-pnm # benchmark.name = hg.command.update # bin-env-vars.hg.py-re2-module = default # bin-env-vars.hg.changeset.node = <this change> # benchmark.variants.atomic-update = no # benchmark.variants.scenario = null-to-tip # benchmark.variants.worker = default default: 4.793727 ~~~~~ rust: 1.505905 (-68.59%, -3.29) ```
author Rapha?l Gom?s <rgomes@octobus.net>
date Tue, 01 Oct 2024 13:49:11 +0200
parents
children e6a44bc91bc2 039b7caeb4d9
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/hg-core/src/update.rs	Tue Oct 01 13:49:11 2024 +0200
@@ -0,0 +1,491 @@
+//! Tools for moving the repository to a given revision
+
+use std::{
+    fs::Permissions,
+    io::Write,
+    os::unix::fs::{MetadataExt, PermissionsExt},
+    path::Path,
+    time::Duration,
+};
+
+use crate::{
+    dirstate::{ParentFileData, TruncatedTimestamp},
+    dirstate_tree::{
+        dirstate_map::DirstateEntryReset, on_disk::write_tracked_key,
+    },
+    errors::{HgError, IoResultExt},
+    exit_codes,
+    filelog::Filelog,
+    narrow,
+    node::NULL_NODE,
+    operations::{list_rev_tracked_files, ExpandedManifestEntry},
+    progress::Progress,
+    repo::Repo,
+    sparse,
+    utils::{
+        files::{filesystem_now, get_path_from_bytes},
+        hg_path::{hg_path_to_path_buf, HgPath, HgPathError},
+        path_auditor::PathAuditor,
+    },
+    vfs::{is_on_nfs_mount, VfsImpl},
+    DirstateParents, RevlogError, RevlogOpenOptions, UncheckedRevision,
+};
+use crossbeam_channel::{Receiver, Sender};
+use rayon::prelude::*;
+
+fn write_dirstate(repo: &Repo) -> Result<(), HgError> {
+    repo.write_dirstate()
+        .map_err(|e| HgError::abort(e.to_string(), exit_codes::ABORT, None))?;
+    write_tracked_key(repo)
+}
+
+/// Update the current working copy of `repo` to the given revision `to`, from
+/// the null revision and set + write out the dirstate to reflect that.
+///
+/// Do not call this outside of a Python context. This does *not* handle any
+/// of the checks, hooks, lock taking needed to setup and get out of this
+/// update from the null revision.
+pub fn update_from_null(
+    repo: &Repo,
+    to: UncheckedRevision,
+    progress: &dyn Progress,
+) -> Result<usize, HgError> {
+    // Ignore the warnings, they've been displayed by Python already
+    // TODO non-Python clients: display narrow warnings
+    let (narrow_matcher, _) = narrow::matcher(repo)?;
+
+    let files_for_rev = list_rev_tracked_files(repo, to, narrow_matcher)
+        .map_err(handle_revlog_error)?;
+    repo.manually_set_parents(DirstateParents {
+        p1: repo.node(to).expect("update target should exist"),
+        p2: NULL_NODE,
+    })?;
+
+    // Filter the working copy according to the sparse spec
+    let tracked_files: Result<Vec<_>, _> = if !repo.has_sparse() {
+        files_for_rev.iter().collect()
+    } else {
+        // Ignore the warnings, they've been displayed by Python already
+        // TODO non-Python clients: display sparse warnings
+        let (sparse_matcher, _) = sparse::matcher(repo)?;
+        files_for_rev
+            .iter()
+            .filter(|f| {
+                match f {
+                    Ok(f) => sparse_matcher.matches(f.0),
+                    Err(_) => true, // Errors stop the update, include them
+                }
+            })
+            .collect()
+    };
+    let tracked_files = tracked_files?;
+
+    if tracked_files.is_empty() {
+        // Still write the dirstate because we might not be in the null
+        // revision.
+        // This can happen in narrow repos where all paths are excluded in
+        // this revision.
+        write_dirstate(repo)?;
+        return Ok(0);
+    }
+    let store_vfs = &repo.store_vfs();
+    let options = repo.default_revlog_options(crate::RevlogType::Filelog)?;
+    let (errors_sender, errors_receiver) = crossbeam_channel::unbounded();
+    let (files_sender, files_receiver) = crossbeam_channel::unbounded();
+    let working_directory_path = &repo.working_directory_path();
+
+    let files_count = tracked_files.len();
+    let chunks = chunk_tracked_files(tracked_files);
+    progress.update(0, Some(files_count as u64));
+
+    create_working_copy(
+        chunks,
+        working_directory_path,
+        store_vfs,
+        options,
+        files_sender,
+        errors_sender,
+        progress,
+    );
+
+    let errors: Vec<HgError> = errors_receiver.iter().collect();
+    if !errors.is_empty() {
+        log::debug!("{} errors during update (see trace logs)", errors.len());
+        for error in errors.iter() {
+            log::trace!("{}", error);
+        }
+        // Best we can do is raise the first error (in order of the channel)
+        return Err(errors.into_iter().next().expect("can never be empty"));
+    }
+
+    // TODO try to run this concurrently to update the dirstate while we're
+    // still writing out the working copy to see if that improves performance.
+    let total = update_dirstate(repo, files_receiver)?;
+
+    write_dirstate(repo)?;
+
+    Ok(total)
+}
+
+fn handle_revlog_error(e: RevlogError) -> HgError {
+    match e {
+        crate::RevlogError::Other(hg_error) => hg_error,
+        e => HgError::abort(
+            format!("revlog error: {}", e),
+            exit_codes::ABORT,
+            None,
+        ),
+    }
+}
+
+/// Preallocated size of Vec holding directory contents. This aims at
+/// preventing the need for re-allocating the Vec in most cases.
+///
+/// The value is arbitrarily picked as a little over an average number of files
+/// per directory done by looking at a few larger open-source repos.
+/// Most of the runtime is IO anyway, so this doesn't matter too much.
+const FILES_PER_DIRECTORY: usize = 16;
+
+/// Chunk files per directory prefix, so almost every directory is handled
+/// in a separate thread, which works around the FS inode mutex.
+/// Chunking less (and doing approximately `files_count`/`threads`) actually
+/// ends up being less performant: my hypothesis is `rayon`'s work stealing
+/// being more efficient with tasks of varying lengths.
+#[logging_timer::time("trace")]
+fn chunk_tracked_files(
+    tracked_files: Vec<ExpandedManifestEntry>,
+) -> Vec<(&HgPath, Vec<ExpandedManifestEntry>)> {
+    let files_count = tracked_files.len();
+
+    let mut chunks = Vec::with_capacity(files_count / FILES_PER_DIRECTORY);
+
+    let mut current_chunk = Vec::with_capacity(FILES_PER_DIRECTORY);
+    let mut last_directory = tracked_files[0].0.parent();
+
+    for file_info in tracked_files {
+        let current_directory = file_info.0.parent();
+        let different_directory = current_directory != last_directory;
+        if different_directory {
+            chunks.push((last_directory, current_chunk));
+            current_chunk = Vec::with_capacity(FILES_PER_DIRECTORY);
+        }
+        current_chunk.push(file_info);
+        last_directory = current_directory;
+    }
+    chunks.push((last_directory, current_chunk));
+    chunks
+}
+
+#[logging_timer::time("trace")]
+fn create_working_copy<'a: 'b, 'b>(
+    chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>,
+    working_directory_path: &Path,
+    store_vfs: &VfsImpl,
+    options: RevlogOpenOptions,
+    files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>,
+    error_sender: Sender<HgError>,
+    progress: &dyn Progress,
+) {
+    let auditor = PathAuditor::new(working_directory_path);
+    chunks.into_par_iter().for_each(|(dir_path, chunk)| {
+        if let Err(e) = working_copy_worker(
+            dir_path,
+            chunk,
+            working_directory_path,
+            store_vfs,
+            options,
+            &files_sender,
+            progress,
+            &auditor,
+        ) {
+            error_sender
+                .send(e)
+                .expect("channel should not be disconnected")
+        }
+    });
+}
+
+/// Represents a work unit for a single thread, responsible for this set of
+/// files and restoring them to the working copy.
+#[allow(clippy::too_many_arguments)]
+fn working_copy_worker<'a: 'b, 'b>(
+    dir_path: &HgPath,
+    chunk: Vec<ExpandedManifestEntry<'a>>,
+    working_directory_path: &Path,
+    store_vfs: &VfsImpl,
+    options: RevlogOpenOptions,
+    files_sender: &Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>,
+    progress: &dyn Progress,
+    auditor: &PathAuditor,
+) -> Result<(), HgError> {
+    let dir_path =
+        hg_path_to_path_buf(dir_path).expect("invalid path in manifest");
+    let dir_path = working_directory_path.join(dir_path);
+    std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?;
+
+    for (file, file_node, flags) in chunk {
+        auditor.audit_path(file)?;
+        let flags = flags.map(|f| f.into());
+        let path =
+            working_directory_path.join(get_path_from_bytes(file.as_bytes()));
+
+        // Treemanifest is not supported
+        assert!(flags != Some(b't'));
+
+        let filelog = Filelog::open_vfs(store_vfs, file, options)?;
+        let filelog_revision_data = &filelog
+            .data_for_node(file_node)
+            .map_err(handle_revlog_error)?;
+        let file_data = filelog_revision_data.file_data()?;
+
+        if flags == Some(b'l') {
+            let target = get_path_from_bytes(file_data);
+            if let Err(e) = std::os::unix::fs::symlink(target, &path) {
+                // If the path already exists either:
+                //   - another process created this file while ignoring the
+                //     lock => error
+                //   - our check for the fast path is incorrect => error
+                //   - this is a malicious repo/bundle and this is symlink that
+                //     tries to write things where it shouldn't be able to.
+                match e.kind() {
+                    std::io::ErrorKind::AlreadyExists => {
+                        let metadata = std::fs::symlink_metadata(&path)
+                            .when_reading_file(&path)?;
+                        if metadata.is_dir() {
+                            return Err(HgError::Path(
+                                HgPathError::TraversesSymbolicLink {
+                                    // Technically it should be one of the
+                                    // children, but good enough
+                                    path: file
+                                        .join(HgPath::new(b"*"))
+                                        .to_owned(),
+                                    symlink: file.to_owned(),
+                                },
+                            ));
+                        }
+                        return Err(e).when_writing_file(&path);
+                    }
+                    _ => return Err(e).when_writing_file(&path),
+                }
+            }
+        } else {
+            let mut f =
+                std::fs::File::create(&path).when_writing_file(&path)?;
+            f.write_all(file_data).when_writing_file(&path)?;
+        }
+        if flags == Some(b'x') {
+            std::fs::set_permissions(&path, Permissions::from_mode(0o755))
+                .when_writing_file(&path)?;
+        }
+        let metadata =
+            std::fs::symlink_metadata(&path).when_reading_file(&path)?;
+
+        let mode = metadata.mode();
+
+        files_sender
+            .send((
+                file,
+                mode,
+                file_data.len(),
+                TruncatedTimestamp::for_mtime_of(&metadata)
+                    .when_reading_file(&path)?,
+            ))
+            .expect("channel should not be closed");
+        progress.increment(1, None);
+    }
+    Ok(())
+}
+
+#[logging_timer::time("trace")]
+fn update_dirstate(
+    repo: &Repo,
+    files_receiver: Receiver<(&HgPath, u32, usize, TruncatedTimestamp)>,
+) -> Result<usize, HgError> {
+    let mut dirstate = repo
+        .dirstate_map_mut()
+        .map_err(|e| HgError::abort(e.to_string(), exit_codes::ABORT, None))?;
+
+    // (see the comments in `filter_ambiguous_files` in `merge.py` for more)
+    // It turns out that (on Linux at least) the filesystem resolution time
+    // for most filesystems is based on the HZ kernel config. Their internal
+    // clocks do return nanoseconds if the hardware clock is precise enough,
+    // which should be the case on most recent computers but are only updated
+    // every few milliseconds at best (every "jiffy").
+    //
+    // We are still not concerned with fixing the race with other
+    // processes that might modify the working copy right after it was created
+    // within the same tick, because it is impossible to catch.
+    // However, we might as well not race with operations that could run right
+    // after this one, especially other Mercurial operations that could be
+    // waiting for the wlock to change file contents and the dirstate.
+    //
+    // Thus: wait until the filesystem clock has ticked to filter ambiguous
+    // entries and write the dirstate, but only for dirstate-v2, since v1 only
+    // has second-level granularity and waiting for a whole second is too much
+    // of a penalty in the general case.
+    // Although we're assuming that people running dirstate-v2 on Linux
+    // don't have a second-granularity FS (with the exclusion of NFS), users
+    // can be surprising, and at some point in the future dirstate-v2 will
+    // become the default. To that end, we limit the wait time to 100ms and
+    // fall back to the filter method in case of a timeout.
+    //
+    // +------------+------+--------------+
+    // |   version  | wait | filter level |
+    // +------------+------+--------------+
+    // |     V1     | No   | Second       |
+    // |     V2     | Yes  | Nanosecond   |
+    // | V2-slow-fs | No   | Second       |
+    // +------------+------+--------------+
+    let dirstate_v2 = repo.use_dirstate_v2();
+
+    // Let's ignore NFS right off the bat
+    let mut fast_enough_fs = !is_on_nfs_mount(repo.working_directory_path());
+    let fs_time_now = if dirstate_v2 && fast_enough_fs {
+        match wait_until_fs_tick(repo.working_directory_path()) {
+            None => None,
+            Some(Ok(time)) => Some(time),
+            Some(Err(time)) => {
+                fast_enough_fs = false;
+                Some(time)
+            }
+        }
+    } else {
+        filesystem_now(repo.working_directory_path())
+            .ok()
+            .map(TruncatedTimestamp::from)
+    };
+
+    let mut total = 0;
+    for (filename, mode, size, mtime) in files_receiver.into_iter() {
+        total += 1;
+        // When using dirstate-v2 on a filesystem with reasonable performance
+        // this is basically always true unless you get a mtime from the
+        // far future.
+        let has_meaningful_mtime = if let Some(fs_time) = fs_time_now {
+            mtime.for_reliable_mtime_of_self(&fs_time).is_some_and(|t| {
+                // Dirstate-v1 only has second-level information
+                !t.second_ambiguous || dirstate_v2 && fast_enough_fs
+            })
+        } else {
+            // We somehow failed to write to the filesystem, so don't store
+            // the cache information.
+            false
+        };
+        let reset = DirstateEntryReset {
+            filename,
+            wc_tracked: true,
+            p1_tracked: true,
+            p2_info: false,
+            has_meaningful_mtime,
+            parent_file_data_opt: Some(ParentFileData {
+                mode_size: Some((
+                    mode,
+                    size.try_into().expect("invalid file size in manifest"),
+                )),
+                mtime: Some(mtime),
+            }),
+            from_empty: true,
+        };
+        dirstate.reset_state(reset).map_err(|e| {
+            HgError::abort(e.to_string(), exit_codes::ABORT, None)
+        })?;
+    }
+
+    Ok(total)
+}
+
+/// Wait until the next update from the filesystem time by writing in a loop
+/// a new temporary file inside the working directory and checking if its time
+/// differs from the first one observed.
+///
+/// Returns `None` if we are unable to get the filesystem time,
+/// `Some(Err(timestamp))` if we've timed out waiting for the filesystem clock
+/// to tick, and `Some(Ok(timestamp))` if we've waited successfully.
+///
+/// On Linux, your average tick is going to be a "jiffy", or 1/HZ.
+/// HZ is your kernel's tick rate (if it has one configured) and the value
+/// is the one returned by `grep 'CONFIG_HZ=' /boot/config-$(uname -r)`,
+/// again assuming a normal setup.
+///
+/// In my case (Alphare) at the time of writing, I get `CONFIG_HZ=250`,
+/// which equates to 4ms.
+///
+/// This might change with a series that could make it to Linux 6.12:
+/// https://lore.kernel.org/all/20241002-mgtime-v10-8-d1c4717f5284@kernel.org
+fn wait_until_fs_tick(
+    working_directory_path: &Path,
+) -> Option<Result<TruncatedTimestamp, TruncatedTimestamp>> {
+    let start = std::time::Instant::now();
+    let old_fs_time = filesystem_now(working_directory_path).ok()?;
+    let mut fs_time = filesystem_now(working_directory_path).ok()?;
+
+    const FS_TICK_WAIT_TIMEOUT: Duration = Duration::from_millis(100);
+
+    while fs_time == old_fs_time {
+        if std::time::Instant::now() - start > FS_TICK_WAIT_TIMEOUT {
+            log::trace!(
+                "timed out waiting for the fs clock to tick after {:?}",
+                FS_TICK_WAIT_TIMEOUT
+            );
+            return Some(Err(TruncatedTimestamp::from(old_fs_time)));
+        }
+        fs_time = filesystem_now(working_directory_path).ok()?;
+    }
+    log::trace!(
+        "waited for {:?} before writing the dirstate",
+        fs_time.duration_since(old_fs_time)
+    );
+    Some(Ok(TruncatedTimestamp::from(fs_time)))
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use pretty_assertions::assert_eq;
+
+    #[test]
+    fn test_chunk_tracked_files() {
+        fn chunk(v: Vec<&'static str>) -> Vec<ExpandedManifestEntry> {
+            v.into_iter()
+                .map(|f| (HgPath::new(f.as_bytes()), NULL_NODE, None))
+                .collect()
+        }
+        let p = HgPath::new;
+
+        let files = chunk(vec!["a"]);
+        let expected = vec![(p(""), chunk(vec!["a"]))];
+        assert_eq!(chunk_tracked_files(files), expected);
+
+        let files = chunk(vec!["a", "b", "c"]);
+        let expected = vec![(p(""), chunk(vec!["a", "b", "c"]))];
+        assert_eq!(chunk_tracked_files(files), expected);
+
+        let files = chunk(vec![
+            "dir/a-new",
+            "dir/a/mut",
+            "dir/a/mut-mut",
+            "dir/albert",
+            "dir/b",
+            "dir/subdir/c",
+            "dir/subdir/d",
+            "file",
+        ]);
+        let expected = vec![
+            (p("dir"), chunk(vec!["dir/a-new"])),
+            (p("dir/a"), chunk(vec!["dir/a/mut", "dir/a/mut-mut"])),
+            (p("dir"), chunk(vec!["dir/albert", "dir/b"])),
+            (p("dir/subdir"), chunk(vec!["dir/subdir/c", "dir/subdir/d"])),
+            (p(""), chunk(vec!["file"])),
+        ];
+        assert_eq!(chunk_tracked_files(files), expected);
+
+        // Doesn't get split
+        let large_dir = vec![
+            "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12",
+            "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23",
+        ];
+        let files = chunk(large_dir.clone());
+        let expected = vec![(p(""), chunk(large_dir))];
+        assert_eq!(chunk_tracked_files(files), expected);
+    }
+}