comparison rust/hg-pyo3/src/copy_tracing.rs @ 52987:6332e5f857f6

pyo3: transliterate the `copy_tracing` module from `hg-cpython`

This is the last module left to migrate to PyO3 from rust-cpython. We will remove the rust-cpython code at the start of the next cycle.
author Raphaël Gomès <rgomes@octobus.net>
date Tue, 18 Feb 2025 15:33:26 +0100
parents
children
comparison
equal deleted inserted replaced
52986:1a99bdbdb71b 52987:6332e5f857f6
1 // copy_tracing.rs
2 //
3 // Copyright 2025 Mercurial developers
4 //
5 // This software may be used and distributed according to the terms of the
6 // GNU General Public License version 2 or any later version.
7
8 //! Bindings for the `hg::copy_tracing` module provided by the
9 //! `hg-core` package.
10 //!
11 //! From Python, this will be seen as `mercurial.pyo3_rustext.copy_tracing`
12
13 use hg::copy_tracing::ChangedFiles;
14 use hg::copy_tracing::CombineChangesetCopies;
15 use hg::Revision;
16 use pyo3::types::PyBytes;
17 use pyo3::types::PyDict;
18 use pyo3::types::PyList;
19 use pyo3::types::PyTuple;
20
21 use crate::revision::PyRevision;
22 use crate::utils::new_submodule;
23 use crate::utils::PyBytesDeref;
24
25 use pyo3::prelude::*;
26
27 /// Combines copies information contained into revision `revs` to build a copy
28 /// map.
29 ///
30 /// See mercurial/copies.py for details
31 #[pyfunction]
32 #[pyo3(name = "combine_changeset_copies")]
33 pub fn combine_changeset_copies_wrapper(
34 revs: Bound<'_, PyList>,
35 children_count: Bound<'_, PyDict>,
36 target_rev: PyRevision,
37 rev_info: Bound<'_, PyAny>,
38 multi_thread: bool,
39 ) -> PyResult<PyObject> {
40 let py = revs.py();
41 let target_rev = Revision(target_rev.0);
42 let children_count = children_count
43 .iter()
44 .map(|(k, v)| {
45 Ok((Revision(k.extract::<PyRevision>()?.0), v.extract()?))
46 })
47 .collect::<PyResult<_>>()?;
48
49 /// (Revision number, parent 1, parent 2, copy data for this revision)
50 type RevInfo<Bytes> = (Revision, Revision, Revision, Option<Bytes>);
51
52 let revs_info =
53 revs.iter().map(|rev_py| -> PyResult<RevInfo<Py<PyBytes>>> {
54 let rev = Revision(rev_py.extract::<PyRevision>()?.0);
55 let ret = rev_info.call1((rev_py,))?;
56 let tuple: &Bound<'_, PyTuple> = ret.downcast()?;
57 let p1 = Revision(tuple.get_item(0)?.extract::<PyRevision>()?.0);
58 let p2 = Revision(tuple.get_item(1)?.extract::<PyRevision>()?.0);
59 let opt_bytes = tuple.get_item(2)?.extract()?;
60 Ok((rev, p1, p2, opt_bytes))
61 });
62
63 let path_copies;
64 if !multi_thread {
65 let mut combine_changeset_copies =
66 CombineChangesetCopies::new(children_count);
67
68 for rev_info in revs_info {
69 let (rev, p1, p2, opt_bytes) = rev_info?;
70 let files = match &opt_bytes {
71 Some(bytes) => ChangedFiles::new(bytes.as_bytes(py)),
72 // Python None was extracted to Option::None,
73 // meaning there was no copy data.
74 None => ChangedFiles::new_empty(),
75 };
76
77 combine_changeset_copies.add_revision(rev, p1, p2, files)
78 }
79 path_copies = combine_changeset_copies.finish(target_rev)
80 } else {
81 // Use a bounded channel to provide back-pressure:
82 // if the child thread is slower to process revisions than this thread
83 // is to gather data for them, an unbounded channel would keep
84 // growing and eat memory.
85 //
86 // TODO: tweak the bound?
87 let (rev_info_sender, rev_info_receiver) =
88 crossbeam_channel::bounded::<RevInfo<PyBytesDeref>>(1000);
89
90 // This channel (going the other way around) however is unbounded.
91 // If they were both bounded, there might potentially be deadlocks
92 // where both channels are full and both threads are waiting on each
93 // other.
94 let (pybytes_sender, pybytes_receiver) =
95 crossbeam_channel::unbounded();
96
97 // Start a thread that does CPU-heavy processing in parallel with the
98 // loop below.
99 //
100 // If the parent thread panics, `rev_info_sender` will be dropped and
101 // “disconnected”. `rev_info_receiver` will be notified of this and
102 // exit its own loop.
103 let thread = std::thread::spawn(move || {
104 let mut combine_changeset_copies =
105 CombineChangesetCopies::new(children_count);
106 for (rev, p1, p2, opt_bytes) in rev_info_receiver {
107 let files = match &opt_bytes {
108 Some(raw) => ChangedFiles::new(raw.as_ref()),
109 // Python None was extracted to Option::None,
110 // meaning there was no copy data.
111 None => ChangedFiles::new_empty(),
112 };
113 combine_changeset_copies.add_revision(rev, p1, p2, files);
114
115 // Send `PyBytes` back to the parent thread so the parent
116 // thread can drop it. Otherwise the GIL would be implicitly
117 // acquired here through `impl Drop for PyBytes`.
118 if let Some(bytes) = opt_bytes {
119 if pybytes_sender.send(bytes.unwrap()).is_err() {
120 // The channel is disconnected, meaning the parent
121 // thread panicked or returned
122 // early through
123 // `?` to propagate a Python exception.
124 break;
125 }
126 }
127 }
128
129 combine_changeset_copies.finish(target_rev)
130 });
131
132 for rev_info in revs_info {
133 let (rev, p1, p2, opt_bytes) = rev_info?;
134 let opt_bytes = opt_bytes.map(|b| PyBytesDeref::new(py, b));
135
136 // We’d prefer to avoid the child thread calling into Python code,
137 // but this avoids a potential deadlock on the GIL if it does:
138 py.allow_threads(|| {
139 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
140 "combine_changeset_copies: channel is disconnected",
141 );
142 });
143
144 // Drop anything in the channel, without blocking
145 pybytes_receiver.try_iter().for_each(drop);
146 }
147 // We’d prefer to avoid the child thread calling into Python code,
148 // but this avoids a potential deadlock on the GIL if it does:
149 path_copies = py.allow_threads(|| {
150 // Disconnect the channel to signal the child thread to stop:
151 // the `for … in rev_info_receiver` loop will end.
152 drop(rev_info_sender);
153
154 // Wait for the child thread to stop, and propagate any panic.
155 thread.join().unwrap_or_else(|panic_payload| {
156 std::panic::resume_unwind(panic_payload)
157 })
158 });
159
160 // Drop anything left in the channel
161 drop(pybytes_receiver)
162 };
163
164 let out = PyDict::new(py);
165 for (dest, source) in path_copies.into_iter() {
166 out.set_item(
167 PyBytes::new(py, &dest.into_vec()),
168 PyBytes::new(py, &source.into_vec()),
169 )?;
170 }
171 Ok(out.into_any().unbind())
172 }
173
174 pub fn init_module<'py>(
175 py: Python<'py>,
176 package: &str,
177 ) -> PyResult<Bound<'py, PyModule>> {
178 let m = new_submodule(py, package, "copy_tracing")?;
179 m.add_function(wrap_pyfunction!(combine_changeset_copies_wrapper, &m)?)?;
180 Ok(m)
181 }