rust/hg-cpython/src/copy_tracing.rs
changeset 46588 47557ea79fc7
parent 46587 cb4b0b0c6de4
child 46589 620c88fb42a2
equal deleted inserted replaced
46587:cb4b0b0c6de4 46588:47557ea79fc7
    20     py: Python,
    20     py: Python,
    21     revs: PyList,
    21     revs: PyList,
    22     children_count: PyDict,
    22     children_count: PyDict,
    23     target_rev: Revision,
    23     target_rev: Revision,
    24     rev_info: PyObject,
    24     rev_info: PyObject,
       
    25     multi_thread: bool,
    25 ) -> PyResult<PyDict> {
    26 ) -> PyResult<PyDict> {
    26     let children_count = children_count
    27     let children_count = children_count
    27         .items(py)
    28         .items(py)
    28         .iter()
    29         .iter()
    29         .map(|(k, v)| Ok((k.extract(py)?, v.extract(py)?)))
    30         .map(|(k, v)| Ok((k.extract(py)?, v.extract(py)?)))
    40         let p2 = tuple.get_item(py, 1).extract(py)?;
    41         let p2 = tuple.get_item(py, 1).extract(py)?;
    41         let opt_bytes = tuple.get_item(py, 2).extract(py)?;
    42         let opt_bytes = tuple.get_item(py, 2).extract(py)?;
    42         Ok((rev, p1, p2, opt_bytes))
    43         Ok((rev, p1, p2, opt_bytes))
    43     });
    44     });
    44 
    45 
    45     let mut combine_changeset_copies =
    46     let path_copies = if !multi_thread {
    46         CombineChangesetCopies::new(children_count);
    47         let mut combine_changeset_copies =
       
    48             CombineChangesetCopies::new(children_count);
    47 
    49 
    48     for rev_info in revs_info {
    50         for rev_info in revs_info {
    49         let (rev, p1, p2, opt_bytes) = rev_info?;
    51             let (rev, p1, p2, opt_bytes) = rev_info?;
    50         let files = match &opt_bytes {
    52             let files = match &opt_bytes {
    51             Some(bytes) => ChangedFiles::new(bytes.data(py)),
    53                 Some(bytes) => ChangedFiles::new(bytes.data(py)),
    52             // value was presumably None, meaning they was no copy data.
    54                 // Python None was extracted to Option::None,
    53             None => ChangedFiles::new_empty(),
    55                 // meaning there was no copy data.
    54         };
    56                 None => ChangedFiles::new_empty(),
       
    57             };
    55 
    58 
    56         combine_changeset_copies.add_revision(rev, p1, p2, files)
    59             combine_changeset_copies.add_revision(rev, p1, p2, files)
    57     }
    60         }
    58     let path_copies = combine_changeset_copies.finish(target_rev);
    61         combine_changeset_copies.finish(target_rev)
       
    62     } else {
       
    63         // Use a bounded channel to provide back-pressure:
       
    64         // if the child thread is slower to process revisions than this thread
       
    65         // is to gather data for them, an unbounded channel would keep
       
    66         // growing and eat memory.
       
    67         //
       
    68         // TODO: tweak the bound?
       
    69         let (rev_info_sender, rev_info_receiver) =
       
    70             crossbeam_channel::bounded::<RevInfo>(1000);
       
    71 
       
    72         // Start a thread that does CPU-heavy processing in parallel with the
       
    73         // loop below.
       
    74         //
       
    75         // If the parent thread panics, `rev_info_sender` will be dropped and
       
    76         // “disconnected”. `rev_info_receiver` will be notified of this and
       
    77         // exit its own loop.
       
    78         let thread = std::thread::spawn(move || {
       
    79             let mut combine_changeset_copies =
       
    80                 CombineChangesetCopies::new(children_count);
       
    81             for (rev, p1, p2, opt_bytes) in rev_info_receiver {
       
    82                 let gil = Python::acquire_gil();
       
    83                 let py = gil.python();
       
    84                 let files = match &opt_bytes {
       
    85                     Some(raw) => ChangedFiles::new(raw.data(py)),
       
    86                     // Python None was extracted to Option::None,
       
    87                     // meaning there was no copy data.
       
    88                     None => ChangedFiles::new_empty(),
       
    89                 };
       
    90                 combine_changeset_copies.add_revision(rev, p1, p2, files)
       
    91             }
       
    92 
       
    93             combine_changeset_copies.finish(target_rev)
       
    94         });
       
    95 
       
    96         for rev_info in revs_info {
       
    97             let (rev, p1, p2, opt_bytes) = rev_info?;
       
    98 
       
    99             // We’d prefer to avoid the child thread calling into Python code,
       
   100             // but this avoids a potential deadlock on the GIL if it does:
       
   101             py.allow_threads(|| {
       
   102                 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
       
   103                     "combine_changeset_copies: channel is disconnected",
       
   104                 );
       
   105             });
       
   106         }
       
   107         // We’d prefer to avoid the child thread calling into Python code,
       
   108         // but this avoids a potential deadlock on the GIL if it does:
       
   109         py.allow_threads(|| {
       
   110             // Disconnect the channel to signal the child thread to stop:
       
   111             // the `for … in rev_info_receiver` loop will end.
       
   112             drop(rev_info_sender);
       
   113 
       
   114             // Wait for the child thread to stop, and propagate any panic.
       
   115             thread.join().unwrap_or_else(|panic_payload| {
       
   116                 std::panic::resume_unwind(panic_payload)
       
   117             })
       
   118         })
       
   119     };
       
   120 
    59     let out = PyDict::new(py);
   121     let out = PyDict::new(py);
    60     for (dest, source) in path_copies.into_iter() {
   122     for (dest, source) in path_copies.into_iter() {
    61         out.set_item(
   123         out.set_item(
    62             py,
   124             py,
    63             PyBytes::new(py, &dest.into_vec()),
   125             PyBytes::new(py, &dest.into_vec()),
    82             py,
   144             py,
    83             combine_changeset_copies_wrapper(
   145             combine_changeset_copies_wrapper(
    84                 revs: PyList,
   146                 revs: PyList,
    85                 children: PyDict,
   147                 children: PyDict,
    86                 target_rev: Revision,
   148                 target_rev: Revision,
    87                 rev_info: PyObject
   149                 rev_info: PyObject,
       
   150                 multi_thread: bool
    88             )
   151             )
    89         ),
   152         ),
    90     )?;
   153     )?;
    91 
   154 
    92     let sys = PyModule::import(py, "sys")?;
   155     let sys = PyModule::import(py, "sys")?;