Skip to content

Commit 025d411

Browse files
committed
POC: Parquet predicate results cache
1 parent 2b40d1d commit 025d411

File tree

12 files changed

+583
-39
lines changed

12 files changed

+583
-39
lines changed

arrow-select/src/coalesce.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,11 @@ impl BatchCoalescer {
318318
pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
319319
self.completed.pop_front()
320320
}
321+
322+
/// Removes and returns all the completed batches, leaving the completed queue empty
323+
pub fn take_completed_batches(&mut self) -> VecDeque<RecordBatch> {
324+
std::mem::take(&mut self.completed)
325+
}
321326
}
322327

323328
/// Return a new `InProgressArray` for the given data type

arrow-select/src/filter.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,8 @@ impl IterationStrategy {
324324
}
325325

326326
/// A filtering predicate that can be applied to an [`Array`]
327+
///
328+
/// See [`FilterBuilder`] to create a [`FilterPredicate`].
327329
#[derive(Debug)]
328330
pub struct FilterPredicate {
329331
filter: BooleanArray,
@@ -502,6 +504,9 @@ fn filter_null_mask(
502504
}
503505

504506
/// Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset`
507+
///
508+
/// Panics for `IterationStrategy::All` or `IterationStrategy::None` which must
509+
/// be handled by the caller
505510
fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
506511
let src = buffer.values();
507512
let offset = buffer.offset();
@@ -536,7 +541,8 @@ fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
536541
}
537542
builder.into()
538543
}
539-
IterationStrategy::All | IterationStrategy::None => unreachable!(),
544+
IterationStrategy::All => unreachable!(),
545+
IterationStrategy::None => unreachable!(),
540546
}
541547
}
542548

parquet/src/arrow/array_reader/builder.rs

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
2323
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
2424
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
2525
use crate::arrow::array_reader::{
26-
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
26+
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, CachedPredicateResult,
2727
FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
2828
PrimitiveArrayReader, RowGroups, StructArrayReader,
2929
};
@@ -37,11 +37,18 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
3737
/// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
3838
pub(crate) struct ArrayReaderBuilder<'a> {
3939
row_groups: &'a dyn RowGroups,
40+
cached_predicate_result: Option<&'a CachedPredicateResult>,
4041
}
4142

