From 025d4119eca389c8882ab6d6a367ba1a9900ca82 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 24 Jun 2025 07:36:40 -0400
Subject: [PATCH 1/2] POC: Parquet predicate results cache

---
 arrow-select/src/coalesce.rs                     |   5 +
 arrow-select/src/filter.rs                       |   8 +-
 parquet/src/arrow/array_reader/builder.rs        |  47 +++-
 .../src/arrow/array_reader/cached/builder.rs     | 235 ++++++++++++++++++
 parquet/src/arrow/array_reader/cached/mod.rs     |  79 ++++++
 .../src/arrow/array_reader/cached/reader.rs      | 106 ++++++++
 parquet/src/arrow/array_reader/list_array.rs     |   3 +-
 parquet/src/arrow/array_reader/mod.rs            |   2 +
 parquet/src/arrow/arrow_reader/mod.rs            |  23 +-
 parquet/src/arrow/arrow_reader/read_plan.rs      |  78 ++++--
 parquet/src/arrow/async_reader/mod.rs            |  19 +-
 parquet/src/arrow/mod.rs                         |  17 +-
 12 files changed, 583 insertions(+), 39 deletions(-)
 create mode 100644 parquet/src/arrow/array_reader/cached/builder.rs
 create mode 100644 parquet/src/arrow/array_reader/cached/mod.rs
 create mode 100644 parquet/src/arrow/array_reader/cached/reader.rs

diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 9b310c645d07..462365bdf11a 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -318,6 +318,11 @@ impl BatchCoalescer {
     pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
         self.completed.pop_front()
     }
+
+    /// Returns all the completed batches
+    pub fn take_completed_batches(&mut self) -> VecDeque<RecordBatch> {
+        std::mem::take(&mut self.completed)
+    }
 }
 
 /// Return a new `InProgressArray` for the given data type
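For context, not part of the patch: a minimal sketch of how the new `take_completed_batches` accessor can be used to drain all completed batches at once instead of looping over `next_completed_batch`. It assumes only the existing public `arrow_select::coalesce` API.

    use std::sync::Arc;
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{ArrowError, DataType, Field, Schema};
    use arrow_select::coalesce::BatchCoalescer;

    fn main() -> Result<(), ArrowError> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 1024);
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;
        coalescer.push_batch(batch)?;
        // flush the in-progress batch so it becomes "completed"
        coalescer.finish_buffered_batch()?;
        // drain everything in one call (the new accessor)
        let completed = coalescer.take_completed_batches();
        assert_eq!(completed.len(), 1);
        Ok(())
    }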
diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs
index fa91c0690b4c..c67e64d5283b 100644
--- a/arrow-select/src/filter.rs
+++ b/arrow-select/src/filter.rs
@@ -324,6 +324,8 @@ impl IterationStrategy {
 }
 
 /// A filtering predicate that can be applied to an [`Array`]
+///
+/// See [`FilterBuilder`] to create a [`FilterPredicate`].
 #[derive(Debug)]
 pub struct FilterPredicate {
     filter: BooleanArray,
@@ -502,6 +504,9 @@ fn filter_null_mask(
 }
 
 /// Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset`
+///
+/// Panics for `IterationStrategy::All` or `IterationStrategy::None`, which must
+/// be handled by the caller.
 fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
     let src = buffer.values();
     let offset = buffer.offset();
@@ -536,7 +541,8 @@ fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
             }
             builder.into()
         }
-        IterationStrategy::All | IterationStrategy::None => unreachable!(),
+        IterationStrategy::All => unreachable!(),
+        IterationStrategy::None => unreachable!(),
     }
 }
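For context, not part of the patch: the new doc reference in action. A `FilterPredicate` is built via the public `FilterBuilder`; the `All`/`None` strategies that `filter_bits` now documents as caller-handled are short-circuited by the filter kernels before `filter_bits` is reached.

    use arrow_array::{BooleanArray, Int32Array};
    use arrow_select::filter::FilterBuilder;

    fn main() {
        let predicate = BooleanArray::from(vec![true, false, true]);
        // optimize() pre-computes the IterationStrategy once, so the same
        // predicate can be applied cheaply to many arrays
        let filter = FilterBuilder::new(&predicate).optimize().build();

        let values = Int32Array::from(vec![1, 2, 3]);
        let filtered = filter.filter(&values).unwrap();
        assert_eq!(filtered.len(), 2); // rows 0 and 2 survive
    }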
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 14a475859810..a0873476cb6f 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -23,7 +23,7 @@ use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
 use crate::arrow::array_reader::{
-    make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
+    make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, CachedPredicateResult,
     FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
     PrimitiveArrayReader, RowGroups, StructArrayReader,
 };
@@ -37,11 +37,18 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
 /// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
 pub(crate) struct ArrayReaderBuilder<'a> {
     row_groups: &'a dyn RowGroups,
+    cached_predicate_result: Option<&'a CachedPredicateResult>,
 }
 
 impl<'a> ArrayReaderBuilder<'a> {
-    pub(crate) fn new(row_groups: &'a dyn RowGroups) -> Self {
-        Self { row_groups }
+    pub(crate) fn new(
+        row_groups: &'a dyn RowGroups,
+        cached_predicate_result: Option<&'a CachedPredicateResult>,
+    ) -> Self {
+        Self {
+            row_groups,
+            cached_predicate_result,
+        }
     }
 
     /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
@@ -68,6 +75,10 @@ impl<'a> ArrayReaderBuilder<'a> {
         field: &ParquetField,
         mask: &ProjectionMask,
     ) -> Result<Option<Box<dyn ArrayReader>>> {
+        if let Some(builder) = self.build_cached_reader(field, mask)? {
+            return Ok(Some(builder));
+        }
+
         match field.field_type {
             ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask),
             ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -81,6 +92,33 @@ impl<'a> ArrayReaderBuilder<'a> {
         }
     }
 
+    /// Build a cached array reader if the field is both in the projection mask and in the cache
+    fn build_cached_reader(
+        &self,
+        field: &ParquetField,
+        mask: &ProjectionMask,
+    ) -> Result<Option<Box<dyn ArrayReader>>> {
+        let Some(cached_predicate_result) = self.cached_predicate_result else {
+            return Ok(None);
+        };
+
+        // TODO how to find a cached struct / list
+        // (Probably have to cache the individual fields)
+        let ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type: _,
+        } = &field.field_type
+        else {
+            return Ok(None);
+        };
+
+        if !mask.leaf_included(*col_idx) {
+            return Ok(None);
+        }
+
+        cached_predicate_result.build_reader(*col_idx)
+    }
+
     /// Build array reader for map type.
     fn build_map_reader(
         &self,
@@ -375,7 +413,8 @@ mod tests {
         )
         .unwrap();
 
-        let array_reader = ArrayReaderBuilder::new(&file_reader)
+        let cached_predicate_result = None;
+        let array_reader = ArrayReaderBuilder::new(&file_reader, cached_predicate_result)
             .build_array_reader(fields.as_ref(), &mask)
             .unwrap();
 
diff --git a/parquet/src/arrow/array_reader/cached/builder.rs b/parquet/src/arrow/array_reader/cached/builder.rs
new file mode 100644
index 000000000000..ec7fe7cdd798
--- /dev/null
+++ b/parquet/src/arrow/array_reader/cached/builder.rs
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::CachedPredicateResult;
+use crate::arrow::arrow_reader::RowSelection;
+use crate::arrow::ProjectionMask;
+use arrow_array::{Array, BooleanArray, RecordBatch};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use arrow_select::filter::prep_null_mask_filter;
+use std::sync::Arc;
+use arrow_select::coalesce::BatchCoalescer;
+
+/// Incrementally builds the result of evaluating an ArrowPredicate on
+/// a RowGroup.
+#[derive(Debug)]
+pub(crate) struct CachedPredicateResultBuilder {
+    /// What is being cached
+    strategy: CacheStrategy,
+    /// Total number of columns in the original parquet schema
+    num_original_columns: usize,
+    /// Any filters that have been applied. Note this is the complete set of
+    /// filters that have been applied to the cached batches.
+    filters: Vec<BooleanArray>,
+}
+
+#[derive(Debug)]
+enum CacheStrategy {
+    /// Don't cache any results
+    None,
+    /// Cache the result of filtering all columns in the filter schema
+    All {
+        /// The builder for the cached batches
+        cached_batches_builder: BatchCoalescer,
+        /// The indexes of the columns in the original parquet schema that are in the projection
+        original_projection: Vec<usize>,
+    },
+    /// Cache the result of filtering a subset of the columns in the filter schema
+    Subset {
+        /// The builder for the cached batches
+        cached_batches_builder: BatchCoalescer,
+        /// The indexes of the columns in the filter schema that are in the projection
+        filter_projection: Vec<usize>,
+        /// The indexes of the columns in the original parquet schema that are in the projection
+        original_projection: Vec<usize>,
+    },
+}
+
+impl CachedPredicateResultBuilder {
+    /// Create a new CachedPredicateResultBuilder
+    ///
+    /// # Arguments:
+    /// * `num_original_columns`: The number of columns in the original parquet schema
+    /// * `filter_schema`: The schema of the filtered record batch (not the original parquet schema)
+    /// * `filter_mask`: which columns of the original parquet schema did the filter columns come from?
+    /// * `projection_mask`: which columns of the original parquet schema are in the final projection?
+    ///
+    /// This structure does not cache filter results for columns that are not
+    /// in the projection mask, because those filter results are not needed.
+    pub(crate) fn try_new(
+        num_original_columns: usize,
+        filter_schema: &SchemaRef,
+        filter_mask: &ProjectionMask,
+        projection_mask: &ProjectionMask,
+        batch_size: usize,
+    ) -> Result<Self, ArrowError> {
+        let (filter_mask_inner, projection_mask_inner) =
+            match (filter_mask.mask(), projection_mask.mask()) {
+                (Some(filter_mask), Some(projection_mask)) => (filter_mask, projection_mask),
+                // None means "select all columns" so in this case cache all filtered columns
+                (Some(filter_mask), None) => (filter_mask, filter_mask),
+                // None means "select all columns" so in this case cache all columns used in projection
+                (None, Some(projection_mask)) => (projection_mask, projection_mask),
+                (None, None) => {
+                    // this means all columns are in the projection *and* filter so cache them all when possible
+                    let cached_batches_builder = BatchCoalescer::new(
+                        Arc::clone(filter_schema),
+                        batch_size,
+                    );
+                    let strategy = CacheStrategy::All {
+                        cached_batches_builder,
+                        original_projection: (0..num_original_columns).collect(),
+                    };
+                    return {
+                        Ok(Self {
+                            strategy,
+                            num_original_columns,
+                            filters: vec![],
+                        })
+                    };
+                }
+            };
+
+        // Otherwise, we need to select a subset of the fields from each batch to cache.
+
+        // This is an iterator over the fields of the schema of batches passed
+        // to the filter.
+        let mut filter_field_iter = filter_schema.fields.iter().enumerate();
+
+        let mut filter_projection = vec![];
+        let mut original_projection = vec![];
+        let mut fields = vec![];
+
+        // Iterate over the masks from the original schema
+        assert_eq!(filter_mask_inner.len(), projection_mask_inner.len());
+        for (original_index, (&in_filter, &in_projection)) in filter_mask_inner
+            .iter()
+            .zip(projection_mask_inner.iter())
+            .enumerate()
+        {
+            if !in_filter {
+                continue;
+            }
+            // take the next field from the filter schema
+            let (filter_index, field) =
+                filter_field_iter.next().expect("mismatch in field lengths");
+            if !in_projection {
+                // this field is not in the projection, so don't cache it
+                continue;
+            }
+            // this field is in both the filter and the projection, so cache the results
+            filter_projection.push(filter_index);
+            original_projection.push(original_index);
+            fields.push(Arc::clone(field));
+        }
+        let strategy = if fields.is_empty() {
+            CacheStrategy::None
+        } else {
+            let cached_batches_builder =
+                BatchCoalescer::new(Arc::new(Schema::new(fields)), batch_size);
+            CacheStrategy::Subset {
+                cached_batches_builder,
+                filter_projection,
+                original_projection,
+            }
+        };
+
+        Ok(Self {
+            strategy,
+            num_original_columns,
+            filters: vec![],
+        })
+    }
+
+    /// Add a new batch and filter to the builder
+    pub(crate) fn add(
+        &mut self,
+        batch: RecordBatch,
+        mut filter: BooleanArray,
+    ) -> crate::errors::Result<()> {
+        if filter.null_count() > 0 {
+            filter = prep_null_mask_filter(&filter);
+        }
+
+        match &mut self.strategy {
+            CacheStrategy::None => {}
+            CacheStrategy::All {
+                cached_batches_builder,
+                ..
+            } => {
+                cached_batches_builder.push_batch_with_filter(batch, &filter)?;
+            }
+            CacheStrategy::Subset {
+                cached_batches_builder,
+                ref filter_projection,
+                ..
+ } => { + // If we have a filter projection, we need to project the batch + // to only the columns that are in the filter projection + let projected_batch = batch.project(filter_projection)?; + cached_batches_builder.push_batch_with_filter(projected_batch, &filter)?; + } + } + + self.filters.push(filter); + + Ok(()) + } + + /// Return (selection, maybe_cached_predicate_result) that represents the rows + /// that were selected and batches that were evaluated. + pub(crate) fn build( + self, + ) -> crate::errors::Result<(RowSelection, Option)> { + let Self { + strategy, + num_original_columns, + filters, + } = self; + + let new_selection = RowSelection::from_filters(&filters); + + match strategy { + CacheStrategy::None => Ok((new_selection, None)), + CacheStrategy::All { + mut cached_batches_builder, + original_projection, + } + | CacheStrategy::Subset { + mut cached_batches_builder, + original_projection, + .. + } => { + // explode out the cached batches into the proper place in the original schema + cached_batches_builder.finish_buffered_batch()?; + let completed_batches = cached_batches_builder + .take_completed_batches(); + + let mut cached_result = CachedPredicateResult::new(num_original_columns, filters); + for (batch_index, original_idx) in original_projection.iter().enumerate() { + let mut column_arrays = Vec::with_capacity(completed_batches.len()); + for batch in &completed_batches { + column_arrays.push(Arc::clone(batch.column(batch_index))); + } + cached_result.add_result(*original_idx, column_arrays); + } + + Ok((new_selection, Some(cached_result))) + } + } + } +} diff --git a/parquet/src/arrow/array_reader/cached/mod.rs b/parquet/src/arrow/array_reader/cached/mod.rs new file mode 100644 index 000000000000..8f154f15ef0b --- /dev/null +++ b/parquet/src/arrow/array_reader/cached/mod.rs @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implements a cached column reader that provides data using +//! previously decoded / filtered arrays + +mod builder; +mod reader; + +pub(crate) use builder::CachedPredicateResultBuilder; +use reader::CachedArrayReader; + +use crate::arrow::array_reader::ArrayReader; +use crate::errors::Result; +use arrow_array::{ArrayRef, BooleanArray}; + +/// The result of evaluating a predicate on a RowGroup with a specific +/// RowSelection +/// +/// The flow is: +/// * Decode with a RowSelection +/// * Apply a predicate --> this result +#[derive(Clone)] +pub(crate) struct CachedPredicateResult { + /// Map of parquet schema column index to the result of evaluating the predicate + /// on that column. + /// + /// NOTE each array already has had `filters` applied + /// + /// If `Some`, it is a set of arrays that make up the result. 
diff --git a/parquet/src/arrow/array_reader/cached/mod.rs b/parquet/src/arrow/array_reader/cached/mod.rs
new file mode 100644
index 000000000000..8f154f15ef0b
--- /dev/null
+++ b/parquet/src/arrow/array_reader/cached/mod.rs
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Implements a cached column reader that provides data using
+//! previously decoded / filtered arrays
+
+mod builder;
+mod reader;
+
+pub(crate) use builder::CachedPredicateResultBuilder;
+use reader::CachedArrayReader;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::Result;
+use arrow_array::{ArrayRef, BooleanArray};
+
+/// The result of evaluating a predicate on a RowGroup with a specific
+/// RowSelection
+///
+/// The flow is:
+/// * Decode with a RowSelection
+/// * Apply a predicate --> this result
+#[derive(Clone)]
+pub(crate) struct CachedPredicateResult {
+    /// Map of parquet schema column index to the result of evaluating the
+    /// predicate on that column.
+    ///
+    /// NOTE each array has already had `filters` applied
+    ///
+    /// If `Some`, it is a set of arrays that make up the result. Each has
+    /// `batch_size` rows except for the last.
+    arrays: Vec<Option<Vec<ArrayRef>>>,
+    /// The results of evaluating the predicate (these have already been
+    /// applied to the cached results).
+    filters: Vec<BooleanArray>,
+}
+
+impl CachedPredicateResult {
+    pub(crate) fn new(num_columns: usize, filters: Vec<BooleanArray>) -> Self {
+        Self {
+            arrays: vec![None; num_columns],
+            filters,
+        }
+    }
+
+    /// Add the specified arrays to the cached result
+    pub fn add_result(&mut self, column_index: usize, arrays: Vec<ArrayRef>) {
+        // TODO how is it possible to end up with previously cached arrays?
+        //assert!(self.arrays.get(column_index).is_none(), "column index {} already has a cached array", column_index);
+        self.arrays[column_index] = Some(arrays);
+    }
+
+    /// Returns an array reader for the given column index, if any, that reads
+    /// from the cache rather than the original column chunk
+    pub(crate) fn build_reader(&self, col_index: usize) -> Result<Option<Box<dyn ArrayReader>>> {
+        let Some(array) = &self.arrays[col_index] else {
+            return Ok(None);
+        };
+
+        Ok(Some(Box::new(CachedArrayReader::new(
+            array.clone(),
+            &self.filters,
+        ))))
+    }
+}
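For context, not part of the patch: each cached entry holds the *post-filter* values of a column, i.e. the equivalent of applying the public `filter` kernel to the decoded column, chunked into `batch_size`-row arrays. A sketch of what the cache conceptually stores per column:

    use std::sync::Arc;
    use arrow_array::{ArrayRef, BooleanArray, Int32Array};
    use arrow_select::filter::filter;

    fn main() {
        let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let predicate = BooleanArray::from(vec![true, false, true, false, true]);
        // the cache stores this filtered result, so the projection phase never
        // re-decodes or re-filters the column
        let cached = filter(&column, &predicate).unwrap();
        assert_eq!(cached.len(), 3); // rows 0, 2, and 4 survive
    }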
diff --git a/parquet/src/arrow/array_reader/cached/reader.rs b/parquet/src/arrow/array_reader/cached/reader.rs
new file mode 100644
index 000000000000..881f6a11b410
--- /dev/null
+++ b/parquet/src/arrow/array_reader/cached/reader.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use arrow_array::{new_empty_array, Array, ArrayRef, BooleanArray};
+use arrow_schema::DataType;
+use std::any::Any;
+use std::collections::VecDeque;
+
+pub(crate) struct CachedArrayReader {
+    /// The cached arrays. These should already be broken down into the
+    /// correct batch_size chunks
+    cached_arrays: VecDeque<ArrayRef>,
+    data_type: DataType,
+    // /// The filter that was applied to the cached arrays
+    //filter: BooleanArray,
+    /// The length of the currently "in progress" array
+    current_length: usize,
+}
+
+impl CachedArrayReader {
+    pub(crate) fn new(cached_arrays: Vec<ArrayRef>, _filters: &[BooleanArray]) -> Self {
+        //let input: Vec<&dyn Array> = filters.iter().map(|b| b as &dyn Array).collect::<Vec<_>>();
+        //let filter = concat(&input).unwrap().as_boolean().clone();
+        let data_type = cached_arrays
+            .first()
+            .expect("had at least one array")
+            .data_type()
+            .clone();
+        Self {
+            cached_arrays: VecDeque::from(cached_arrays),
+            data_type,
+            current_length: 0,
+        }
+    }
+}
+
+impl ArrayReader for CachedArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &DataType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> crate::errors::Result<usize> {
+        // since the entire array is cached, reads always succeed
+        self.current_length += batch_size;
+        Ok(batch_size)
+    }
+
+    // Produce the "in progress" batch
+    fn consume_batch(&mut self) -> crate::errors::Result<ArrayRef> {
+        if self.current_length == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let mut next_array = self.cached_arrays.pop_front().ok_or_else(|| {
+            crate::errors::ParquetError::General(
+                "Internal error: no more cached arrays".to_string(),
+            )
+        })?;
+
+        // The next batch is the next array in the queue. When a limit is
+        // applied, the requested length may be smaller than the cached batch
+        // size, which is fine: we just slice the next available array.
+        // TODO take this limit into account as part of the filter creation
+        // TODO this was hit by DataFusion tests, not arrow-rs tests, so there is a gap in our testing
+        // TODO add coverage
+        if self.current_length < next_array.len() {
+            next_array = next_array.slice(0, self.current_length);
+        }
+        assert_eq!(self.current_length, next_array.len());
+        self.current_length = 0;
+        Ok(next_array)
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> crate::errors::Result<usize> {
+        // todo!()
+        // it would be good to verify that the pattern of read/consume matches
+        // the boolean array
+        Ok(num_records)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None // TODO this is likely not right for structured types
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None // TODO this is likely not right for structured types
+    }
+}
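For orientation, not part of the patch: `ArrayReader` is a crate-internal trait, so the following is only a schematic of the read/consume contract that `CachedArrayReader` satisfies; it does not compile outside the parquet crate.

    // Schematic: how ParquetRecordBatchReader drives any ArrayReader.
    // For CachedArrayReader, step 1 only bumps a counter (the data was already
    // decoded and filtered during predicate evaluation) and step 2 pops the
    // next pre-filtered array from the queue, slicing it if a limit shortened
    // the request.
    fn next_batch(reader: &mut dyn ArrayReader, batch_size: usize) -> Result<ArrayRef> {
        let _staged = reader.read_records(batch_size)?; // step 1: stage rows
        reader.consume_batch()                          // step 2: materialize them
    }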
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 66c4f30b3c29..1d473ac1fc0d 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -563,7 +563,8 @@ mod tests {
         )
         .unwrap();
 
-        let mut array_reader = ArrayReaderBuilder::new(&file_reader)
+        let cached_predicate_result = None;
+        let mut array_reader = ArrayReaderBuilder::new(&file_reader, cached_predicate_result)
             .build_array_reader(fields.as_ref(), &mask)
             .unwrap();
 
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 94d61c9eacf5..4e6b26045e85 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -42,6 +42,7 @@ mod null_array;
 mod primitive_array;
 mod struct_array;
 
+mod cached;
 #[cfg(test)]
 mod test_util;
 
@@ -50,6 +51,7 @@ pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
 #[allow(unused_imports)] // Only used for benchmarks
 pub use byte_view_array::make_byte_view_array_reader;
+pub(crate) use cached::{CachedPredicateResult, CachedPredicateResultBuilder};
 #[allow(unused_imports)] // Only used for benchmarks
 pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
 pub use fixed_size_list_array::FixedSizeListArrayReader;
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 9127423efe4b..6ab41fdd7a87 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -716,6 +716,8 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
             .row_groups
             .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
 
+        let num_original_columns = self.metadata.file_metadata().schema_descr().num_columns();
+
         let reader = ReaderRowGroups {
             reader: Arc::new(self.input.0),
             metadata: self.metadata,
@@ -733,14 +735,24 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
                     break;
                 }
 
-                let array_reader = ArrayReaderBuilder::new(&reader)
-                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+                // TODO move this into the read_plan??
+
+                // Create an ArrayReader for evaluating (just) the predicate columns
+                let array_reader =
+                    ArrayReaderBuilder::new(&reader, plan_builder.cached_predicate_result())
+                        .build_array_reader(self.fields.as_deref(), predicate.projection())?;
 
-                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
+                // Update the plan with the results of predicate evaluation
+                plan_builder = plan_builder.with_predicate(
+                    num_original_columns,
+                    array_reader,
+                    predicate.as_mut(),
+                    &self.projection,
+                )?;
             }
         }
 
-        let array_reader = ArrayReaderBuilder::new(&reader)
+        let array_reader = ArrayReaderBuilder::new(&reader, plan_builder.cached_predicate_result())
             .build_array_reader(self.fields.as_deref(), &self.projection)?;
 
         let read_plan = plan_builder
@@ -941,7 +953,8 @@ impl ParquetRecordBatchReader {
         batch_size: usize,
         selection: Option<RowSelection>,
     ) -> Result<Self> {
-        let array_reader = ArrayReaderBuilder::new(row_groups)
+        let cached_predicate_result = None;
+        let array_reader = ArrayReaderBuilder::new(row_groups, cached_predicate_result)
             .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
 
         let read_plan = ReadPlanBuilder::new(batch_size)
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs
index e083fb822be4..da76b775d317 100644
--- a/parquet/src/arrow/arrow_reader/read_plan.rs
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -18,13 +18,15 @@
 //! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
 //! from a Parquet file
 
-use crate::arrow::array_reader::ArrayReader;
+use crate::arrow::array_reader::{
+    ArrayReader, CachedPredicateResult, CachedPredicateResultBuilder,
+};
 use crate::arrow::arrow_reader::{
     ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
 };
+use crate::arrow::ProjectionMask;
 use crate::errors::{ParquetError, Result};
-use arrow_array::Array;
-use arrow_select::filter::prep_null_mask_filter;
+use arrow_array::RecordBatchReader;
 use std::collections::VecDeque;
 
 /// A builder for [`ReadPlan`]
@@ -33,6 +35,8 @@ pub(crate) struct ReadPlanBuilder {
     batch_size: usize,
     /// Current selection to apply, includes all filters
     selection: Option<RowSelection>,
+    /// Cached result of evaluating some columns with the RowSelection
+    cached_predicate_result: Option<CachedPredicateResult>,
 }
 
 impl ReadPlanBuilder {
@@ -41,6 +45,7 @@ impl ReadPlanBuilder {
         Self {
             batch_size,
             selection: None,
+            cached_predicate_result: None,
         }
     }
 
@@ -56,6 +61,11 @@ impl ReadPlanBuilder {
         self.selection.as_ref()
     }
 
+    /// Returns the currently cached predicate result, if any
+    pub(crate) fn cached_predicate_result(&self) -> Option<&CachedPredicateResult> {
+        self.cached_predicate_result.as_ref()
+    }
+
     /// Specifies the number of rows in the row group, before filtering is applied.
     ///
     /// Returns a [`LimitedReadPlanBuilder`] that can apply
@@ -83,24 +93,48 @@ impl ReadPlanBuilder {
 
     /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
     ///
-    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
-    /// will be the conjunction of the existing selection and the rows selected
-    /// by `predicate`.
+    /// # Arguments
+    ///
+    /// * `num_original_columns`: The number of columns in the original parquet
+    ///   schema.
+    ///
+    /// * `array_reader`: The array reader to use for evaluating the predicate.
+    ///   It must be configured with the projection mask specified by
+    ///   [`ArrowPredicate::projection`] for the `predicate`.
+    ///
+    /// * `predicate`: The predicate to evaluate
+    ///
+    /// * `projection_mask`: The projection mask that will be selected. This
+    ///   code will potentially cache the results of filtering columns that
+    ///   also appear in the projection mask.
+    ///
+    /// If `this.selection` is `Some`, the resulting [`RowSelection`] will be
+    /// the conjunction of it and the rows selected by `predicate` (they will be
+    /// `AND`ed).
+    ///
+    /// Note: A pre-existing selection may come from evaluating a previous
+    /// predicate or if the [`ParquetRecordBatchReader`] specifies an explicit
     /// [`RowSelection`] in addition to one or more predicates.
     pub(crate) fn with_predicate(
         mut self,
+        num_original_columns: usize,
         array_reader: Box<dyn ArrayReader>,
         predicate: &mut dyn ArrowPredicate,
+        projection_mask: &ProjectionMask,
     ) -> Result<Self> {
+        // Prepare to decode all rows in the selection to evaluate the predicate
         let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
-        let mut filters = vec![];
+        let mut cached_results_builder = CachedPredicateResultBuilder::try_new(
+            num_original_columns,
+            &reader.schema(),
+            predicate.projection(),
+            projection_mask,
+            self.batch_size,
+        )?;
         for maybe_batch in reader {
-            let maybe_batch = maybe_batch?;
-            let input_rows = maybe_batch.num_rows();
-            let filter = predicate.evaluate(maybe_batch)?;
+            let batch = maybe_batch?;
+            let input_rows = batch.num_rows();
+            let filter = predicate.evaluate(batch.clone())?;
             // Since user supplied predicate, check error here to catch bugs quickly
             if filter.len() != input_rows {
                 return Err(arrow_err!(
                     "ArrowPredicate predicate returned {} rows, expected {input_rows}",
                     filter.len()
                 ));
             }
-            match filter.null_count() {
-                0 => filters.push(filter),
-                _ => filters.push(prep_null_mask_filter(&filter)),
-            };
+            cached_results_builder.add(batch, filter)?;
         }
 
-        let raw = RowSelection::from_filters(&filters);
+        let (raw, cached_predicate_result) = cached_results_builder.build()?;
         self.selection = match self.selection.take() {
             Some(selection) => Some(selection.and_then(&raw)),
             None => Some(raw),
         };
+
+        self.cached_predicate_result = cached_predicate_result;
         Ok(self)
     }
 
@@ -131,6 +164,7 @@ impl ReadPlanBuilder {
         let Self {
             batch_size,
             selection,
+            cached_predicate_result,
         } = self;
 
         let selection = selection.map(|s| s.trim().into());
@@ -138,6 +172,7 @@ impl ReadPlanBuilder {
         ReadPlan {
             batch_size,
             selection,
+            cached_predicate_result,
         }
     }
 }
@@ -234,7 +269,11 @@ pub(crate) struct ReadPlan {
     /// The number of rows to read in each batch
     batch_size: usize,
     /// Row ranges to be selected from the data source
+    /// TODO update this to use something more efficient
+    /// See
     selection: Option<VecDeque<RowSelector>>,
+    /// Cached result of evaluating some column(s) with the current RowSelection
+    cached_predicate_result: Option<CachedPredicateResult>,
 }
 
 impl ReadPlan {
@@ -243,6 +282,11 @@ impl ReadPlan {
         self.selection.as_mut()
     }
 
+    /// Returns the current cached predicate result, if any
+    pub(crate) fn cached_predicate_result(&self) -> Option<&CachedPredicateResult> {
+        self.cached_predicate_result.as_ref()
+    }
+
     /// Return the number of rows to read in each output batch
     #[inline(always)]
     pub fn batch_size(&self) -> usize {
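For orientation, not part of the patch: the resulting two-phase flow in the sync and async builders, shown schematically with the crate-internal names used above (illustrative only; it will not compile outside the parquet crate):

    // Phase 1: evaluate each predicate, caching filtered columns that the
    // final projection will need again
    let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
    for predicate in row_filter.predicates.iter_mut() {
        // predicate columns may themselves be served from earlier cached results
        let array_reader =
            ArrayReaderBuilder::new(&row_group, plan_builder.cached_predicate_result())
                .build_array_reader(fields, predicate.projection())?;
        plan_builder = plan_builder.with_predicate(
            num_original_columns,
            array_reader,
            predicate.as_mut(),
            &projection,
        )?;
    }
    // Phase 2: build the projection readers; columns with cached results are
    // served by CachedArrayReader instead of re-decoding the column chunk
    let array_reader =
        ArrayReaderBuilder::new(&row_group, plan_builder.cached_predicate_result())
            .build_array_reader(fields, &projection)?;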
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 611d6999e07e..a2c1b84b8259 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -599,6 +599,11 @@ where
         let filter = self.filter.as_mut();
         let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
+        let num_original_columns = row_group
+            .metadata
+            .file_metadata()
+            .schema_descr()
+            .num_columns();
 
         // Update selection based on any filters
         if let Some(filter) = filter {
@@ -613,10 +618,16 @@ where
                     .fetch(&mut self.input, predicate.projection(), selection)
                     .await?;
 
-                let array_reader = ArrayReaderBuilder::new(&row_group)
-                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+                let array_reader =
+                    ArrayReaderBuilder::new(&row_group, plan_builder.cached_predicate_result())
+                        .build_array_reader(self.fields.as_deref(), predicate.projection())?;
 
-                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
+                plan_builder = plan_builder.with_predicate(
+                    num_original_columns,
+                    array_reader,
+                    predicate.as_mut(),
+                    &projection,
+                )?;
             }
         }
 
@@ -661,7 +672,7 @@ where
 
         let plan = plan_builder.build();
 
-        let array_reader = ArrayReaderBuilder::new(&row_group)
+        let array_reader = ArrayReaderBuilder::new(&row_group, plan.cached_predicate_result())
             .build_array_reader(self.fields.as_deref(), &projection)?;
 
         let reader = ParquetRecordBatchReader::new(array_reader, plan);
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index e33d6a05a757..335c059c86d3 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -251,17 +251,15 @@ pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
 ///
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct ProjectionMask {
-    /// If `Some`, a leaf column should be included if the value at
+    /// If present, a leaf column should be included if the value at
     /// the corresponding index is true
     ///
-    /// If `None`, all columns should be included
+    /// If `None`, include all columns
     ///
-    /// # Examples
+    /// # Example
     ///
-    /// Given the original parquet schema with leaf columns is `[a, b, c, d]`
-    ///
-    /// A mask of `[true, false, true, false]` will result in a schema 2
-    /// elements long:
+    /// If the original parquet schema is `[a, b, c, d]` and the mask is `[true,
+    /// false, true, false]`, then the resulting schema will be 2 elements long:
     /// * `fields[0]`: `a`
     /// * `fields[1]`: `c`
     ///
@@ -279,6 +277,11 @@ impl ProjectionMask {
         Self { mask: None }
     }
 
+    // TODO better interface
+    pub(crate) fn mask(&self) -> Option<&Vec<bool>> {
+        self.mask.as_ref()
+    }
+
     /// Create a [`ProjectionMask`] which selects only the specified leaf columns
     ///
     /// Note: repeated or out of order indices will not impact the final mask
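For context, not part of the patch: the `ProjectionMask` doc example above, made executable against the public API (the new `mask` accessor itself is `pub(crate)` and so not shown):

    use std::sync::Arc;
    use parquet::arrow::ProjectionMask;
    use parquet::schema::parser::parse_message_type;
    use parquet::schema::types::SchemaDescriptor;

    fn main() {
        // A schema with four leaf columns: a, b, c, d
        let message_type = "
            message schema {
                REQUIRED INT32 a;
                REQUIRED INT32 b;
                REQUIRED INT32 c;
                REQUIRED INT32 d;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let descr = SchemaDescriptor::new(schema);
        // Select leaves a (index 0) and c (index 2), i.e. mask [true, false, true, false]
        let mask = ProjectionMask::leaves(&descr, [0, 2]);
        assert!(mask.leaf_included(0));
        assert!(!mask.leaf_included(1));
        assert!(mask.leaf_included(2));
    }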
From 81db293240d39ca77180d1fd0c3072d1d4078ba8 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 24 Jun 2025 08:43:27 -0400
Subject: [PATCH 2/2] fmt

---
 parquet/src/arrow/array_reader/cached/builder.rs | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/parquet/src/arrow/array_reader/cached/builder.rs b/parquet/src/arrow/array_reader/cached/builder.rs
index ec7fe7cdd798..7c240d12ebc6 100644
--- a/parquet/src/arrow/array_reader/cached/builder.rs
+++ b/parquet/src/arrow/array_reader/cached/builder.rs
@@ -20,9 +20,9 @@ use crate::arrow::arrow_reader::RowSelection;
 use crate::arrow::ProjectionMask;
 use arrow_array::{Array, BooleanArray, RecordBatch};
 use arrow_schema::{ArrowError, Schema, SchemaRef};
+use arrow_select::coalesce::BatchCoalescer;
 use arrow_select::filter::prep_null_mask_filter;
 use std::sync::Arc;
-use arrow_select::coalesce::BatchCoalescer;
 
 /// Incrementally builds the result of evaluating an ArrowPredicate on
 /// a RowGroup.
@@ -86,10 +86,8 @@ impl CachedPredicateResultBuilder {
             (None, Some(projection_mask)) => (projection_mask, projection_mask),
             (None, None) => {
                 // this means all columns are in the projection *and* filter so cache them all when possible
-                let cached_batches_builder = BatchCoalescer::new(
-                    Arc::clone(filter_schema),
-                    batch_size,
-                );
+                let cached_batches_builder =
+                    BatchCoalescer::new(Arc::clone(filter_schema), batch_size);
                 let strategy = CacheStrategy::All {
                     cached_batches_builder,
                     original_projection: (0..num_original_columns).collect(),
@@ -216,8 +214,7 @@ impl CachedPredicateResultBuilder {
             } => {
                 // explode out the cached batches into the proper place in the original schema
                 cached_batches_builder.finish_buffered_batch()?;
-                let completed_batches = cached_batches_builder
-                    .take_completed_batches();
+                let completed_batches = cached_batches_builder.take_completed_batches();
 
                 let mut cached_result = CachedPredicateResult::new(num_original_columns, filters);
                 for (batch_index, original_idx) in original_projection.iter().enumerate() {
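Finally, for context, not part of the patch: an end-to-end sketch of the scenario the cache targets, a predicate column that also appears in the projection, using only public APIs. It assumes the `arrow_array`, `arrow_ord`, and `tempfile` crates are available as dependencies.

    use std::sync::Arc;
    use arrow_array::{cast::AsArray, types::Int32Type, ArrayRef, Int32Array, RecordBatch};
    use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
    use parquet::arrow::{ArrowWriter, ProjectionMask};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Write a small file with two columns, a and b
        let batch = RecordBatch::try_from_iter([
            ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
            ("b", Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef),
        ])?;
        let file = tempfile::tempfile()?;
        let mut writer = ArrowWriter::try_new(file.try_clone()?, batch.schema(), None)?;
        writer.write(&batch)?;
        writer.close()?;

        // Read it back with a predicate on column a. Because a is also in the
        // (default, all-columns) projection, the new cache can reuse the
        // already-filtered values of a instead of decoding them a second time.
        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
        let predicate_mask = ProjectionMask::leaves(&schema_descr, [0]);
        let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
            // keep rows where a > 1
            let a = batch.column(0).as_primitive::<Int32Type>();
            arrow_ord::cmp::gt(a, &Int32Array::new_scalar(1))
        });
        let reader = builder
            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
            .build()?;
        for batch in reader {
            println!("{:?}", batch?);
        }
        Ok(())
    }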