Skip to content

Commit 5e81ee4

Browse files
committed
Try to fix the page skip issue
1 parent 37ab45a commit 5e81ee4

File tree

2 files changed

+219
-25
lines changed

2 files changed

+219
-25
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 182 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::column::page::{PageIterator, PageReader};
3939
use crate::encryption::decrypt::FileDecryptionProperties;
4040
use crate::errors::{ParquetError, Result};
4141
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
42+
use crate::file::page_index::offset_index::OffsetIndexMetaData;
4243
use crate::file::reader::{ChunkReader, SerializedPageReader};
4344
use crate::schema::types::SchemaDescriptor;
4445

@@ -51,6 +52,61 @@ mod read_plan;
5152
mod selection;
5253
pub mod statistics;
5354

55+
/// Returns true when `selection` keeps some rows for `projection` but prunes whole
56+
/// data pages (determined via `OffsetIndex`). Masks can't handle this because the
57+
/// page data is never fetched, so the caller should fall back to RowSelectors.
58+
pub(crate) fn selection_skips_any_page(
59+
selection: &RowSelection,
60+
projection: &ProjectionMask,
61+
columns: &[OffsetIndexMetaData],
62+
) -> bool {
63+
columns.iter().enumerate().any(|(leaf_idx, column)| {
64+
if !projection.leaf_included(leaf_idx) {
65+
return false;
66+
}
67+
68+
let locations = column.page_locations();
69+
if locations.is_empty() {
70+
return false;
71+
}
72+
73+
let ranges = selection.scan_ranges(locations);
74+
!ranges.is_empty() && ranges.len() < locations.len()
75+
})
76+
}
77+
78+
fn selection_requires_selectors_for_row_groups(
79+
selection: Option<&RowSelection>,
80+
projection: &ProjectionMask,
81+
metadata: &ParquetMetaData,
82+
row_groups: &[usize],
83+
) -> bool {
84+
let mut remaining = match selection {
85+
Some(selection) => selection.clone(),
86+
None => return false,
87+
};
88+
89+
let offset_index = match metadata.offset_index() {
90+
Some(index) => index,
91+
None => return false,
92+
};
93+
94+
for &rg_idx in row_groups {
95+
let columns = match offset_index.get(rg_idx) {
96+
Some(columns) if !columns.is_empty() => columns,
97+
_ => continue,
98+
};
99+
100+
let row_count = metadata.row_group(rg_idx).num_rows() as usize;
101+
let rg_selection = remaining.split_off(row_count);
102+
if selection_skips_any_page(&rg_selection, projection, columns) {
103+
return true;
104+
}
105+
}
106+
107+
false
108+
}
109+
54110
/// Builder for constructing Parquet readers that decode into [Apache Arrow]
55111
/// arrays.
56112
///
@@ -918,18 +974,43 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
918974
.build_array_reader(fields.as_deref(), predicate.projection())?;
919975

920976
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
977+
978+
// Once a predicate has refined the selection, re-check if any requested
979+
// column now skips entire pages. In that case the boolean-mask strategy
980+
// would try to filter data we never fetched; selectors are safe.
981+
if selection_requires_selectors_for_row_groups(
982+
plan_builder.selection(),
983+
predicate.projection(),
984+
reader.metadata.as_ref(),
985+
&reader.row_groups,
986+
) {
987+
plan_builder =
988+
plan_builder.with_selection_strategy(RowSelectionStrategy::Selectors);
989+
}
921990
}
922991
}
923992

924993
let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
925994
.build_array_reader(fields.as_deref(), &projection)?;
926995

927-
let read_plan = plan_builder
996+
let mut plan_builder = plan_builder
928997
.limited(reader.num_rows())
929998
.with_offset(offset)
930999
.with_limit(limit)
931-
.build_limited()
932-
.build();
1000+
.build_limited();
1001+
1002+
// Offset/limit can also trim per-row-group selections. Ensure the final
1003+
// plan doesn't leave mask-backed cursors pointing at pruned pages.
1004+
if selection_requires_selectors_for_row_groups(
1005+
plan_builder.selection(),
1006+
&projection,
1007+
reader.metadata.as_ref(),
1008+
&reader.row_groups,
1009+
) {
1010+
plan_builder = plan_builder.with_selection_strategy(RowSelectionStrategy::Selectors);
1011+
}
1012+
1013+
let read_plan = plan_builder.build();
9331014

