changeset 52987:6332e5f857f6

pyo3: transliterate the `copy_tracing` module from `hg-cpython` This is the last module left to migrate to PyO3 from rust-cpython. We will remove the rust-cpython code at the start of the next cycle.
author Rapha?l Gom?s <rgomes@octobus.net>
date Tue, 18 Feb 2025 15:33:26 +0100
parents 1a99bdbdb71b
children 3ced516694ad
files rust/Cargo.lock rust/hg-pyo3/Cargo.toml rust/hg-pyo3/src/copy_tracing.rs rust/hg-pyo3/src/lib.rs
diffstat 4 files changed, 187 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/rust/Cargo.lock	Tue Feb 18 11:48:54 2025 +0100
+++ b/rust/Cargo.lock	Tue Feb 18 15:33:26 2025 +0100
@@ -360,9 +360,9 @@
 
 [[package]]
 name = "crossbeam-channel"
-version = "0.5.13"
+version = "0.5.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
+checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
 dependencies = [
  "crossbeam-utils",
 ]
@@ -732,6 +732,7 @@
 name = "hg-pyo3"
 version = "0.1.0"
 dependencies = [
+ "crossbeam-channel",
  "derive_more",
  "env_logger 0.9.3",
  "hg-core",
--- a/rust/hg-pyo3/Cargo.toml	Tue Feb 18 11:48:54 2025 +0100
+++ b/rust/hg-pyo3/Cargo.toml	Tue Feb 18 15:33:26 2025 +0100
@@ -21,3 +21,4 @@
 derive_more = "0.99.17"
 env_logger = "0.9.3"
 vcsgraph = "0.2.0"
+crossbeam-channel = "0.5.14"
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/hg-pyo3/src/copy_tracing.rs	Tue Feb 18 15:33:26 2025 +0100
@@ -0,0 +1,181 @@
+// copy_tracing.rs
+//
+// Copyright 2025 Mercurial developers
+//
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2 or any later version.
+
+//! Bindings for the `hg::copy_tracing` module provided by the
+//! `hg-core` package.
+//!
+//! From Python, this will be seen as `mercurial.pyo3_rustext.copy_tracing`
+
+use hg::copy_tracing::ChangedFiles;
+use hg::copy_tracing::CombineChangesetCopies;
+use hg::Revision;
+use pyo3::types::PyBytes;
+use pyo3::types::PyDict;
+use pyo3::types::PyList;
+use pyo3::types::PyTuple;
+
+use crate::revision::PyRevision;
+use crate::utils::new_submodule;
+use crate::utils::PyBytesDeref;
+
+use pyo3::prelude::*;
+
+/// Combines copies information contained into revision `revs` to build a copy
+/// map.
+///
+/// See mercurial/copies.py for details
+#[pyfunction]
+#[pyo3(name = "combine_changeset_copies")]
+pub fn combine_changeset_copies_wrapper(
+    revs: Bound<'_, PyList>,
+    children_count: Bound<'_, PyDict>,
+    target_rev: PyRevision,
+    rev_info: Bound<'_, PyAny>,
+    multi_thread: bool,
+) -> PyResult<PyObject> {
+    let py = revs.py();
+    let target_rev = Revision(target_rev.0);
+    let children_count = children_count
+        .iter()
+        .map(|(k, v)| {
+            Ok((Revision(k.extract::<PyRevision>()?.0), v.extract()?))
+        })
+        .collect::<PyResult<_>>()?;
+
+    /// (Revision number, parent 1, parent 2, copy data for this revision)
+    type RevInfo<Bytes> = (Revision, Revision, Revision, Option<Bytes>);
+
+    let revs_info =
+        revs.iter().map(|rev_py| -> PyResult<RevInfo<Py<PyBytes>>> {
+            let rev = Revision(rev_py.extract::<PyRevision>()?.0);
+            let ret = rev_info.call1((rev_py,))?;
+            let tuple: &Bound<'_, PyTuple> = ret.downcast()?;
+            let p1 = Revision(tuple.get_item(0)?.extract::<PyRevision>()?.0);
+            let p2 = Revision(tuple.get_item(1)?.extract::<PyRevision>()?.0);
+            let opt_bytes = tuple.get_item(2)?.extract()?;
+            Ok((rev, p1, p2, opt_bytes))
+        });
+
+    let path_copies;
+    if !multi_thread {
+        let mut combine_changeset_copies =
+            CombineChangesetCopies::new(children_count);
+
+        for rev_info in revs_info {
+            let (rev, p1, p2, opt_bytes) = rev_info?;
+            let files = match &opt_bytes {
+                Some(bytes) => ChangedFiles::new(bytes.as_bytes(py)),
+                // Python None was extracted to Option::None,
+                // meaning there was no copy data.
+                None => ChangedFiles::new_empty(),
+            };
+
+            combine_changeset_copies.add_revision(rev, p1, p2, files)
+        }
+        path_copies = combine_changeset_copies.finish(target_rev)
+    } else {
+        // Use a bounded channel to provide back-pressure:
+        // if the child thread is slower to process revisions than this thread
+        // is to gather data for them, an unbounded channel would keep
+        // growing and eat memory.
+        //
+        // TODO: tweak the bound?
+        let (rev_info_sender, rev_info_receiver) =
+            crossbeam_channel::bounded::<RevInfo<PyBytesDeref>>(1000);
+
+        // This channel (going the other way around) however is unbounded.
+        // If they were both bounded, there might potentially be deadlocks
+        // where both channels are full and both threads are waiting on each
+        // other.
+        let (pybytes_sender, pybytes_receiver) =
+            crossbeam_channel::unbounded();
+
+        // Start a thread that does CPU-heavy processing in parallel with the
+        // loop below.
+        //
+        // If the parent thread panics, `rev_info_sender` will be dropped and
+        // “disconnected”. `rev_info_receiver` will be notified of this and
+        // exit its own loop.
+        let thread = std::thread::spawn(move || {
+            let mut combine_changeset_copies =
+                CombineChangesetCopies::new(children_count);
+            for (rev, p1, p2, opt_bytes) in rev_info_receiver {
+                let files = match &opt_bytes {
+                    Some(raw) => ChangedFiles::new(raw.as_ref()),
+                    // Python None was extracted to Option::None,
+                    // meaning there was no copy data.
+                    None => ChangedFiles::new_empty(),
+                };
+                combine_changeset_copies.add_revision(rev, p1, p2, files);
+
+                // Send `PyBytes` back to the parent thread so the parent
+                // thread can drop it. Otherwise the GIL would be implicitly
+                // acquired here through `impl Drop for PyBytes`.
+                if let Some(bytes) = opt_bytes {
+                    if pybytes_sender.send(bytes.unwrap()).is_err() {
+                        // The channel is disconnected, meaning the parent
+                        // thread panicked or returned
+                        // early through
+                        // `?` to propagate a Python exception.
+                        break;
+                    }
+                }
+            }
+
+            combine_changeset_copies.finish(target_rev)
+        });
+
+        for rev_info in revs_info {
+            let (rev, p1, p2, opt_bytes) = rev_info?;
+            let opt_bytes = opt_bytes.map(|b| PyBytesDeref::new(py, b));
+
+            // We’d prefer to avoid the child thread calling into Python code,
+            // but this avoids a potential deadlock on the GIL if it does:
+            py.allow_threads(|| {
+                rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
+                    "combine_changeset_copies: channel is disconnected",
+                );
+            });
+
+            // Drop anything in the channel, without blocking
+            pybytes_receiver.try_iter().for_each(drop);
+        }
+        // We’d prefer to avoid the child thread calling into Python code,
+        // but this avoids a potential deadlock on the GIL if it does:
+        path_copies = py.allow_threads(|| {
+            // Disconnect the channel to signal the child thread to stop:
+            // the `for … in rev_info_receiver` loop will end.
+            drop(rev_info_sender);
+
+            // Wait for the child thread to stop, and propagate any panic.
+            thread.join().unwrap_or_else(|panic_payload| {
+                std::panic::resume_unwind(panic_payload)
+            })
+        });
+
+        // Drop anything left in the channel
+        drop(pybytes_receiver)
+    };
+
+    let out = PyDict::new(py);
+    for (dest, source) in path_copies.into_iter() {
+        out.set_item(
+            PyBytes::new(py, &dest.into_vec()),
+            PyBytes::new(py, &source.into_vec()),
+        )?;
+    }
+    Ok(out.into_any().unbind())
+}
+
+pub fn init_module<'py>(
+    py: Python<'py>,
+    package: &str,
+) -> PyResult<Bound<'py, PyModule>> {
+    let m = new_submodule(py, package, "copy_tracing")?;
+    m.add_function(wrap_pyfunction!(combine_changeset_copies_wrapper, &m)?)?;
+    Ok(m)
+}
--- a/rust/hg-pyo3/src/lib.rs	Tue Feb 18 11:48:54 2025 +0100
+++ b/rust/hg-pyo3/src/lib.rs	Tue Feb 18 15:33:26 2025 +0100
@@ -1,6 +1,7 @@
 use pyo3::prelude::*;
 
 mod ancestors;
+mod copy_tracing;
 mod dagops;
 mod dirstate;
 mod discovery;
@@ -26,6 +27,7 @@
     env_logger::init();
 
     m.add_submodule(&ancestors::init_module(py, &dotted_name)?)?;
+    m.add_submodule(&copy_tracing::init_module(py, &dotted_name)?)?;
     m.add_submodule(&dagops::init_module(py, &dotted_name)?)?;
     m.add_submodule(&dirstate::init_module(py, &dotted_name)?)?;
     m.add_submodule(&discovery::init_module(py, &dotted_name)?)?;