Mercurial > public > mercurial-scm > hg
view rust/hg-cpython/src/copy_tracing.rs @ 52411:c2480ac4c5e2
rust-pyo3: retrieving the InnerRevlog of hg-cpython
This allows PyO3-based code to use the InnerRevlog, access its shared data
(core InnerRevlog), which will then allow, e.g., to retrieve references on
the core Index.
On the `hg-cpython` (`rusthg` crate, `rustext` Python extension module),
we had to also build as a Rust library, and open up some accesses (see
notably the public accessor for `inner`, the core `InnerRevlog`).
Retrieving the Rust struct underlying a Python object defined by another
extension module written in Rust is tricky because the Python type objects
are duplicated in the extension modules, leading to failure of the normal
type checking. See the doc-comment of `convert_cpython::extract_inner_revlog`
for a complete explanation.
To solve this, we import the Python type object of `rustext` (defined
by `hg-cpython`) and perform a manual check. Checking the Python type is
necessary, as PyO3 documentation clearly state that downcasting an object
that has not the proper type is Undefined Behaviour.
At this point, we do not have conversion facilities for exceptions (`PyErr`
on both sides), hence the remaining unwraps).
author | Georges Racinet <georges.racinet@cloudcrane.io> |
---|---|
date | Sat, 30 Nov 2024 20:57:02 +0100 |
parents | 4c5f6e95df84 |
children |
line wrap: on
line source
use cpython::ObjectProtocol; use cpython::PyBytes; use cpython::PyDict; use cpython::PyDrop; use cpython::PyList; use cpython::PyModule; use cpython::PyObject; use cpython::PyResult; use cpython::PyTuple; use cpython::Python; use hg::copy_tracing::ChangedFiles; use hg::copy_tracing::CombineChangesetCopies; use hg::Revision; use crate::pybytes_deref::PyBytesDeref; use crate::PyRevision; /// Combines copies information contained into revision `revs` to build a copy /// map. /// /// See mercurial/copies.py for details pub fn combine_changeset_copies_wrapper( py: Python, revs: PyList, children_count: PyDict, target_rev: PyRevision, rev_info: PyObject, multi_thread: bool, ) -> PyResult<PyDict> { let target_rev = Revision(target_rev.0); let children_count = children_count .items(py) .iter() .map(|(k, v)| { Ok((Revision(k.extract::<PyRevision>(py)?.0), v.extract(py)?)) }) .collect::<PyResult<_>>()?; /// (Revision number, parent 1, parent 2, copy data for this revision) type RevInfo<Bytes> = (Revision, Revision, Revision, Option<Bytes>); let revs_info = revs.iter(py).map(|rev_py| -> PyResult<RevInfo<PyBytes>> { let rev = Revision(rev_py.extract::<PyRevision>(py)?.0); let tuple: PyTuple = rev_info.call(py, (rev_py,), None)?.cast_into(py)?; let p1 = Revision(tuple.get_item(py, 0).extract::<PyRevision>(py)?.0); let p2 = Revision(tuple.get_item(py, 1).extract::<PyRevision>(py)?.0); let opt_bytes = tuple.get_item(py, 2).extract(py)?; Ok((rev, p1, p2, opt_bytes)) }); let path_copies; if !multi_thread { let mut combine_changeset_copies = CombineChangesetCopies::new(children_count); for rev_info in revs_info { let (rev, p1, p2, opt_bytes) = rev_info?; let files = match &opt_bytes { Some(bytes) => ChangedFiles::new(bytes.data(py)), // Python None was extracted to Option::None, // meaning there was no copy data. None => ChangedFiles::new_empty(), }; combine_changeset_copies.add_revision(rev, p1, p2, files) } path_copies = combine_changeset_copies.finish(target_rev) } else { // Use a bounded channel to provide back-pressure: // if the child thread is slower to process revisions than this thread // is to gather data for them, an unbounded channel would keep // growing and eat memory. // // TODO: tweak the bound? let (rev_info_sender, rev_info_receiver) = crossbeam_channel::bounded::<RevInfo<PyBytesDeref>>(1000); // This channel (going the other way around) however is unbounded. // If they were both bounded, there might potentially be deadlocks // where both channels are full and both threads are waiting on each // other. let (pybytes_sender, pybytes_receiver) = crossbeam_channel::unbounded(); // Start a thread that does CPU-heavy processing in parallel with the // loop below. // // If the parent thread panics, `rev_info_sender` will be dropped and // “disconnected”. `rev_info_receiver` will be notified of this and // exit its own loop. let thread = std::thread::spawn(move || { let mut combine_changeset_copies = CombineChangesetCopies::new(children_count); for (rev, p1, p2, opt_bytes) in rev_info_receiver { let files = match &opt_bytes { Some(raw) => ChangedFiles::new(raw.as_ref()), // Python None was extracted to Option::None, // meaning there was no copy data. None => ChangedFiles::new_empty(), }; combine_changeset_copies.add_revision(rev, p1, p2, files); // Send `PyBytes` back to the parent thread so the parent // thread can drop it. Otherwise the GIL would be implicitly // acquired here through `impl Drop for PyBytes`. if let Some(bytes) = opt_bytes { if pybytes_sender.send(bytes.unwrap()).is_err() { // The channel is disconnected, meaning the parent // thread panicked or returned // early through // `?` to propagate a Python exception. break; } } } combine_changeset_copies.finish(target_rev) }); for rev_info in revs_info { let (rev, p1, p2, opt_bytes) = rev_info?; let opt_bytes = opt_bytes.map(|b| PyBytesDeref::new(py, b)); // We’d prefer to avoid the child thread calling into Python code, // but this avoids a potential deadlock on the GIL if it does: py.allow_threads(|| { rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( "combine_changeset_copies: channel is disconnected", ); }); // Drop anything in the channel, without blocking for pybytes in pybytes_receiver.try_iter() { pybytes.release_ref(py) } } // We’d prefer to avoid the child thread calling into Python code, // but this avoids a potential deadlock on the GIL if it does: path_copies = py.allow_threads(|| { // Disconnect the channel to signal the child thread to stop: // the `for … in rev_info_receiver` loop will end. drop(rev_info_sender); // Wait for the child thread to stop, and propagate any panic. thread.join().unwrap_or_else(|panic_payload| { std::panic::resume_unwind(panic_payload) }) }); // Drop anything left in the channel for pybytes in pybytes_receiver.iter() { pybytes.release_ref(py) } }; let out = PyDict::new(py); for (dest, source) in path_copies.into_iter() { out.set_item( py, PyBytes::new(py, &dest.into_vec()), PyBytes::new(py, &source.into_vec()), )?; } Ok(out) } /// Create the module, with `__package__` given from parent pub fn init_module(py: Python, package: &str) -> PyResult<PyModule> { let dotted_name = &format!("{}.copy_tracing", package); let m = PyModule::new(py, dotted_name)?; m.add(py, "__package__", package)?; m.add(py, "__doc__", "Copy tracing - Rust implementation")?; m.add( py, "combine_changeset_copies", py_fn!( py, combine_changeset_copies_wrapper( revs: PyList, children: PyDict, target_rev: PyRevision, rev_info: PyObject, multi_thread: bool ) ), )?; let sys = PyModule::import(py, "sys")?; let sys_modules: PyDict = sys.get(py, "modules")?.extract(py)?; sys_modules.set_item(py, dotted_name, &m)?; Ok(m) }