diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 290a887b2960..ffa9c4c9a62b 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -116,7 +116,7 @@ pub struct BloomFilterHeader {
 
 /// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
 /// Each word is thought of as an array of bits; each bit is either "set" or "not set".
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 #[repr(transparent)]
 struct Block([u32; 8]);
 impl Block {
@@ -195,7 +195,7 @@ impl std::ops::IndexMut<usize> for Block {
 ///
 /// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`]
 /// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Sbbf(Vec<Block>);
 
 pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
@@ -244,29 +244,44 @@ fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
 }
 
 impl Sbbf {
-    /// Create a new [Sbbf] with given number of distinct values and false positive probability.
-    /// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
-    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
-        if !(0.0..1.0).contains(&fpp) {
-            return Err(ParquetError::General(format!(
-                "False positive probability must be between 0.0 and 1.0, got {fpp}"
-            )));
-        }
-        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
-        Ok(Self::new_with_num_of_bytes(num_bits / 8))
-    }
-
-    /// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
-    /// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
-    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
-        let num_bytes = optimal_num_of_bytes(num_bytes);
-        assert_eq!(num_bytes % size_of::<Block>(), 0);
-        let num_blocks = num_bytes / size_of::<Block>();
-        let bitset = vec![Block::ZERO; num_blocks];
-        Self(bitset)
-    }
-
-    pub(crate) fn new(bitset: &[u8]) -> Self {
+    /// Create a new [Sbbf] from raw bitset bytes.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # use parquet::errors::Result;
+    /// # use parquet::bloom_filter::Sbbf;
+    /// # fn main() -> Result<()> {
+    /// // In a real application you would read a serialized bloom filter from a Parquet file.
+    /// // For example, using ParquetRecordBatchStreamBuilder:
+    /// //
+    /// // let file = std::fs::File::open("data.parquet")?;
+    /// // let reader = ParquetRecordBatchStreamBuilder::new(file).await?;
+    /// // let bloom_filter = reader
+    /// //     .get_row_group_column_bloom_filter(row_group_index, column_index)
+    /// //     .await?;
+    /// //
+    /// // For this example we handcraft a 32-byte bitset.
+    /// let bitset_bytes = vec![0u8; 32];
+    /// let original = Sbbf::new(&bitset_bytes);
+    ///
+    /// // Persist the filter (header + bitset) into an in-memory buffer.
+    /// let mut serialized = Vec::new();
+    /// original.write(&mut serialized)?;
+    ///
+    /// // When reading the filter back, reuse the bitset portion of the buffer.
+    /// let bitset_slice = &serialized[serialized.len() - bitset_bytes.len()..];
+    /// let reconstructed = Sbbf::new(bitset_slice);
+    ///
+    /// assert_eq!(reconstructed, original);
+    /// # Ok(())
+    /// # }
+    /// ```
+    ///
+    /// A practical way to obtain a correctly sized bitset slice for this constructor is to
+    /// serialize an existing filter with [`Sbbf::write`] and reuse the bitset bytes that follow
+    /// the header.
+    pub fn new(bitset: &[u8]) -> Self {
         let data = bitset
             .chunks_exact(4 * 8)
             .map(|chunk| {
@@ -283,7 +298,7 @@ impl Sbbf {
     /// Write the bloom filter data (header and then bitset) to the output. This doesn't
     /// flush the writer in order to boost performance of bulk writing all blocks. Caller
     /// must remember to flush the writer.
-    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+    pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
         let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
         self.header().write_thrift(&mut protocol).map_err(|e| {
             ParquetError::General(format!("Could not write bloom filter header: {e}"))
@@ -292,6 +307,28 @@
         Ok(())
     }
 
+    /// Create a new [Sbbf] with given number of distinct values and false positive probability.
+    /// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
+    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
+        if !(0.0..1.0).contains(&fpp) {
+            return Err(ParquetError::General(format!(
+                "False positive probability must be between 0.0 and 1.0, got {fpp}"
+            )));
+        }
+        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
+        Ok(Self::new_with_num_of_bytes(num_bits / 8))
+    }
+
+    /// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
+    /// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
+    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+        let num_bytes = optimal_num_of_bytes(num_bytes);
+        assert_eq!(num_bytes % size_of::<Block>(), 0);
+        let num_blocks = num_bytes / size_of::<Block>();
+        let bitset = vec![Block::ZERO; num_blocks];
+        Self(bitset)
+    }
+
     /// Write the bitset in serialized form to the writer.
     #[cfg(not(target_endian = "little"))]
     fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
@@ -476,6 +513,57 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_sbbf_new_parses_little_endian_blocks() {
+        let words: [u32; 16] = [
+            0x0001_0203,
+            0x0405_0607,
+            0x0809_0a0b,
+            0x0c0d_0e0f,
+            0x1011_1213,
+            0x1415_1617,
+            0x1819_1a1b,
+            0x1c1d_1e1f,
+            0x2021_2223,
+            0x2425_2627,
+            0x2829_2a2b,
+            0x2c2d_2e2f,
+            0x3031_3233,
+            0x3435_3637,
+            0x3839_3a3b,
+            0x3c3d_3e3f,
+        ];
+        let mut bitset = Vec::with_capacity(words.len() * 4);
+        for word in &words {
+            bitset.extend_from_slice(&word.to_le_bytes());
+        }
+        let sbbf = Sbbf::new(&bitset);
+        assert_eq!(sbbf.0.len(), 2);
+        for (block_index, block) in sbbf.0.iter().enumerate() {
+            for word_index in 0..8 {
+                let overall_index = block_index * 8 + word_index;
+                assert_eq!(block[word_index], words[overall_index]);
+            }
+        }
+    }
+
+    #[test]
+    fn test_sbbf_write_round_trip() {
+        let bitset: Vec<u8> = (0u8..64).collect();
+        let sbbf = Sbbf::new(&bitset);
+        let mut output = Vec::new();
+        sbbf.write(&mut output).unwrap();
+
+        let mut protocol = ThriftSliceInputProtocol::new(&output);
+        let header = BloomFilterHeader::read_thrift(&mut protocol).unwrap();
+        assert_eq!(header.num_bytes, bitset.len() as i32);
+        assert_eq!(header.algorithm, BloomFilterAlgorithm::BLOCK);
+        assert_eq!(header.hash, BloomFilterHash::XXHASH);
+        assert_eq!(header.compression, BloomFilterCompression::UNCOMPRESSED);
+
+        assert_eq!(protocol.as_slice(), bitset.as_slice());
+    }
+
     /// test the assumption that bloom filter header size should not exceed SBBF_HEADER_SIZE_ESTIMATE
     /// essentially we are testing that the struct is packed with 4 i32 fields, each can be 1-5 bytes
     /// so altogether it'll be 20 bytes at most.
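
A note on the untouched trailing context above: the test comment asserts that a serialized `BloomFilterHeader` fits in `SBBF_HEADER_SIZE_ESTIMATE` (20) bytes because it packs four i32-valued fields, each taking 1-5 bytes in the Thrift compact protocol. The sketch below only illustrates that arithmetic and is not code from this crate; `varint_len` and `zigzag` are hypothetical helper names. Compact protocol zigzag-encodes an `i32` and writes it as a ULEB128 varint, so the worst case is ceil(32 / 7) = 5 bytes per field.

```rust
/// Hypothetical helper (not in the parquet crate): byte length of a
/// ULEB128 varint, the integer encoding used by Thrift compact protocol.
fn varint_len(mut v: u64) -> usize {
    let mut len = 1;
    while v >= 0x80 {
        v >>= 7;
        len += 1;
    }
    len
}

/// Zigzag-encode an i32 the way Thrift compact protocol does, so small
/// magnitudes (positive or negative) become small varints.
fn zigzag(v: i32) -> u64 {
    ((v << 1) ^ (v >> 31)) as u32 as u64
}

fn main() {
    // A small value such as num_bytes = 32 costs a single byte...
    assert_eq!(varint_len(zigzag(32)), 1);
    // ...while a worst-case i32 costs ceil(32 / 7) = 5 bytes.
    assert_eq!(varint_len(zigzag(i32::MIN)), 5);
    // Four such fields give the 20-byte bound the test comment relies on.
    assert_eq!(4 * varint_len(zigzag(i32::MIN)), 20);
}
```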
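
Since the point of the visibility change is to let downstream code persist and rebuild filters, here is a hedged end-to-end sketch of the now-public API. It is not part of the diff: it assumes the synchronous `SerializedFileReader` path and its `get_column_bloom_filter` accessor, and `data.parquet` is a placeholder path.

```rust
use std::fs::File;

use parquet::errors::Result;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<()> {
    // Placeholder file; it must have been written with bloom filters
    // enabled via WriterProperties for a filter to exist at all.
    let file = File::open("data.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    let row_group = reader.get_row_group(0)?;

    // Returns Option<&Sbbf>: None when no filter was written for column 0.
    if let Some(sbbf) = row_group.get_column_bloom_filter(0) {
        // With `Sbbf::write` now public, the filter (header + bitset) can be
        // serialized into a side cache outside the Parquet file...
        let mut cached = Vec::new();
        sbbf.write(&mut cached)?;

        // ...and, as the new doctest shows, later rebuilt from the bitset
        // bytes that follow the header using the now-public `Sbbf::new`.
    }
    Ok(())
}
```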