140 changes: 114 additions & 26 deletions parquet/src/bloom_filter/mod.rs
@@ -116,7 +116,7 @@ pub struct BloomFilterHeader {

/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(transparent)]
struct Block([u32; 8]);
impl Block {
@@ -195,7 +195,7 @@ impl std::ops::IndexMut<usize> for Block {
///
/// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`]
/// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Sbbf(Vec<Block>);
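As context for the properties mentioned in the doc comment above, here is a minimal sketch (not part of this diff) of how bloom filters are typically enabled through `WriterProperties`, which is what drives `Sbbf` creation internally; the fpp and ndv values are illustrative only:

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // Illustrative values only: enabling bloom filters here causes the writer
    // to build an Sbbf internally from the resulting BloomFilterProperties.
    let _props = WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .set_bloom_filter_fpp(0.01)
        .set_bloom_filter_ndv(10_000)
        .build();
}
```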

pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
@@ -244,29 +244,44 @@ fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
}

impl Sbbf {
/// Create a new [Sbbf] with given number of distinct values and false positive probability.
/// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
if !(0.0..1.0).contains(&fpp) {
return Err(ParquetError::General(format!(
"False positive probability must be between 0.0 and 1.0, got {fpp}"
)));
}
let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
Ok(Self::new_with_num_of_bytes(num_bits / 8))
}

/// Create a new [Sbbf] with a given number of bytes; the exact number of bytes will be adjusted
/// to the next power of two, bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
assert_eq!(num_bytes % size_of::<Block>(), 0);
let num_blocks = num_bytes / size_of::<Block>();
let bitset = vec![Block::ZERO; num_blocks];
Self(bitset)
}

pub(crate) fn new(bitset: &[u8]) -> Self {
/// Create a new [Sbbf] from raw bitset bytes.
Contributor

Can you please add a doc example here showing how to use this API with your intended use case?

That will both help the documentation and ensure we have exposed enough of the API to be useful.

For example, if the idea is to save an Sbbf using write and then re-create it again, an example could:

  1. make an Sbbf (ideally read it from a file)
  2. write it to a Vec<>
  3. create a new Sbbf from that vec
  4. show that it is the same as the original

It would be nice to mention that a correct argument for creating an Sbbf can be created using Sbbf::write.

Author

Thanks! That makes sense. I just added a doc example; let me know if you think anything else needs to be added or changed.

///
/// # Examples
///
/// ```
/// # use parquet::errors::Result;
/// # use parquet::bloom_filter::Sbbf;
/// # fn main() -> Result<()> {
/// // In a real application you would read a serialized bloom filter from a Parquet file.
/// // For example, using ParquetRecordBatchStreamBuilder:
/// //
/// // let file = std::fs::File::open("data.parquet")?;
/// // let reader = ParquetRecordBatchStreamBuilder::new(file).await?;
/// // let bloom_filter = reader
/// // .get_row_group_column_bloom_filter(row_group_index, column_index)
/// // .await?;
/// //
/// // For this example we handcraft a 32-byte bitset.
/// let bitset_bytes = vec![0u8; 32];
/// let original = Sbbf::new(&bitset_bytes);
///
/// // Persist the filter (header + bitset) into an in-memory buffer.
/// let mut serialized = Vec::new();
/// original.write(&mut serialized)?;
///
/// // When reading the filter back, reuse the bitset portion of the buffer.
/// let bitset_slice = &serialized[serialized.len() - bitset_bytes.len()..];
/// let reconstructed = Sbbf::new(bitset_slice);
///
/// assert_eq!(reconstructed, original);
/// # Ok(())
/// # }
/// ```
///
/// A practical way to obtain a correctly sized bitset slice for this constructor is to
/// serialize an existing filter with [`Sbbf::write`] and reuse the bitset bytes that follow
/// the header.
pub fn new(bitset: &[u8]) -> Self {
let data = bitset
.chunks_exact(4 * 8)
.map(|chunk| {
@@ -283,7 +298,7 @@ impl Sbbf {
/// Write the bloom filter data (header and then bitset) to the output. This doesn't
/// flush the writer in order to boost performance of bulk writing all blocks. Caller
/// must remember to flush the writer.
pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
self.header().write_thrift(&mut protocol).map_err(|e| {
ParquetError::General(format!("Could not write bloom filter header: {e}"))
@@ -292,6 +307,28 @@ impl Sbbf {
Ok(())
}
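To illustrate the flush requirement called out in the doc comment above, here is a hedged usage sketch (not part of this diff) that writes a filter through a `BufWriter` and flushes explicitly; the `persist_filter` helper name is hypothetical:

```rust
use std::io::{BufWriter, Write};

use parquet::bloom_filter::Sbbf;
use parquet::errors::ParquetError;

// Hypothetical helper: `Sbbf::write` emits the Thrift header followed by the
// bitset but deliberately does not flush, so the caller flushes once at the end.
fn persist_filter<W: Write>(filter: &Sbbf, sink: W) -> Result<(), ParquetError> {
    let mut writer = BufWriter::new(sink);
    filter.write(&mut writer)?;
    // Flush explicitly; `Sbbf::write` leaves this to the caller.
    writer
        .flush()
        .map_err(|e| ParquetError::General(format!("Could not flush bloom filter: {e}")))
}

fn main() -> Result<(), ParquetError> {
    let filter = Sbbf::new(&[0u8; 32]);
    let mut buffer = Vec::new();
    persist_filter(&filter, &mut buffer)?;
    assert!(!buffer.is_empty());
    Ok(())
}
```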

/// Create a new [Sbbf] with given number of distinct values and false positive probability.
/// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
if !(0.0..1.0).contains(&fpp) {
return Err(ParquetError::General(format!(
"False positive probability must be between 0.0 and 1.0, got {fpp}"
)));
}
let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
Ok(Self::new_with_num_of_bytes(num_bits / 8))
}

/// Create a new [Sbbf] with a given number of bytes; the exact number of bytes will be adjusted
/// to the next power of two, bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
assert_eq!(num_bytes % size_of::<Block>(), 0);
let num_blocks = num_bytes / size_of::<Block>();
let bitset = vec![Block::ZERO; num_blocks];
Self(bitset)
}

/// Write the bitset in serialized form to the writer.
#[cfg(not(target_endian = "little"))]
fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
@@ -476,6 +513,57 @@ mod tests {
}
}

#[test]
fn test_sbbf_new_parses_little_endian_blocks() {
let words: [u32; 16] = [
0x0001_0203,
0x0405_0607,
0x0809_0a0b,
0x0c0d_0e0f,
0x1011_1213,
0x1415_1617,
0x1819_1a1b,
0x1c1d_1e1f,
0x2021_2223,
0x2425_2627,
0x2829_2a2b,
0x2c2d_2e2f,
0x3031_3233,
0x3435_3637,
0x3839_3a3b,
0x3c3d_3e3f,
];
let mut bitset = Vec::with_capacity(words.len() * 4);
for word in &words {
bitset.extend_from_slice(&word.to_le_bytes());
}
let sbbf = Sbbf::new(&bitset);
assert_eq!(sbbf.0.len(), 2);
for (block_index, block) in sbbf.0.iter().enumerate() {
for word_index in 0..8 {
let overall_index = block_index * 8 + word_index;
assert_eq!(block[word_index], words[overall_index]);
}
}
}

#[test]
fn test_sbbf_write_round_trip() {
let bitset: Vec<u8> = (0u8..64).collect();
let sbbf = Sbbf::new(&bitset);
let mut output = Vec::new();
sbbf.write(&mut output).unwrap();

let mut protocol = ThriftSliceInputProtocol::new(&output);
let header = BloomFilterHeader::read_thrift(&mut protocol).unwrap();
assert_eq!(header.num_bytes, bitset.len() as i32);
assert_eq!(header.algorithm, BloomFilterAlgorithm::BLOCK);
assert_eq!(header.hash, BloomFilterHash::XXHASH);
assert_eq!(header.compression, BloomFilterCompression::UNCOMPRESSED);

assert_eq!(protocol.as_slice(), bitset.as_slice());
}

/// test the assumption that bloom filter header size should not exceed SBBF_HEADER_SIZE_ESTIMATE
/// essentially we are testing that the struct is packed with 4 i32 fields, each can be 1-5 bytes
/// so altogether it'll be 20 bytes at most.
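To make the 20-byte bound concrete, a small sketch of the varint arithmetic, assuming the Thrift compact protocol's 7 payload bits per byte (this snippet is illustrative, not from the PR):

```rust
fn main() {
    // Each varint byte carries 7 payload bits, so an i32 needs at most
    // ceil(32 / 7) = 5 bytes; four such fields give the 20-byte estimate.
    let max_bytes_per_i32 = (32 + 6) / 7;
    assert_eq!(max_bytes_per_i32, 5);
    assert_eq!(4 * max_bytes_per_i32, 20);
}
```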