Mercurial > public > mercurial-scm > hg-stable
changeset 52987:6332e5f857f6
pyo3: transliterate the `copy_tracing` module from `hg-cpython`
This is the last module left to migrate to PyO3 from rust-cpython. We will
remove the rust-cpython code at the start of the next cycle.
author | Rapha?l Gom?s <rgomes@octobus.net> |
---|---|
date | Tue, 18 Feb 2025 15:33:26 +0100 |
parents | 1a99bdbdb71b |
children | 3ced516694ad |
files | rust/Cargo.lock rust/hg-pyo3/Cargo.toml rust/hg-pyo3/src/copy_tracing.rs rust/hg-pyo3/src/lib.rs |
diffstat | 4 files changed, 187 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- a/rust/Cargo.lock Tue Feb 18 11:48:54 2025 +0100 +++ b/rust/Cargo.lock Tue Feb 18 15:33:26 2025 +0100 @@ -360,9 +360,9 @@ [[package]] name = "crossbeam-channel" -version = "0.5.13" +version = "0.5.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" dependencies = [ "crossbeam-utils", ] @@ -732,6 +732,7 @@ name = "hg-pyo3" version = "0.1.0" dependencies = [ + "crossbeam-channel", "derive_more", "env_logger 0.9.3", "hg-core",
--- a/rust/hg-pyo3/Cargo.toml Tue Feb 18 11:48:54 2025 +0100 +++ b/rust/hg-pyo3/Cargo.toml Tue Feb 18 15:33:26 2025 +0100 @@ -21,3 +21,4 @@ derive_more = "0.99.17" env_logger = "0.9.3" vcsgraph = "0.2.0" +crossbeam-channel = "0.5.14"
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rust/hg-pyo3/src/copy_tracing.rs Tue Feb 18 15:33:26 2025 +0100 @@ -0,0 +1,181 @@ +// copy_tracing.rs +// +// Copyright 2025 Mercurial developers +// +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2 or any later version. + +//! Bindings for the `hg::copy_tracing` module provided by the +//! `hg-core` package. +//! +//! From Python, this will be seen as `mercurial.pyo3_rustext.copy_tracing` + +use hg::copy_tracing::ChangedFiles; +use hg::copy_tracing::CombineChangesetCopies; +use hg::Revision; +use pyo3::types::PyBytes; +use pyo3::types::PyDict; +use pyo3::types::PyList; +use pyo3::types::PyTuple; + +use crate::revision::PyRevision; +use crate::utils::new_submodule; +use crate::utils::PyBytesDeref; + +use pyo3::prelude::*; + +/// Combines copies information contained into revision `revs` to build a copy +/// map. +/// +/// See mercurial/copies.py for details +#[pyfunction] +#[pyo3(name = "combine_changeset_copies")] +pub fn combine_changeset_copies_wrapper( + revs: Bound<'_, PyList>, + children_count: Bound<'_, PyDict>, + target_rev: PyRevision, + rev_info: Bound<'_, PyAny>, + multi_thread: bool, +) -> PyResult<PyObject> { + let py = revs.py(); + let target_rev = Revision(target_rev.0); + let children_count = children_count + .iter() + .map(|(k, v)| { + Ok((Revision(k.extract::<PyRevision>()?.0), v.extract()?)) + }) + .collect::<PyResult<_>>()?; + + /// (Revision number, parent 1, parent 2, copy data for this revision) + type RevInfo<Bytes> = (Revision, Revision, Revision, Option<Bytes>); + + let revs_info = + revs.iter().map(|rev_py| -> PyResult<RevInfo<Py<PyBytes>>> { + let rev = Revision(rev_py.extract::<PyRevision>()?.0); + let ret = rev_info.call1((rev_py,))?; + let tuple: &Bound<'_, PyTuple> = ret.downcast()?; + let p1 = Revision(tuple.get_item(0)?.extract::<PyRevision>()?.0); + let p2 = Revision(tuple.get_item(1)?.extract::<PyRevision>()?.0); + let opt_bytes = tuple.get_item(2)?.extract()?; + Ok((rev, p1, p2, opt_bytes)) + }); + + let path_copies; + if !multi_thread { + let mut combine_changeset_copies = + CombineChangesetCopies::new(children_count); + + for rev_info in revs_info { + let (rev, p1, p2, opt_bytes) = rev_info?; + let files = match &opt_bytes { + Some(bytes) => ChangedFiles::new(bytes.as_bytes(py)), + // Python None was extracted to Option::None, + // meaning there was no copy data. + None => ChangedFiles::new_empty(), + }; + + combine_changeset_copies.add_revision(rev, p1, p2, files) + } + path_copies = combine_changeset_copies.finish(target_rev) + } else { + // Use a bounded channel to provide back-pressure: + // if the child thread is slower to process revisions than this thread + // is to gather data for them, an unbounded channel would keep + // growing and eat memory. + // + // TODO: tweak the bound? + let (rev_info_sender, rev_info_receiver) = + crossbeam_channel::bounded::<RevInfo<PyBytesDeref>>(1000); + + // This channel (going the other way around) however is unbounded. + // If they were both bounded, there might potentially be deadlocks + // where both channels are full and both threads are waiting on each + // other. + let (pybytes_sender, pybytes_receiver) = + crossbeam_channel::unbounded(); + + // Start a thread that does CPU-heavy processing in parallel with the + // loop below. + // + // If the parent thread panics, `rev_info_sender` will be dropped and + // “disconnected”. `rev_info_receiver` will be notified of this and + // exit its own loop. + let thread = std::thread::spawn(move || { + let mut combine_changeset_copies = + CombineChangesetCopies::new(children_count); + for (rev, p1, p2, opt_bytes) in rev_info_receiver { + let files = match &opt_bytes { + Some(raw) => ChangedFiles::new(raw.as_ref()), + // Python None was extracted to Option::None, + // meaning there was no copy data. + None => ChangedFiles::new_empty(), + }; + combine_changeset_copies.add_revision(rev, p1, p2, files); + + // Send `PyBytes` back to the parent thread so the parent + // thread can drop it. Otherwise the GIL would be implicitly + // acquired here through `impl Drop for PyBytes`. + if let Some(bytes) = opt_bytes { + if pybytes_sender.send(bytes.unwrap()).is_err() { + // The channel is disconnected, meaning the parent + // thread panicked or returned + // early through + // `?` to propagate a Python exception. + break; + } + } + } + + combine_changeset_copies.finish(target_rev) + }); + + for rev_info in revs_info { + let (rev, p1, p2, opt_bytes) = rev_info?; + let opt_bytes = opt_bytes.map(|b| PyBytesDeref::new(py, b)); + + // We’d prefer to avoid the child thread calling into Python code, + // but this avoids a potential deadlock on the GIL if it does: + py.allow_threads(|| { + rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( + "combine_changeset_copies: channel is disconnected", + ); + }); + + // Drop anything in the channel, without blocking + pybytes_receiver.try_iter().for_each(drop); + } + // We’d prefer to avoid the child thread calling into Python code, + // but this avoids a potential deadlock on the GIL if it does: + path_copies = py.allow_threads(|| { + // Disconnect the channel to signal the child thread to stop: + // the `for … in rev_info_receiver` loop will end. + drop(rev_info_sender); + + // Wait for the child thread to stop, and propagate any panic. + thread.join().unwrap_or_else(|panic_payload| { + std::panic::resume_unwind(panic_payload) + }) + }); + + // Drop anything left in the channel + drop(pybytes_receiver) + }; + + let out = PyDict::new(py); + for (dest, source) in path_copies.into_iter() { + out.set_item( + PyBytes::new(py, &dest.into_vec()), + PyBytes::new(py, &source.into_vec()), + )?; + } + Ok(out.into_any().unbind()) +} + +pub fn init_module<'py>( + py: Python<'py>, + package: &str, +) -> PyResult<Bound<'py, PyModule>> { + let m = new_submodule(py, package, "copy_tracing")?; + m.add_function(wrap_pyfunction!(combine_changeset_copies_wrapper, &m)?)?; + Ok(m) +}
--- a/rust/hg-pyo3/src/lib.rs Tue Feb 18 11:48:54 2025 +0100 +++ b/rust/hg-pyo3/src/lib.rs Tue Feb 18 15:33:26 2025 +0100 @@ -1,6 +1,7 @@ use pyo3::prelude::*; mod ancestors; +mod copy_tracing; mod dagops; mod dirstate; mod discovery; @@ -26,6 +27,7 @@ env_logger::init(); m.add_submodule(&ancestors::init_module(py, &dotted_name)?)?; + m.add_submodule(©_tracing::init_module(py, &dotted_name)?)?; m.add_submodule(&dagops::init_module(py, &dotted_name)?)?; m.add_submodule(&dirstate::init_module(py, &dotted_name)?)?; m.add_submodule(&discovery::init_module(py, &dotted_name)?)?;