9341015
Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
9351016
}
@@ -1257,6 +1338,28 @@ mod tests {
12571338
use std::path::PathBuf;
12581339
use std::sync::Arc;
12591340

1341+
use crate::arrow::arrow_reader::{
1342+
ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1343+
ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelectionStrategy,
1344+
RowSelector,
1345+
};
1346+
use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1347+
use crate::arrow::{ArrowWriter, ProjectionMask};
1348+
use crate::basic::{Compression, ConvertedType, Encoding, Repetition, Type as PhysicalType};
1349+
use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1350+
use crate::data_type::{
1351+
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1352+
FloatType, Int32Type, Int64Type, Int96, Int96Type,
1353+
};
1354+
use crate::errors::Result;
1355+
use crate::file::metadata::ParquetMetaData;
1356+
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1357+
use crate::file::writer::SerializedFileWriter;
1358+
use crate::schema::parser::parse_message_type;
1359+
use crate::schema::types::{Type, TypePtr};
1360+
use crate::util::test_common::rand_gen::RandGen;
1361+
use arrow::compute::kernels::cmp::eq;
1362+
use arrow::compute::or;
12601363
use arrow_array::builder::*;
12611364
use arrow_array::cast::AsArray;
12621365
use arrow_array::types::{
@@ -1277,26 +1380,6 @@ mod tests {
12771380
use rand::{Rng, RngCore, rng};
12781381
use tempfile::tempfile;
12791382

1280-
use crate::arrow::arrow_reader::{
1281-
ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
1282-
ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
1283-
};
1284-
use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
1285-
use crate::arrow::{ArrowWriter, ProjectionMask};
1286-
use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
1287-
use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
1288-
use crate::data_type::{
1289-
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1290-
FloatType, Int32Type, Int64Type, Int96, Int96Type,
1291-
};
1292-
use crate::errors::Result;
1293-
use crate::file::metadata::ParquetMetaData;
1294-
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
1295-
use crate::file::writer::SerializedFileWriter;
1296-
use crate::schema::parser::parse_message_type;
1297-
use crate::schema::types::{Type, TypePtr};
1298-
use crate::util::test_common::rand_gen::RandGen;
1299-
13001383
#[test]
13011384
fn test_arrow_reader_all_columns() {
13021385
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
@@ -5145,6 +5228,82 @@ mod tests {
51455228
c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
51465229
}
51475230

5231+
const SECOND_MATCH_INDEX: usize = 4096;
5232+
const SECOND_MATCH_VALUE: i64 = 12345;
5233+
5234+
fn build_mask_pruning_parquet() -> Bytes {
5235+
let schema = Arc::new(Schema::new(vec![
5236+
Field::new("key", ArrowDataType::Int64, false),
5237+
Field::new("value", ArrowDataType::Float64, false),
5238+
]));
5239+
5240+
let num_rows = 8192usize;
5241+
let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
5242+
int_values[0] = 9999;
5243+
int_values[SECOND_MATCH_INDEX] = SECOND_MATCH_VALUE;
5244+
let keys = Int64Array::from(int_values);
5245+
let values = Float64Array::from_iter_values((0..num_rows).map(|v| v as f64 * 1.5));
5246+
let batch = RecordBatch::try_new(
5247+
Arc::clone(&schema),
5248+
vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
5249+
)
5250+
.unwrap();
5251+
5252+
let props = WriterProperties::builder()
5253+
.set_compression(Compression::SNAPPY)
5254+
.set_data_page_size_limit(1024)
5255+
.set_data_page_row_count_limit(32)
5256+
.build();
5257+
5258+
let mut buffer = Vec::new();
5259+
let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
5260+
writer.write(&batch).unwrap();
5261+
writer.close().unwrap();
5262+
5263+
Bytes::from(buffer)
5264+
}
5265+
5266+
/// Reads the fixture with the `Mask` strategy explicitly requested while a row
/// filter on `key` keeps only the two planted rows.
///
/// Despite its name, this test asserts success: building the reader and
/// collecting every batch must not fail, and exactly two rows must come back.
#[test]
fn test_mask_strategy_full_page_skip_triggers_error() {
    let data = build_mask_pruning_parquet();

    // Open the file once just to derive the projection masks from its schema.
    let (filter_mask, output_mask) = {
        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
                .unwrap();
        let parquet_schema = builder.parquet_schema();
        (
            ProjectionMask::leaves(parquet_schema, [0]),
            ProjectionMask::leaves(parquet_schema, [1]),
        )
    };

    // Keep rows whose key equals either planted sentinel value.
    let predicate = ArrowPredicateFn::new(filter_mask, move |batch: RecordBatch| {
        let keys = batch.column(0);
        let is_first = eq(keys, &Int64Array::new_scalar(9999))?;
        let is_second = eq(keys, &Int64Array::new_scalar(SECOND_MATCH_VALUE))?;
        or(&is_first, &is_second)
    });

    let options = ArrowReaderOptions::new().with_page_index(true);
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data, options)
        .unwrap()
        .with_projection(output_mask)
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .with_batch_size(256)
        .with_row_selection_strategy(RowSelectionStrategy::Mask)
        .build()
        .unwrap();

    // The mask strategy used to panic once predicate pruning removed whole pages.
    // Collecting into batches validates the plan now downgrades to selectors instead.
    let schema = reader.schema().clone();
    let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    let combined = concat_batches(&schema, &batches).unwrap();
    assert_eq!(combined.num_rows(), 2);
}
5306+
51485307
#[test]
51495308
fn test_get_row_group_column_bloom_filter_with_length() {
51505309
// convert to new parquet file with bloom_filter_length

parquet/src/arrow/async_reader/mod.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef};
4040

4141
use crate::arrow::arrow_reader::{
4242
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
43-
RowFilter, RowSelection, RowSelectionStrategy,
43+
RowFilter, RowSelection, RowSelectionStrategy, selection_skips_any_page,
4444
};
4545

4646
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
@@ -49,6 +49,7 @@ use crate::bloom_filter::{
4949
};
5050
use crate::errors::{ParquetError, Result};
5151
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
52+
use crate::file::page_index::offset_index::OffsetIndexMetaData;
5253

5354
mod metadata;
5455
pub use metadata::*;
@@ -637,6 +638,17 @@ where
637638
return Ok((self, None)); // ruled out entire row group
638639
}
639640

641+
// Predicate evaluation can zero out some pages; switch to selectors
642+
// before the mask-backed cursor tries to touch data we won't fetch.
643+
if selection_requires_selector(
644+
plan_builder.selection(),
645+
predicate.projection(),
646+
offset_index,
647+
) {
648+
plan_builder =
649+
plan_builder.with_selection_strategy(RowSelectionStrategy::Selectors);
650+
}
651+
640652
// (pre) Fetch only the columns that are selected by the predicate
641653
let selection = plan_builder.selection();
642654
// Fetch predicate columns; expand selection only for cached predicate columns
@@ -669,12 +681,17 @@ where
669681
}
670682

671683
// Apply any limit and offset
672-
let plan_builder = plan_builder
684+
let mut plan_builder = plan_builder
673685
.limited(row_group.row_count)
674686
.with_offset(self.offset)
675687
.with_limit(self.limit)
676688
.build_limited();
677689

690+
// Offset/limit may further trim pages, so re-evaluate the strategy here too.
691+
if selection_requires_selector(plan_builder.selection(), &projection, offset_index) {
692+
plan_builder = plan_builder.with_selection_strategy(RowSelectionStrategy::Selectors);
693+
}
694+
678695
let rows_after = plan_builder
679696
.num_rows_selected()
680697
.unwrap_or(row_group.row_count);
@@ -765,6 +782,24 @@ where
765782
}
766783
}
767784

785+
fn selection_requires_selector(
786+
selection: Option<&RowSelection>,
787+
projection: &ProjectionMask,
788+
offset_index: Option<&[OffsetIndexMetaData]>,
789+
) -> bool {
790+
let selection = match selection {
791+
Some(selection) => selection,
792+
None => return false,
793+
};
794+
795+
let offset_index = match offset_index {
796+
Some(index) => index,
797+
None => return false,
798+
};
799+
800+
selection_skips_any_page(selection, projection, offset_index)
801+
}
802+
768803
enum StreamState<T> {
769804
/// At the start of a new row group, or the end of the parquet stream
770805
Init,

0 commit comments

Comments
 (0)