diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index ea068acb29eb..8b9189f9ec4a 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -16,15 +16,6 @@ // under the License. //! Contains reader which reads parquet data into arrow [`RecordBatch`] - -use arrow_array::cast::AsArray; -use arrow_array::Array; -use arrow_array::{RecordBatch, RecordBatchReader}; -use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; -pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; -pub use selection::{RowSelection, RowSelector}; -use std::sync::Arc; - pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; @@ -36,6 +27,16 @@ use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; +use arrow_array::cast::AsArray; +use arrow_array::{Array, BooleanArray}; +use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_buffer::BooleanBufferBuilder; +use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; +use arrow_select::filter::filter; +pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; +pub use selection::{RowSelection, RowSelector}; +use std::collections::VecDeque; +use std::sync::Arc; pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder}; @@ -799,67 +800,169 @@ impl Iterator for ParquetRecordBatchReader { } } +/// Take the next selection from the selection queue, and return the selection +/// whose selected row count is to_select or less (if input selection is exhausted). 
+fn take_next_selection( + selection: &mut VecDeque<RowSelector>, + to_select: usize, +) -> Option<RowSelection> { + let mut current_selected = 0; + let mut rt = Vec::new(); + while let Some(front) = selection.pop_front() { + if front.skip { + rt.push(front); + continue; + } + + if current_selected + front.row_count <= to_select { + rt.push(front); + current_selected += front.row_count; + } else { + let select = to_select - current_selected; + let remaining = front.row_count - select; + rt.push(RowSelector::select(select)); + selection.push_front(RowSelector::select(remaining)); + return Some(rt.into()); + } + } + if !rt.is_empty() { + return Some(rt.into()); + } + None +} + impl ParquetRecordBatchReader { /// Returns the next `RecordBatch` from the reader, or `None` if the reader /// has reached the end of the file. /// /// Returns `Result<Option<RecordBatch>>` rather than `Option<Result<RecordBatch>>` to /// simplify error handling with `?` + /// + /// Use the adaptive selection strategy to read the next batch of rows, here are the + /// details about the policy: + /// + /// **Window Size**: The adaptive window size equals the configured `batch_size`. + /// For each call, the reader processes up to `batch_size` rows in one logical window. + /// It dynamically decides on a per-subwindow basis whether to use: + /// - **Range-based selection** (default, for higher throughput) + /// - **Bitmap-based selection** (finer granularity when runs are very short) + /// + /// **Switching Criterion**: Only when the *average run length* in the subwindow is < 10 rows + /// (i.e. `total_rows < 10 * num_runs`) do we switch from range to bitmap mode. 
+ /// + /// **Example Patterns (sub-window examples)**: + /// *Note: these totals refer to a sampled sub-window of the full batch, not the entire `batch_size`.* + /// + /// ```text + /// Batch size = 8192 rows (Window) + /// + /// 1) Big range runs: + /// [2000 read | 2000 skip | 4192 read] + /// 3 runs, avg length ≈ 2730 → **range** mode + /// + /// 2) Medium range runs: + /// [200 read | 200 skip | 200 read ...] + /// avg length ≈ 200 → **range** mode + /// + /// 3) Dense small runs (many small alternations): + /// [ 5 read | 10 skip | 5 read | 10 skip | 5 read | 5 read ... ] + /// avg ≈ 6.7 < 10 → **bitmap** mode + /// ``` + /// + /// Returns a `RecordBatch` if any rows are produced, or `None` when no rows remain. fn next_inner(&mut self) -> Result<Option<RecordBatch>> { let mut read_records = 0; let batch_size = self.batch_size(); + let mut mask_builder = BooleanBufferBuilder::new(batch_size); + match self.read_plan.selection_mut() { Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); - if front.skip { - let skipped = self.array_reader.skip_records(front.row_count)?; - - if skipped != front.row_count { - return Err(general_err!( - "failed to skip rows, expected {}, got {}", - front.row_count, - skipped - )); + while let Some(cur_selection) = + take_next_selection(selection, batch_size - read_records) + { + let mut total_read = 0; + let mut total_skip = 0; + for r in cur_selection.iter() { + if r.skip { + total_skip += r.row_count; + } else { + total_read += r.row_count; } - continue; - } - - //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. - //Fix is to skip such entries. 
See https://github.com/apache/arrow-rs/issues/2669 - if front.row_count == 0 { - continue; } - - // try to read record - let need_read = batch_size - read_records; - let to_read = match front.row_count.checked_sub(need_read) { - Some(remaining) if remaining != 0 => { - // if page row count less than batch_size we must set batch size to page row count. - // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); - need_read + let select_count = cur_selection.iter().count(); + let total = total_skip + total_read; + + if total < 10 * select_count { + // Choose bitmap and then to filter if runs are on average < 10 rows + let mut bitmap_builder = BooleanBufferBuilder::new(total); + for select in cur_selection.iter() { + bitmap_builder.append_n(select.row_count, !select.skip); } - _ => front.row_count, - }; - match self.array_reader.read_records(to_read)? { - 0 => break, - rec => read_records += rec, - }; + let bitmap = BooleanArray::new(bitmap_builder.finish(), None); + self.array_reader.read_records(bitmap.len())?; + mask_builder.append_buffer(bitmap.values()); + read_records += bitmap.true_count(); + } else { + // Use fast range-Based skip/read + for select in cur_selection.iter() { + if select.skip { + let skipped = self.array_reader.skip_records(select.row_count)?; + + if skipped != select.row_count { + return Err(general_err!( + "failed to skip rows, expected {}, got {}", + select.row_count, + skipped + )); + } + continue; + } else { + //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. + //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669 + if select.row_count == 0 { + continue; + } + match self.array_reader.read_records(select.row_count)? 
{ + 0 => break, + rec => { + mask_builder.append_n(rec, true); + read_records += rec; + } + }; + } + } + } + // Stop once ~75% of window size is filled + if read_records >= (batch_size / 4 * 3) { + break; + } } } None => { - self.array_reader.read_records(batch_size)?; + // No selector: read entire batch + let rec = self.array_reader.read_records(self.batch_size())?; + mask_builder.append_n(rec, true); } }; let array = self.array_reader.consume_batch()?; - let struct_array = array.as_struct_opt().ok_or_else(|| { - ArrowError::ParquetError("Struct array reader should return struct array".to_string()) - })?; + if array.is_empty() { + return Ok(None); + } + + // Apply mask if used + let final_array = if mask_builder.is_empty() { + array + } else { + let bitmap = BooleanArray::new(mask_builder.finish(), None); + filter(&array, &bitmap)? + }; + + let struct_arr = final_array.as_struct_opt().expect("StructArray expected"); - Ok(if struct_array.len() > 0 { - Some(RecordBatch::from(struct_array)) + // Return only non-empty batches + Ok(if struct_arr.len() > 0 { + Some(RecordBatch::from(struct_arr)) } else { None })