4243
impl<'a> ArrayReaderBuilder<'a> {
43-
pub(crate) fn new(row_groups: &'a dyn RowGroups) -> Self {
44-
Self { row_groups }
44+
pub(crate) fn new(
45+
row_groups: &'a dyn RowGroups,
46+
cached_predicate_result: Option<&'a CachedPredicateResult>,
47+
) -> Self {
48+
Self {
49+
row_groups,
50+
cached_predicate_result,
51+
}
4552
}
4653

4754
/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
@@ -68,6 +75,10 @@ impl<'a> ArrayReaderBuilder<'a> {
6875
field: &ParquetField,
6976
mask: &ProjectionMask,
7077
) -> Result<Option<Box<dyn ArrayReader>>> {
78+
if let Some(builder) = self.build_cached_reader(field, mask)? {
79+
return Ok(Some(builder));
80+
}
81+
7182
match field.field_type {
7283
ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask),
7384
ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -81,6 +92,33 @@ impl<'a> ArrayReaderBuilder<'a> {
8192
}
8293
}
8394

95+
/// Build cached array reader if the field is in the projection mask and in the cache
96+
fn build_cached_reader(
97+
&self,
98+
field: &ParquetField,
99+
mask: &ProjectionMask,
100+
) -> Result<Option<Box<dyn ArrayReader>>> {
101+
let Some(cached_predicate_result) = self.cached_predicate_result else {
102+
return Ok(None);
103+
};
104+
105+
// TODO how to find a cached struct / list
106+
// (Probably have to cache the individual fields)
107+
let ParquetFieldType::Primitive {
108+
col_idx,
109+
primitive_type: _,
110+
} = &field.field_type
111+
else {
112+
return Ok(None);
113+
};
114+
115+
if !mask.leaf_included(*col_idx) {
116+
return Ok(None);
117+
}
118+
119+
cached_predicate_result.build_reader(*col_idx)
120+
}
121+
84122
/// Build array reader for map type.
85123
fn build_map_reader(
86124
&self,
@@ -375,7 +413,8 @@ mod tests {
375413
)
376414
.unwrap();
377415

378-
let array_reader = ArrayReaderBuilder::new(&file_reader)
416+
let cached_predicate_result = None;
417+
let array_reader = ArrayReaderBuilder::new(&file_reader, cached_predicate_result)
379418
.build_array_reader(fields.as_ref(), &mask)
380419
.unwrap();
381420

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::arrow::array_reader::CachedPredicateResult;
19+
use crate::arrow::arrow_reader::RowSelection;
20+
use crate::arrow::ProjectionMask;
21+
use arrow_array::{Array, BooleanArray, RecordBatch};
22+
use arrow_schema::{ArrowError, Schema, SchemaRef};
23+
use arrow_select::filter::prep_null_mask_filter;
24+
use std::sync::Arc;
25+
use arrow_select::coalesce::BatchCoalescer;
26+
27+
/// Incrementally builds the result of evaluating an ArrowPredicate on
28+
/// a RowGroup.
29+
#[derive(Debug)]
30+
pub(crate) struct CachedPredicateResultBuilder {
31+
/// What is being cached
32+
strategy: CacheStrategy,
33+
/// Total number of columns in the original parquet schema
34+
num_original_columns: usize,
35+
/// Any filters that have been applied. Note this is the complete set of filters
36+
/// that have been applied to the cached batches.
37+
filters: Vec<BooleanArray>,
38+
}
39+
40+
#[derive(Debug)]
41+
enum CacheStrategy {
42+
/// Don't cache any results
43+
None,
44+
/// Cache the result of filtering all columns in the filter schema
45+
All {
46+
/// The builder for the cached batches
47+
cached_batches_builder: BatchCoalescer,
48+
/// The indexes of the columns in the original parquet schema that are in the projection
49+
original_projection: Vec<usize>,
50+
},
51+
/// Cache the result of filtering a subset of the columns in the filter schema
52+
Subset {
53+
/// The builder for the cached batches
54+
cached_batches_builder: BatchCoalescer,
55+
/// The indexes of the columns in the filter schema that are in the projection
56+
filter_projection: Vec<usize>,
57+
/// The indexes of the columns in the original parquet schema that are in the projection
58+
original_projection: Vec<usize>,
59+
},
60+
}
61+
62+
impl CachedPredicateResultBuilder {
63+
/// Create a new CachedPredicateResultBuilder
64+
///
65+
/// # Arguments:
66+
/// * `num_original_columns`: The number of columns in the original parquet schema
67+
/// * `filter_schema`: The schema of the filtered record batch (not the original parquet schema)
68+
/// * `filter_mask`: which columns of the original parquet schema did the filter columns come from?
69+
/// * `projection_mask`: which columns of the original parquet schema are in the final projection?
70+
///
71+
/// This structure does not cache filter results for the columns that are not
72+
/// in the projection mask, because the filter results for those columns are not needed.
73+
pub(crate) fn try_new(
74+
num_original_columns: usize,
75+
filter_schema: &SchemaRef,
76+
filter_mask: &ProjectionMask,
77+
projection_mask: &ProjectionMask,
78+
batch_size: usize,
79+
) -> Result<Self, ArrowError> {
80+
let (filter_mask_inner, projection_mask_inner) =
81+
match (filter_mask.mask(), projection_mask.mask()) {
82+
(Some(filter_mask), Some(projection_mask)) => (filter_mask, projection_mask),
83+
// None means "select all columns" so in this case cache all filtered columns
84+
(Some(filter_mask), None) => (filter_mask, filter_mask),
85+
// None means "select all columns" so in this case cache all columns used in projection
86+
(None, Some(projection_mask)) => (projection_mask, projection_mask),
87+
(None, None) => {
88+
// this means all columns are in the projection *and* filter so cache them all when possible
89+
let cached_batches_builder = BatchCoalescer::new(
90+
Arc::clone(filter_schema),
91+
batch_size,
92+
);
93+
let strategy = CacheStrategy::All {
94+
cached_batches_builder,
95+
original_projection: (0..num_original_columns).collect(),
96+
};
97+
return {
98+
Ok(Self {
99+
strategy,
100+
num_original_columns,
101+
filters: vec![],
102+
})
103+
};
104+
}
105+
};
106+
107+
// Otherwise, need to select a subset of the fields from each batch to cache
108+
109+
// This is an iterator over the fields of the schema of batches passed
110+
// to the filter.
111+
let mut filter_field_iter = filter_schema.fields.iter().enumerate();
112+
113+
let mut filter_projection = vec![];
114+
let mut original_projection = vec![];
115+
let mut fields = vec![];
116+
117+
// Iterate over the masks from the original schema
118+
assert_eq!(filter_mask_inner.len(), projection_mask_inner.len());
119+
for (original_index, (&in_filter, &in_projection)) in filter_mask_inner
120+
.iter()
121+
.zip(projection_mask_inner.iter())
122+
.enumerate()
123+
{
124+
if !in_filter {
125+
continue;
126+
}
127+
// take next field from the filter schema
128+
let (filter_index, field) =
129+
filter_field_iter.next().expect("mismatch in field lengths");
130+
if !in_projection {
131+
// this field is not in the projection, so don't cache it
132+
continue;
133+
}
134+
// this field is both in filter and the projection, so cache the results
135+
filter_projection.push(filter_index);
136+
original_projection.push(original_index);
137+
fields.push(Arc::clone(field));
138+
}
139+
let strategy = if fields.is_empty() {
140+
CacheStrategy::None
141+
} else {
142+
let cached_batches_builder =
143+
BatchCoalescer::new(Arc::new(Schema::new(fields)), batch_size);
144+
CacheStrategy::Subset {
145+
cached_batches_builder,
146+
filter_projection,
147+
original_projection,
148+
}
149+
};
150+
151+
Ok(Self {
152+
strategy,
153+
num_original_columns,
154+
filters: vec![],
155+
})
156+
}
157+
158+
/// Add a new batch and filter to the builder
159+
pub(crate) fn add(
160+
&mut self,
161+
batch: RecordBatch,
162+
mut filter: BooleanArray,
163+
) -> crate::errors::Result<()> {
164+
if filter.null_count() > 0 {
165+
filter = prep_null_mask_filter(&filter);
166+
}
167+
168+
match &mut self.strategy {
169+
CacheStrategy::None => {}
170+
CacheStrategy::All {
171+
cached_batches_builder,
172+
..
173+
} => {
174+
cached_batches_builder.push_batch_with_filter(batch, &filter)?;
175+
}
176+
CacheStrategy::Subset {
177+
cached_batches_builder,
178+
ref filter_projection,
179+
..
180+
} => {
181+
// If we have a filter projection, we need to project the batch
182+
// to only the columns that are in the filter projection
183+
let projected_batch = batch.project(filter_projection)?;
184+
cached_batches_builder.push_batch_with_filter(projected_batch, &filter)?;
185+
}
186+
}
187+
188+
self.filters.push(filter);
189+
190+
Ok(())
191+
}
192+
193+
/// Return (selection, maybe_cached_predicate_result) that represents the rows
194+
/// that were selected and batches that were evaluated.
195+
pub(crate) fn build(
196+
self,
197+
) -> crate::errors::Result<(RowSelection, Option<CachedPredicateResult>)> {
198+
let Self {
199+
strategy,
200+
num_original_columns,
201+
filters,
202+
} = self;
203+
204+
let new_selection = RowSelection::from_filters(&filters);
205+
206+
match strategy {
207+
CacheStrategy::None => Ok((new_selection, None)),
208+
CacheStrategy::All {
209+
mut cached_batches_builder,
210+
original_projection,
211+
}
212+
| CacheStrategy::Subset {
213+
mut cached_batches_builder,
214+
original_projection,
215+
..
216+
} => {
217+
// explode out the cached batches into the proper place in the original schema
218+
cached_batches_builder.finish_buffered_batch()?;
219+
let completed_batches = cached_batches_builder
220+
.take_completed_batches();
221+
222+
let mut cached_result = CachedPredicateResult::new(num_original_columns, filters);
223+
for (batch_index, original_idx) in original_projection.iter().enumerate() {
224+
let mut column_arrays = Vec::with_capacity(completed_batches.len());
225+
for batch in &completed_batches {
226+
column_arrays.push(Arc::clone(batch.column(batch_index)));
227+
}
228+
cached_result.add_result(*original_idx, column_arrays);
229+
}
230+
231+
Ok((new_selection, Some(cached_result)))
232+
}
233+
}
234+
}
235+
}

0 commit comments

Comments
 (0)