Mercurial > public > mercurial-scm > hg-stable
comparison rust/hg-pyo3/src/copy_tracing.rs @ 52987:6332e5f857f6
pyo3: transliterate the `copy_tracing` module from `hg-cpython`
This is the last module left to migrate to PyO3 from rust-cpython. We will
remove the rust-cpython code at the start of the next cycle.
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Tue, 18 Feb 2025 15:33:26 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
52986:1a99bdbdb71b | 52987:6332e5f857f6 |
---|---|
1 // copy_tracing.rs | |
2 // | |
3 // Copyright 2025 Mercurial developers | |
4 // | |
5 // This software may be used and distributed according to the terms of the | |
6 // GNU General Public License version 2 or any later version. | |
7 | |
8 //! Bindings for the `hg::copy_tracing` module provided by the | |
9 //! `hg-core` package. | |
10 //! | |
11 //! From Python, this will be seen as `mercurial.pyo3_rustext.copy_tracing` | |
12 | |
13 use hg::copy_tracing::ChangedFiles; | |
14 use hg::copy_tracing::CombineChangesetCopies; | |
15 use hg::Revision; | |
16 use pyo3::types::PyBytes; | |
17 use pyo3::types::PyDict; | |
18 use pyo3::types::PyList; | |
19 use pyo3::types::PyTuple; | |
20 | |
21 use crate::revision::PyRevision; | |
22 use crate::utils::new_submodule; | |
23 use crate::utils::PyBytesDeref; | |
24 | |
25 use pyo3::prelude::*; | |
26 | |
27 /// Combines the copy information of the revisions in `revs` to build a copy | |
28 /// map. | |
29 /// | |
30 /// See mercurial/copies.py for details. | |
///
/// Exposed to Python under the name `combine_changeset_copies`.
///
/// * `revs`: list of revisions; each item is handed to `rev_info`.
/// * `children_count`: dict whose keys are extracted as revisions; the
///   converted collection is given to `CombineChangesetCopies::new`.
/// * `target_rev`: revision given to `CombineChangesetCopies::finish`.
/// * `rev_info`: callable invoked with one item of `revs`. Its result must be
///   a tuple whose items 0 and 1 are the two parent revisions and whose
///   item 2 is the copy data as bytes, or `None` when there is none.
/// * `multi_thread`: when false, revisions are processed on the calling
///   thread; otherwise they are processed on a spawned thread while the
///   calling thread keeps gathering data from `rev_info`.
///
/// Returns a Python dict mapping each `dest` to its `source`, both as bytes.
///
/// Python exceptions raised while extracting values or calling `rev_info` are
/// propagated with `?`.
///
/// In multi-thread mode this panics if the channel to the child thread is
/// disconnected while sending, and resumes any panic of the child thread
/// after joining it.
31 #[pyfunction] | |
32 #[pyo3(name = "combine_changeset_copies")] | |
33 pub fn combine_changeset_copies_wrapper( | |
34 revs: Bound<'_, PyList>, | |
35 children_count: Bound<'_, PyDict>, | |
36 target_rev: PyRevision, | |
37 rev_info: Bound<'_, PyAny>, | |
38 multi_thread: bool, | |
39 ) -> PyResult<PyObject> { | |
40 let py = revs.py(); | |
41 let target_rev = Revision(target_rev.0); | |
// Convert the Python dict into the collection taken by
// `CombineChangesetCopies::new`; the concrete target type is inferred from
// that call. The first failing extraction aborts the whole conversion.
42 let children_count = children_count | |
43 .iter() | |
44 .map(|(k, v)| { | |
45 Ok((Revision(k.extract::<PyRevision>()?.0), v.extract()?)) | |
46 }) | |
47 .collect::<PyResult<_>>()?; | |
48 | |
49 /// (Revision number, parent 1, parent 2, copy data for this revision) | |
50 type RevInfo<Bytes> = (Revision, Revision, Revision, Option<Bytes>); | |
51 | |
// Lazy iterator: `rev_info` is only called when one of the loops below pulls
// the next item, so errors surface at that point through `?`.
52 let revs_info = | |
53 revs.iter().map(|rev_py| -> PyResult<RevInfo<Py<PyBytes>>> { | |
54 let rev = Revision(rev_py.extract::<PyRevision>()?.0); | |
55 let ret = rev_info.call1((rev_py,))?; | |
56 let tuple: &Bound<'_, PyTuple> = ret.downcast()?; | |
57 let p1 = Revision(tuple.get_item(0)?.extract::<PyRevision>()?.0); | |
58 let p2 = Revision(tuple.get_item(1)?.extract::<PyRevision>()?.0); | |
59 let opt_bytes = tuple.get_item(2)?.extract()?; | |
60 Ok((rev, p1, p2, opt_bytes)) | |
61 }); | |
62 | |
63 let path_copies; | |
64 if !multi_thread { | |
// Single-threaded mode: gather and process each revision in turn on this
// thread.
65 let mut combine_changeset_copies = | |
66 CombineChangesetCopies::new(children_count); | |
67 | |
68 for rev_info in revs_info { | |
69 let (rev, p1, p2, opt_bytes) = rev_info?; | |
70 let files = match &opt_bytes { | |
71 Some(bytes) => ChangedFiles::new(bytes.as_bytes(py)), | |
72 // Python None was extracted to Option::None, | |
73 // meaning there was no copy data. | |
74 None => ChangedFiles::new_empty(), | |
75 }; | |
76 | |
77 combine_changeset_copies.add_revision(rev, p1, p2, files) | |
78 } | |
79 path_copies = combine_changeset_copies.finish(target_rev) | |
80 } else { | |
81 // Use a bounded channel to provide back-pressure: | |
82 // if the child thread is slower to process revisions than this thread | |
83 // is to gather data for them, an unbounded channel would keep | |
84 // growing and eat memory. | |
85 // | |
86 // TODO: tweak the bound? | |
87 let (rev_info_sender, rev_info_receiver) = | |
88 crossbeam_channel::bounded::<RevInfo<PyBytesDeref>>(1000); | |
89 | |
90 // This channel (going the other way around) however is unbounded. | |
91 // If they were both bounded, there might potentially be deadlocks | |
92 // where both channels are full and both threads are waiting on each | |
93 // other. | |
94 let (pybytes_sender, pybytes_receiver) = | |
95 crossbeam_channel::unbounded(); | |
96 | |
97 // Start a thread that does CPU-heavy processing in parallel with the | |
98 // loop below. | |
99 // | |
100 // If the parent thread panics, `rev_info_sender` will be dropped and | |
101 // “disconnected”. `rev_info_receiver` will be notified of this and | |
102 // exit its own loop. | |
103 let thread = std::thread::spawn(move || { | |
104 let mut combine_changeset_copies = | |
105 CombineChangesetCopies::new(children_count); | |
106 for (rev, p1, p2, opt_bytes) in rev_info_receiver { | |
107 let files = match &opt_bytes { | |
108 Some(raw) => ChangedFiles::new(raw.as_ref()), | |
109 // Python None was extracted to Option::None, | |
110 // meaning there was no copy data. | |
111 None => ChangedFiles::new_empty(), | |
112 }; | |
113 combine_changeset_copies.add_revision(rev, p1, p2, files); | |
114 | |
115 // Send `PyBytes` back to the parent thread so the parent | |
116 // thread can drop it. Otherwise the GIL would be implicitly | |
117 // acquired here through `impl Drop for PyBytes`. | |
// NOTE(review): this rationale presumably dates from the `hg-cpython`
// version of this code; confirm it still describes what dropping the value
// returned by `PyBytesDeref::unwrap` does under PyO3.
118 if let Some(bytes) = opt_bytes { | |
119 if pybytes_sender.send(bytes.unwrap()).is_err() { | |
120 // The channel is disconnected, meaning the parent | |
121 // thread panicked or returned early through `?` | |
122 // to propagate a Python exception, so there is | |
123 // no point in processing further revisions. | |
124 break; | |
125 } | |
126 } | |
127 } | |
128 | |
129 combine_changeset_copies.finish(target_rev) | |
130 }); | |
131 | |
132 for rev_info in revs_info { | |
133 let (rev, p1, p2, opt_bytes) = rev_info?; | |
134 let opt_bytes = opt_bytes.map(|b| PyBytesDeref::new(py, b)); | |
135 | |
136 // We’d prefer to avoid the child thread calling into Python code, | |
137 // but this avoids a potential deadlock on the GIL if it does: | |
138 py.allow_threads(|| { | |
139 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( | |
140 "combine_changeset_copies: channel is disconnected", | |
141 ); | |
142 }); | |
143 | |
144 // Drop anything in the channel, without blocking | |
145 pybytes_receiver.try_iter().for_each(drop); | |
146 } | |
147 // We’d prefer to avoid the child thread calling into Python code, | |
148 // but this avoids a potential deadlock on the GIL if it does: | |
149 path_copies = py.allow_threads(|| { | |
150 // Disconnect the channel to signal the child thread to stop: | |
151 // the `for … in rev_info_receiver` loop will end. | |
152 drop(rev_info_sender); | |
153 | |
154 // Wait for the child thread to stop, and propagate any panic. | |
155 thread.join().unwrap_or_else(|panic_payload| { | |
156 std::panic::resume_unwind(panic_payload) | |
157 }) | |
158 }); | |
159 | |
160 // Drop anything left in the channel | |
161 drop(pybytes_receiver) | |
162 }; | |
163 | |
// Build the returned dict: each `(dest, source)` pair of `path_copies`
// becomes a bytes key and a bytes value.
164 let out = PyDict::new(py); | |
165 for (dest, source) in path_copies.into_iter() { | |
166 out.set_item( | |
167 PyBytes::new(py, &dest.into_vec()), | |
168 PyBytes::new(py, &source.into_vec()), | |
169 )?; | |
170 } | |
171 Ok(out.into_any().unbind()) | |
172 } | |
173 | |
/// Creates the `copy_tracing` submodule of `package` through `new_submodule`
/// and registers `combine_changeset_copies_wrapper` in it.
///
/// Returns the new module. Any error from `new_submodule`,
/// `wrap_pyfunction!` or `add_function` is propagated to the caller.
174 pub fn init_module<'py>( | |
175 py: Python<'py>, | |
176 package: &str, | |
177 ) -> PyResult<Bound<'py, PyModule>> { | |
178 let m = new_submodule(py, package, "copy_tracing")?; | |
179 m.add_function(wrap_pyfunction!(combine_changeset_copies_wrapper, &m)?)?; | |
180 Ok(m) | |
181 } | |