40 let p2 = tuple.get_item(py, 1).extract(py)?; |
41 let p2 = tuple.get_item(py, 1).extract(py)?; |
41 let opt_bytes = tuple.get_item(py, 2).extract(py)?; |
42 let opt_bytes = tuple.get_item(py, 2).extract(py)?; |
42 Ok((rev, p1, p2, opt_bytes)) |
43 Ok((rev, p1, p2, opt_bytes)) |
43 }); |
44 }); |
44 |
45 |
45 let mut combine_changeset_copies = |
46 let path_copies = if !multi_thread { |
46 CombineChangesetCopies::new(children_count); |
47 let mut combine_changeset_copies = |
|
48 CombineChangesetCopies::new(children_count); |
47 |
49 |
48 for rev_info in revs_info { |
50 for rev_info in revs_info { |
49 let (rev, p1, p2, opt_bytes) = rev_info?; |
51 let (rev, p1, p2, opt_bytes) = rev_info?; |
50 let files = match &opt_bytes { |
52 let files = match &opt_bytes { |
51 Some(bytes) => ChangedFiles::new(bytes.data(py)), |
53 Some(bytes) => ChangedFiles::new(bytes.data(py)), |
52 // value was presumably None, meaning they was no copy data. |
54 // Python None was extracted to Option::None, |
53 None => ChangedFiles::new_empty(), |
55 // meaning there was no copy data. |
54 }; |
56 None => ChangedFiles::new_empty(), |
|
57 }; |
55 |
58 |
56 combine_changeset_copies.add_revision(rev, p1, p2, files) |
59 combine_changeset_copies.add_revision(rev, p1, p2, files) |
57 } |
60 } |
58 let path_copies = combine_changeset_copies.finish(target_rev); |
61 combine_changeset_copies.finish(target_rev) |
|
62 } else { |
|
63 // Use a bounded channel to provide back-pressure: |
|
64 // if the child thread is slower to process revisions than this thread |
|
65 // is to gather data for them, an unbounded channel would keep |
|
66 // growing and eat memory. |
|
67 // |
|
68 // TODO: tweak the bound? |
|
69 let (rev_info_sender, rev_info_receiver) = |
|
70 crossbeam_channel::bounded::<RevInfo>(1000); |
|
71 |
|
72 // Start a thread that does CPU-heavy processing in parallel with the |
|
73 // loop below. |
|
74 // |
|
75 // If the parent thread panics, `rev_info_sender` will be dropped and |
|
76 // “disconnected”. `rev_info_receiver` will be notified of this and |
|
77 // exit its own loop. |
|
78 let thread = std::thread::spawn(move || { |
|
79 let mut combine_changeset_copies = |
|
80 CombineChangesetCopies::new(children_count); |
|
81 for (rev, p1, p2, opt_bytes) in rev_info_receiver { |
|
82 let gil = Python::acquire_gil(); |
|
83 let py = gil.python(); |
|
84 let files = match &opt_bytes { |
|
85 Some(raw) => ChangedFiles::new(raw.data(py)), |
|
86 // Python None was extracted to Option::None, |
|
87 // meaning there was no copy data. |
|
88 None => ChangedFiles::new_empty(), |
|
89 }; |
|
90 combine_changeset_copies.add_revision(rev, p1, p2, files) |
|
91 } |
|
92 |
|
93 combine_changeset_copies.finish(target_rev) |
|
94 }); |
|
95 |
|
96 for rev_info in revs_info { |
|
97 let (rev, p1, p2, opt_bytes) = rev_info?; |
|
98 |
|
99 // We’d prefer to avoid the child thread calling into Python code, |
|
100 // but this avoids a potential deadlock on the GIL if it does: |
|
101 py.allow_threads(|| { |
|
102 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( |
|
103 "combine_changeset_copies: channel is disconnected", |
|
104 ); |
|
105 }); |
|
106 } |
|
107 // We’d prefer to avoid the child thread calling into Python code, |
|
108 // but this avoids a potential deadlock on the GIL if it does: |
|
109 py.allow_threads(|| { |
|
110 // Disconnect the channel to signal the child thread to stop: |
|
111 // the `for … in rev_info_receiver` loop will end. |
|
112 drop(rev_info_sender); |
|
113 |
|
114 // Wait for the child thread to stop, and propagate any panic. |
|
115 thread.join().unwrap_or_else(|panic_payload| { |
|
116 std::panic::resume_unwind(panic_payload) |
|
117 }) |
|
118 }) |
|
119 }; |
|
120 |
59 let out = PyDict::new(py); |
121 let out = PyDict::new(py); |
60 for (dest, source) in path_copies.into_iter() { |
122 for (dest, source) in path_copies.into_iter() { |
61 out.set_item( |
123 out.set_item( |
62 py, |
124 py, |
63 PyBytes::new(py, &dest.into_vec()), |
125 PyBytes::new(py, &dest.into_vec()), |