Mercurial > public > mercurial-scm > hg
diff rust/hg-core/src/revlog/mod.rs @ 52160:e01e84e5e426
rust-revlog: add a Rust-only `InnerRevlog`
This mirrors the Python `InnerRevlog` and will be used in a future patch
to replace said Python implementation. This allows us to start doing more
things in pure Rust, in particular reading and writing operations.
A lot of changes have to be introduced all at once; it wouldn't be very
useful to separate this patch IMO, since all of them are either interlocked
or only useful with the rest.
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Thu, 10 Oct 2024 10:34:51 +0200 |
parents | 039b7caeb4d9 |
children | 84b5802ba7d3 |
line wrap: on
line diff
--- a/rust/hg-core/src/revlog/mod.rs Thu Oct 10 10:38:35 2024 +0200 +++ b/rust/hg-core/src/revlog/mod.rs Thu Oct 10 10:34:51 2024 +0200 @@ -9,7 +9,10 @@ pub mod nodemap; mod nodemap_docket; pub mod path_encode; -use compression::uncompressed_zstd_data; +use inner_revlog::CoreRevisionBuffer; +use inner_revlog::InnerRevlog; +use inner_revlog::RevisionBuffer; +use memmap2::MmapOptions; pub use node::{FromHexError, Node, NodePrefix}; use options::RevlogOpenOptions; pub mod changelog; @@ -17,25 +20,25 @@ pub mod file_io; pub mod filelog; pub mod index; +pub mod inner_revlog; pub mod manifest; pub mod options; pub mod patch; use std::borrow::Cow; +use std::io::ErrorKind; use std::io::Read; use std::ops::Deref; use std::path::Path; -use flate2::read::ZlibDecoder; -use sha1::{Digest, Sha1}; - -use self::node::{NODE_BYTES_LENGTH, NULL_NODE}; +use self::node::NULL_NODE; use self::nodemap_docket::NodeMapDocket; use super::index::Index; -use super::index::INDEX_ENTRY_SIZE; use super::nodemap::{NodeMap, NodeMapError}; use crate::errors::HgError; +use crate::errors::IoResultExt; use crate::exit_codes; +use crate::vfs::Vfs; use crate::vfs::VfsImpl; /// As noted in revlog.c, revision numbers are actually encoded in @@ -256,24 +259,17 @@ } } -/// Read only implementation of revlog. pub struct Revlog { - /// When index and data are not interleaved: bytes of the revlog index. - /// When index and data are interleaved: bytes of the revlog index and - /// data. - index: Index, - /// When index and data are not interleaved: bytes of the revlog data - data_bytes: Option<Box<dyn Deref<Target = [u8]> + Send>>, + inner: InnerRevlog, /// When present on disk: the persistent nodemap for this revlog nodemap: Option<nodemap::NodeTree>, } impl Graph for Revlog { fn parents(&self, rev: Revision) -> Result<[Revision; 2], GraphError> { - self.index.parents(rev) + self.index().parents(rev) } } - impl Revlog { /// Open a revlog index file. 
/// @@ -289,6 +285,10 @@ Self::open_gen(store_vfs, index_path, data_path, options, None) } + fn index(&self) -> &Index { + &self.inner.index + } + fn open_gen( // Todo use the `Vfs` trait here once we create a function for mmap store_vfs: &VfsImpl, @@ -298,37 +298,10 @@ nodemap_for_test: Option<nodemap::NodeTree>, ) -> Result<Self, HgError> { let index_path = index_path.as_ref(); - let index = { - match store_vfs.mmap_open_opt(index_path)? { - None => Index::new( - Box::<Vec<_>>::default(), - options.index_header(), - ), - Some(index_mmap) => { - let index = Index::new( - Box::new(index_mmap), - options.index_header(), - )?; - Ok(index) - } - } - }?; + let index = open_index(store_vfs, index_path, options)?; let default_data_path = index_path.with_extension("d"); - - // type annotation required - // won't recognize Mmap as Deref<Target = [u8]> - let data_bytes: Option<Box<dyn Deref<Target = [u8]> + Send>> = - if index.is_inline() { - None - } else if index.is_empty() { - // No need to even try to open the data file then. - Some(Box::new(&[][..])) - } else { - let data_path = data_path.unwrap_or(&default_data_path); - let data_mmap = store_vfs.mmap_open(data_path)?; - Some(Box::new(data_mmap)) - }; + let data_path = data_path.unwrap_or(&default_data_path); let nodemap = if index.is_inline() || !options.use_nodemap { None @@ -346,20 +319,27 @@ let nodemap = nodemap_for_test.or(nodemap); Ok(Revlog { - index, - data_bytes, + inner: InnerRevlog::new( + Box::new(store_vfs.clone()), + index, + index_path.to_path_buf(), + data_path.to_path_buf(), + options.data_config, + options.delta_config, + options.feature_config, + ), nodemap, }) } /// Return number of entries of the `Revlog`. pub fn len(&self) -> usize { - self.index.len() + self.index().len() } /// Returns `true` if the `Revlog` has zero `entries`. 
pub fn is_empty(&self) -> bool { - self.index.is_empty() + self.index().is_empty() } /// Returns the node ID for the given revision number, if it exists in this @@ -368,8 +348,8 @@ if rev == NULL_REVISION.into() { return Some(&NULL_NODE); } - let rev = self.index.check_revision(rev)?; - Some(self.index.get_entry(rev)?.hash()) + let rev = self.index().check_revision(rev)?; + Some(self.index().get_entry(rev)?.hash()) } /// Return the revision number for the given node ID, if it exists in this @@ -380,7 +360,7 @@ ) -> Result<Revision, RevlogError> { if let Some(nodemap) = &self.nodemap { nodemap - .find_bin(&self.index, node)? + .find_bin(self.index(), node)? .ok_or(RevlogError::InvalidRevision(format!("{:x}", node))) } else { self.rev_from_node_no_persistent_nodemap(node) @@ -406,7 +386,7 @@ NULL_NODE } else { let index_entry = - self.index.get_entry(rev).ok_or_else(|| { + self.index().get_entry(rev).ok_or_else(|| { HgError::corrupted( "revlog references a revision not in the index", ) @@ -429,7 +409,21 @@ /// Returns whether the given revision exists in this revlog. pub fn has_rev(&self, rev: UncheckedRevision) -> bool { - self.index.check_revision(rev).is_some() + self.index().check_revision(rev).is_some() + } + + pub fn get_entry_for_checked_rev( + &self, + rev: Revision, + ) -> Result<RevlogEntry, RevlogError> { + self.inner.get_entry_for_checked_rev(rev) + } + + pub fn get_entry( + &self, + rev: UncheckedRevision, + ) -> Result<RevlogEntry, RevlogError> { + self.inner.get_entry(rev) } /// Return the full data associated to a revision. 
@@ -466,153 +460,93 @@ expected: &[u8], data: &[u8], ) -> bool { - let e1 = self.index.get_entry(p1); - let h1 = match e1 { - Some(ref entry) => entry.hash(), - None => &NULL_NODE, - }; - let e2 = self.index.get_entry(p2); - let h2 = match e2 { - Some(ref entry) => entry.hash(), - None => &NULL_NODE, - }; - - hash(data, h1.as_bytes(), h2.as_bytes()) == expected + self.inner.check_hash(p1, p2, expected, data) } /// Build the full data of a revision out its snapshot /// and its deltas. - fn build_data_from_deltas( - snapshot: RevlogEntry, - deltas: &[RevlogEntry], - ) -> Result<Vec<u8>, HgError> { - let snapshot = snapshot.data_chunk()?; - let deltas = deltas - .iter() - .rev() - .map(RevlogEntry::data_chunk) - .collect::<Result<Vec<_>, _>>()?; - let patches: Vec<_> = - deltas.iter().map(|d| patch::PatchList::new(d)).collect(); - let patch = patch::fold_patch_lists(&patches); - Ok(patch.apply(&snapshot)) - } - - /// Return the revlog data. - fn data(&self) -> &[u8] { - match &self.data_bytes { - Some(data_bytes) => data_bytes, - None => panic!( - "forgot to load the data or trying to access inline data" - ), - } - } - - pub fn make_null_entry(&self) -> RevlogEntry { - RevlogEntry { - revlog: self, - rev: NULL_REVISION, - bytes: b"", - compressed_len: 0, - uncompressed_len: 0, - base_rev_or_base_of_delta_chain: None, - p1: NULL_REVISION, - p2: NULL_REVISION, - flags: NULL_REVLOG_ENTRY_FLAGS, - hash: NULL_NODE, - } - } - - fn get_entry_for_checked_rev( - &self, - rev: Revision, - ) -> Result<RevlogEntry, RevlogError> { - if rev == NULL_REVISION { - return Ok(self.make_null_entry()); + fn build_data_from_deltas<T>( + buffer: &mut dyn RevisionBuffer<Target = T>, + snapshot: &[u8], + deltas: &[impl AsRef<[u8]>], + ) -> Result<(), RevlogError> { + if deltas.is_empty() { + buffer.extend_from_slice(snapshot); + return Ok(()); } - let index_entry = self - .index - .get_entry(rev) - .ok_or(RevlogError::InvalidRevision(rev.to_string()))?; - let offset = index_entry.offset(); - 
let start = if self.index.is_inline() { - offset + ((rev.0 as usize + 1) * INDEX_ENTRY_SIZE) - } else { - offset - }; - let end = start + index_entry.compressed_len() as usize; - let data = if self.index.is_inline() { - self.index.data(start, end) - } else { - &self.data()[start..end] - }; - let base_rev = self - .index - .check_revision(index_entry.base_revision_or_base_of_delta_chain()) - .ok_or_else(|| { - RevlogError::corrupted(format!( - "base revision for rev {} is invalid", - rev - )) - })?; - let p1 = - self.index.check_revision(index_entry.p1()).ok_or_else(|| { - RevlogError::corrupted(format!( - "p1 for rev {} is invalid", - rev - )) - })?; - let p2 = - self.index.check_revision(index_entry.p2()).ok_or_else(|| { - RevlogError::corrupted(format!( - "p2 for rev {} is invalid", - rev - )) - })?; - let entry = RevlogEntry { - revlog: self, - rev, - bytes: data, - compressed_len: index_entry.compressed_len(), - uncompressed_len: index_entry.uncompressed_len(), - base_rev_or_base_of_delta_chain: if base_rev == rev { + let patches: Result<Vec<_>, _> = deltas + .iter() + .map(|d| patch::PatchList::new(d.as_ref())) + .collect(); + let patch = patch::fold_patch_lists(&patches?); + patch.apply(buffer, snapshot); + Ok(()) + } +} + +type IndexData = Box<dyn Deref<Target = [u8]> + Send + Sync>; + +/// Open the revlog [`Index`] at `index_path`, through the `store_vfs` and the +/// given `options`. This controls whether (and how) we `mmap` the index file, +/// and returns an empty buffer if the index does not exist on disk. +/// This is only used when doing pure-Rust work, in Python contexts this is +/// unused at the time of writing. 
+pub fn open_index( + store_vfs: &impl Vfs, + index_path: &Path, + options: RevlogOpenOptions, +) -> Result<Index, HgError> { + let buf: IndexData = match store_vfs.open_read(index_path) { + Ok(mut file) => { + let mut buf = if let Some(threshold) = + options.data_config.mmap_index_threshold + { + let size = store_vfs.file_size(&file)?; + if size >= threshold { + // Safety is "enforced" by locks and assuming other + // processes are well-behaved. If any misbehaving or + // malicious process does touch the index, it could lead + // to corruption. This is somewhat inherent to file-based + // `mmap`, though some platforms have some ways of + // mitigating. + // TODO linux: set the immutable flag with `chattr(1)`? + let mmap = unsafe { MmapOptions::new().map(&file) } + .when_reading_file(index_path)?; + Some(Box::new(mmap) as IndexData) + } else { + None + } + } else { None - } else { - Some(base_rev) - }, - p1, - p2, - flags: index_entry.flags(), - hash: *index_entry.hash(), - }; - Ok(entry) - } + }; - /// Get an entry of the revlog. - pub fn get_entry( - &self, - rev: UncheckedRevision, - ) -> Result<RevlogEntry, RevlogError> { - if rev == NULL_REVISION.into() { - return Ok(self.make_null_entry()); + if buf.is_none() { + let mut data = vec![]; + file.read_to_end(&mut data).when_reading_file(index_path)?; + buf = Some(Box::new(data) as IndexData); + } + buf.unwrap() } - let rev = self.index.check_revision(rev).ok_or_else(|| { - RevlogError::corrupted(format!("rev {} is invalid", rev)) - })?; - self.get_entry_for_checked_rev(rev) - } + Err(err) => match err { + HgError::IoError { error, context } => match error.kind() { + ErrorKind::NotFound => Box::<Vec<u8>>::default(), + _ => return Err(HgError::IoError { error, context }), + }, + e => return Err(e), + }, + }; + + let index = Index::new(buf, options.index_header())?; + Ok(index) } /// The revlog entry's bytes and the necessary informations to extract /// the entry's data. 
#[derive(Clone)] pub struct RevlogEntry<'revlog> { - revlog: &'revlog Revlog, + revlog: &'revlog InnerRevlog, rev: Revision, - bytes: &'revlog [u8], - compressed_len: u32, uncompressed_len: i32, - base_rev_or_base_of_delta_chain: Option<Revision>, p1: Revision, p2: Revision, flags: u16, @@ -683,33 +617,47 @@ } /// The data for this entry, after resolving deltas if any. - pub fn rawdata(&self) -> Result<Cow<'revlog, [u8]>, RevlogError> { - let mut entry = self.clone(); - let mut delta_chain = vec![]; + /// Non-Python callers should probably call [`Self::data`] instead. + fn rawdata<G, T>( + &self, + stop_rev: Option<(Revision, &[u8])>, + with_buffer: G, + ) -> Result<(), RevlogError> + where + G: FnOnce( + usize, + &mut dyn FnMut( + &mut dyn RevisionBuffer<Target = T>, + ) -> Result<(), RevlogError>, + ) -> Result<(), RevlogError>, + { + let (delta_chain, stopped) = self + .revlog + .delta_chain(self.revision(), stop_rev.map(|(r, _)| r))?; + let target_size = + self.uncompressed_len().map(|raw_size| 4 * raw_size as u64); - // The meaning of `base_rev_or_base_of_delta_chain` depends on - // generaldelta. See the doc on `ENTRY_DELTA_BASE` in - // `mercurial/revlogutils/constants.py` and the code in - // [_chaininfo] and in [index_deltachain]. - let uses_generaldelta = self.revlog.index.uses_generaldelta(); - while let Some(base_rev) = entry.base_rev_or_base_of_delta_chain { - entry = if uses_generaldelta { - delta_chain.push(entry); - self.revlog.get_entry_for_checked_rev(base_rev)? - } else { - let base_rev = UncheckedRevision(entry.rev.0 - 1); - delta_chain.push(entry); - self.revlog.get_entry(base_rev)? - }; - } + let deltas = self.revlog.chunks(delta_chain, target_size)?; - let data = if delta_chain.is_empty() { - entry.data_chunk()? 
+ let (base_text, deltas) = if stopped { + ( + stop_rev.as_ref().expect("last revision should be cached").1, + &deltas[..], + ) } else { - Revlog::build_data_from_deltas(entry, &delta_chain)?.into() + let (buf, deltas) = deltas.split_at(1); + (buf[0].as_ref(), deltas) }; - Ok(data) + let size = self + .uncompressed_len() + .map(|l| l as usize) + .unwrap_or(base_text.len()); + with_buffer(size, &mut |buf| { + Revlog::build_data_from_deltas(buf, base_text, deltas)?; + Ok(()) + })?; + Ok(()) } fn check_data( @@ -739,86 +687,23 @@ } pub fn data(&self) -> Result<Cow<'revlog, [u8]>, RevlogError> { - let data = self.rawdata()?; + // TODO figure out if there is ever a need for `Cow` here anymore. + let mut data = CoreRevisionBuffer::new(); if self.rev == NULL_REVISION { - return Ok(data); + return Ok(data.finish().into()); } + self.rawdata(None, |size, f| { + // Pre-allocate the expected size (received from the index) + data.resize(size); + // Actually fill the buffer + f(&mut data)?; + Ok(()) + })?; if self.is_censored() { return Err(HgError::CensoredNodeError.into()); } - self.check_data(data) - } - - /// Extract the data contained in the entry. - /// This may be a delta. (See `is_delta`.) - fn data_chunk(&self) -> Result<Cow<'revlog, [u8]>, HgError> { - if self.bytes.is_empty() { - return Ok(Cow::Borrowed(&[])); - } - match self.bytes[0] { - // Revision data is the entirety of the entry, including this - // header. - b'\0' => Ok(Cow::Borrowed(self.bytes)), - // Raw revision data follows. - b'u' => Ok(Cow::Borrowed(&self.bytes[1..])), - // zlib (RFC 1950) data. - b'x' => Ok(Cow::Owned(self.uncompressed_zlib_data()?)), - // zstd data. - b'\x28' => Ok(Cow::Owned(uncompressed_zstd_data( - self.bytes, - self.is_delta(), - self.uncompressed_len.max(0), - )?)), - // A proper new format should have had a repo/store requirement. 
- format_type => Err(corrupted(format!( - "unknown compression header '{}'", - format_type - ))), - } + self.check_data(data.finish().into()) } - - fn uncompressed_zlib_data(&self) -> Result<Vec<u8>, HgError> { - let mut decoder = ZlibDecoder::new(self.bytes); - if self.is_delta() { - let mut buf = Vec::with_capacity(self.compressed_len as usize); - decoder - .read_to_end(&mut buf) - .map_err(|e| corrupted(e.to_string()))?; - Ok(buf) - } else { - let cap = self.uncompressed_len.max(0) as usize; - let mut buf = vec![0; cap]; - decoder - .read_exact(&mut buf) - .map_err(|e| corrupted(e.to_string()))?; - Ok(buf) - } - } - - /// Tell if the entry is a snapshot or a delta - /// (influences on decompression). - fn is_delta(&self) -> bool { - self.base_rev_or_base_of_delta_chain.is_some() - } -} - -/// Calculate the hash of a revision given its data and its parents. -fn hash( - data: &[u8], - p1_hash: &[u8], - p2_hash: &[u8], -) -> [u8; NODE_BYTES_LENGTH] { - let mut hasher = Sha1::new(); - let (a, b) = (p1_hash, p2_hash); - if a > b { - hasher.update(b); - hasher.update(a); - } else { - hasher.update(a); - hasher.update(b); - } - hasher.update(data); - *hasher.finalize().as_ref() } #[cfg(test)]