5 changes: 5 additions & 0 deletions arrow-select/src/coalesce.rs
@@ -318,6 +318,11 @@ impl BatchCoalescer {
pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
self.completed.pop_front()
}

/// Returns all the completed batches, removing them from this coalescer
pub fn take_completed_batches(&mut self) -> VecDeque<RecordBatch> {
std::mem::take(&mut self.completed)
}
}

/// Return a new `InProgressArray` for the given data type
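
The new `take_completed_batches` pairs with the existing `push_batch_with_filter`, `finish_buffered_batch`, and `next_completed_batch` APIs; it drains every completed batch at once. A minimal usage sketch (not part of this diff; assumes a single Int32 column):

use std::sync::Arc;
use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 1024);
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![column]).unwrap();
    // keep rows 0 and 2, drop row 1
    let filter = BooleanArray::from(vec![true, false, true]);
    coalescer.push_batch_with_filter(batch, &filter).unwrap();
    // flush the partially filled buffer, then take ownership of all completed batches
    coalescer.finish_buffered_batch().unwrap();
    let completed = coalescer.take_completed_batches();
    assert_eq!(completed.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
}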
8 changes: 7 additions & 1 deletion arrow-select/src/filter.rs
@@ -324,6 +324,8 @@ impl IterationStrategy {
}

/// A filtering predicate that can be applied to an [`Array`]
///
/// See [`FilterBuilder`] to create a [`FilterPredicate`].
#[derive(Debug)]
pub struct FilterPredicate {
filter: BooleanArray,
@@ -502,6 +504,9 @@ fn filter_null_mask(
}

/// Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset`
///
/// Panics for `IterationStrategy::All` or `IterationStrategy::None`, which must
/// be handled by the caller
fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
let src = buffer.values();
let offset = buffer.offset();
@@ -536,7 +541,8 @@ fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
}
builder.into()
}
IterationStrategy::All | IterationStrategy::None => unreachable!(),
IterationStrategy::All => unreachable!(),
IterationStrategy::None => unreachable!(),
}
}

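
The new pointer from `FilterPredicate` to `FilterBuilder` is easiest to see in use. A minimal sketch of building a predicate once and applying it (not part of this diff):

use arrow_array::{Array, BooleanArray, Int32Array};
use arrow_select::filter::FilterBuilder;

fn main() {
    let values = Int32Array::from(vec![10, 20, 30, 40]);
    let mask = BooleanArray::from(vec![true, false, true, false]);
    // optimize() chooses an IterationStrategy suited to the mask's selectivity
    let predicate = FilterBuilder::new(&mask).optimize().build();
    let filtered = predicate.filter(&values).unwrap();
    assert_eq!(filtered.len(), 2);
}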
47 changes: 43 additions & 4 deletions parquet/src/arrow/array_reader/builder.rs
@@ -23,7 +23,7 @@ use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, CachedPredicateResult,
FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
PrimitiveArrayReader, RowGroups, StructArrayReader,
};
@@ -37,11 +37,18 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
/// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
pub(crate) struct ArrayReaderBuilder<'a> {
row_groups: &'a dyn RowGroups,
cached_predicate_result: Option<&'a CachedPredicateResult>,
}

impl<'a> ArrayReaderBuilder<'a> {
pub(crate) fn new(row_groups: &'a dyn RowGroups) -> Self {
Self { row_groups }
pub(crate) fn new(
row_groups: &'a dyn RowGroups,
cached_predicate_result: Option<&'a CachedPredicateResult>,
) -> Self {
Self {
row_groups,
cached_predicate_result,
}
}

/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
@@ -68,6 +75,10 @@ impl<'a> ArrayReaderBuilder<'a> {
field: &ParquetField,
mask: &ProjectionMask,
) -> Result<Option<Box<dyn ArrayReader>>> {
if let Some(builder) = self.build_cached_reader(field, mask)? {
return Ok(Some(builder));
}

match field.field_type {
ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask),
ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -81,6 +92,33 @@ impl<'a> ArrayReaderBuilder<'a> {
}
}

/// Build cached array reader if the field is in the projection mask and in the cache
fn build_cached_reader(
&self,
field: &ParquetField,
mask: &ProjectionMask,
) -> Result<Option<Box<dyn ArrayReader>>> {
let Some(cached_predicate_result) = self.cached_predicate_result else {
return Ok(None);
};

// TODO how to find a cached struct / list
// (Probably have to cache the individual fields)
let ParquetFieldType::Primitive {
col_idx,
primitive_type: _,
} = &field.field_type
else {
return Ok(None);
};

if !mask.leaf_included(*col_idx) {
return Ok(None);
}

cached_predicate_result.build_reader(*col_idx)
}

/// Build array reader for map type.
fn build_map_reader(
&self,
@@ -375,7 +413,8 @@ mod tests {
)
.unwrap();

let array_reader = ArrayReaderBuilder::new(&file_reader)
let cached_predicate_result = None;
let array_reader = ArrayReaderBuilder::new(&file_reader, cached_predicate_result)
.build_array_reader(fields.as_ref(), &mask)
.unwrap();

232 changes: 232 additions & 0 deletions parquet/src/arrow/array_reader/cached/builder.rs
@@ -0,0 +1,232 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::CachedPredicateResult;
use crate::arrow::arrow_reader::RowSelection;
use crate::arrow::ProjectionMask;
use arrow_array::{Array, BooleanArray, RecordBatch};
use arrow_schema::{ArrowError, Schema, SchemaRef};
use arrow_select::coalesce::BatchCoalescer;
use arrow_select::filter::prep_null_mask_filter;
use std::sync::Arc;

/// Incrementally builds the result of evaluating an ArrowPredicate on
/// a RowGroup.
#[derive(Debug)]
pub(crate) struct CachedPredicateResultBuilder {
/// What is being cached
strategy: CacheStrategy,
/// Total number of columns in the original parquet schema
num_original_columns: usize,
/// Any filters that have been applied. Note this is the complete set of
/// filters that have been applied to the cached batches.
filters: Vec<BooleanArray>,
}

#[derive(Debug)]
enum CacheStrategy {
/// Don't cache any results
None,
/// Cache the result of filtering all columns in the filter schema
All {
/// The builder for the cached batches
cached_batches_builder: BatchCoalescer,
/// The indexes of the columns in the original parquet schema that are in the projection
original_projection: Vec<usize>,
},
/// Cache the result of filtering a subset of the columns in the filter schema
Subset {
/// The builder for the cached batches
cached_batches_builder: BatchCoalescer,
/// The indexes of the columns in the filter schema that are in the projection
filter_projection: Vec<usize>,
/// The indexes of the columns in the original parquet schema that are in the projection
original_projection: Vec<usize>,
},
}

impl CachedPredicateResultBuilder {
/// Create a new CachedPredicateResultBuilder
///
/// # Arguments:
/// * `num_original_columns`: The number of columns in the original parquet schema
/// * `filter_schema`: The schema of the filtered record batches (not the original parquet schema)
/// * `filter_mask`: which columns of the original parquet schema did the filter columns come from?
/// * `projection_mask`: which columns of the original parquet schema are in the final projection?
/// * `batch_size`: target number of rows for each cached batch
///
/// This structure does not cache filter results for columns that are not in
/// the projection mask, because those filter results are not needed later.
pub(crate) fn try_new(
num_original_columns: usize,
filter_schema: &SchemaRef,
filter_mask: &ProjectionMask,
projection_mask: &ProjectionMask,
batch_size: usize,
) -> Result<Self, ArrowError> {
let (filter_mask_inner, projection_mask_inner) =
match (filter_mask.mask(), projection_mask.mask()) {
(Some(filter_mask), Some(projection_mask)) => (filter_mask, projection_mask),
// None means "select all columns" so in this case cache all filtered columns
(Some(filter_mask), None) => (filter_mask, filter_mask),
// None means "select all columns" so in this case cache all columns used in projection
(None, Some(projection_mask)) => (projection_mask, projection_mask),
(None, None) => {
// this means all columns are in the projection *and* filter so cache them all when possible
let cached_batches_builder =
BatchCoalescer::new(Arc::clone(filter_schema), batch_size);
let strategy = CacheStrategy::All {
cached_batches_builder,
original_projection: (0..num_original_columns).collect(),
};
return Ok(Self {
strategy,
num_original_columns,
filters: vec![],
});
}
};

// Otherwise, need to select a subset of the fields from each batch to cache

// This is an iterator over the fields of the schema of batches passed
// to the filter.
let mut filter_field_iter = filter_schema.fields.iter().enumerate();

let mut filter_projection = vec![];
let mut original_projection = vec![];
let mut fields = vec![];

// Iterate over the masks from the original schema
assert_eq!(filter_mask_inner.len(), projection_mask_inner.len());
for (original_index, (&in_filter, &in_projection)) in filter_mask_inner
.iter()
.zip(projection_mask_inner.iter())
.enumerate()
{
if !in_filter {
continue;
}
// take next field from the filter schema
let (filter_index, field) =
filter_field_iter.next().expect("mismatch in field lengths");
if !in_projection {
// this field is not in the projection, so don't cache it
continue;
}
// this field is both in filter and the projection, so cache the results
filter_projection.push(filter_index);
original_projection.push(original_index);
fields.push(Arc::clone(field));
}
let strategy = if fields.is_empty() {
CacheStrategy::None
} else {
let cached_batches_builder =
BatchCoalescer::new(Arc::new(Schema::new(fields)), batch_size);
CacheStrategy::Subset {
cached_batches_builder,
filter_projection,
original_projection,
}
};

Ok(Self {
strategy,
num_original_columns,
filters: vec![],
})
}
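
// Illustrative sketch (not part of this diff): the index bookkeeping performed by
// the zip-and-advance loop in `try_new`, reduced to plain boolean slices.
// `_projection_sketch` is a hypothetical helper named only for illustration.
fn _projection_sketch() {
    // per-leaf flags over the original parquet schema
    let in_filter = [true, false, true, true];
    let in_projection = [false, true, true, true];
    let mut filter_projection = vec![]; // indexes into the filter schema
    let mut original_projection = vec![]; // indexes into the original schema
    let mut filter_index = 0;
    for (original_index, (&f, &p)) in in_filter.iter().zip(in_projection.iter()).enumerate() {
        if !f {
            continue; // never appears in filter batches
        }
        // every filter column consumes the next slot in the filter schema
        let this_filter_index = filter_index;
        filter_index += 1;
        if !p {
            continue; // filtered but not projected: no need to cache
        }
        filter_projection.push(this_filter_index);
        original_projection.push(original_index);
    }
    assert_eq!(filter_projection, vec![1, 2]);
    assert_eq!(original_projection, vec![2, 3]);
}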

/// Add a new batch and filter to the builder
pub(crate) fn add(
&mut self,
batch: RecordBatch,
mut filter: BooleanArray,
) -> crate::errors::Result<()> {
// a NULL in the predicate means "do not keep the row": convert NULLs to false
if filter.null_count() > 0 {
filter = prep_null_mask_filter(&filter);
}

match &mut self.strategy {
CacheStrategy::None => {}
CacheStrategy::All {
cached_batches_builder,
..
} => {
cached_batches_builder.push_batch_with_filter(batch, &filter)?;
}
CacheStrategy::Subset {
cached_batches_builder,
ref filter_projection,
..
} => {
// If we have a filter projection, we need to project the batch
// to only the columns that are in the filter projection
let projected_batch = batch.project(filter_projection)?;
cached_batches_builder.push_batch_with_filter(projected_batch, &filter)?;
}
}

self.filters.push(filter);

Ok(())
}

/// Return `(selection, maybe_cached_predicate_result)` representing the rows
/// that were selected and the cached batches that were evaluated.
pub(crate) fn build(
self,
) -> crate::errors::Result<(RowSelection, Option<CachedPredicateResult>)> {
let Self {
strategy,
num_original_columns,
filters,
} = self;

let new_selection = RowSelection::from_filters(&filters);

match strategy {
CacheStrategy::None => Ok((new_selection, None)),
CacheStrategy::All {
mut cached_batches_builder,
original_projection,
}
| CacheStrategy::Subset {
mut cached_batches_builder,
original_projection,
..
} => {
// explode out the cached batches into the proper place in the original schema
cached_batches_builder.finish_buffered_batch()?;
let completed_batches = cached_batches_builder.take_completed_batches();

let mut cached_result = CachedPredicateResult::new(num_original_columns, filters);
for (batch_index, original_idx) in original_projection.iter().enumerate() {
let mut column_arrays = Vec::with_capacity(completed_batches.len());
for batch in &completed_batches {
column_arrays.push(Arc::clone(batch.column(batch_index)));
}
cached_result.add_result(*original_idx, column_arrays);
}

Ok((new_selection, Some(cached_result)))
}
}
}
}
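
For context, `RowSelection::from_filters` (used in `build` above) concatenates the per-batch filter results into a single selection over the row group. A minimal sketch against the public parquet API (not part of this diff):

use arrow_array::BooleanArray;
use parquet::arrow::arrow_reader::RowSelection;

fn main() {
    // filter results from two successive filter batches
    let filters = vec![
        BooleanArray::from(vec![true, true, false]),
        BooleanArray::from(vec![false, true]),
    ];
    // equivalent to: select 2 rows, skip 2 rows, select 1 row
    let selection = RowSelection::from_filters(&filters);
    assert_eq!(selection.row_count(), 3);
}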