Speed up Parquet filter pushdown v4 (Predicate evaluation cache for async_reader) #7850
Conversation
#[derive(Clone)]
pub struct CacheOptions<'a> {
    pub projection_mask: &'a ProjectionMask,
    pub cache: Arc<Mutex<RowGroupCache>>,
Practically there's no contention because there's no parallelism in decoding a single row group; we add the Mutex only because we need to share the cache behind an Arc.
let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
    batch_size,
    // None,
    Some(1024 * 1024 * 100),
This is currently hard-coded; making it configurable through user settings is left as future work.
@@ -613,8 +623,18 @@ where
            .fetch(&mut self.input, predicate.projection(), selection)
            .await?;

        let mut cache_projection = predicate.projection().clone();
        cache_projection.intersect(&projection);
A column is cached if and only if it appears in both the output projection and the filter projection.
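As a sketch of that rule (assuming the same `ProjectionMask::intersect` used in the diff above; the schema here is illustrative):

```rust
use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;
use std::sync::Arc;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = SchemaDescriptor::new(Arc::new(parse_message_type(
        "message schema { REQUIRED INT64 a; REQUIRED INT64 b; REQUIRED INT64 c; }",
    )?));

    // The predicate reads {a, b}; the output projection is {b, c}
    let mut cache_projection = ProjectionMask::leaves(&schema, [0, 1]);
    let output_projection = ProjectionMask::leaves(&schema, [1, 2]);

    // Only `b` survives the intersection, so only `b` is worth caching:
    // `a` is never needed again after the filter, and `c` is never decoded by it.
    cache_projection.intersect(&output_projection);
    assert!(!cache_projection.leaf_included(0));
    assert!(cache_projection.leaf_included(1));
    assert!(!cache_projection.leaf_included(2));
    Ok(())
}
```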
So one thing I didn't understand after reading this PR in detail was how the relative row positions are updated after applying a filter.
For example, if we are applying multiple filters, the first may reduce the original RowSelection down to [100->200], and now when the second filter runs it is only evaluated on those 100->200 rows, not the original selection.
In other words, I think there needs to be some sort of function equivalent to RowSelection::and_then that applies to the cache:
// Narrow the cache so that it only retains the results of evaluating the predicate
let row_group_cache = row_group_cache.and_then(resulting_selection)
Maybe this is the root cause of https://github.com/apache/datafusion/actions/runs/16302299778/job/46039904381?pr=16711
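For reference, `RowSelection::and_then` already does this re-mapping for selections; a cache keyed by absolute row positions would need the equivalent. A minimal sketch of the existing API:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // The first predicate narrows 300 rows down to rows 100..200
    let first = RowSelection::from(vec![
        RowSelector::skip(100),
        RowSelector::select(100),
        RowSelector::skip(100),
    ]);

    // The second predicate only sees those 100 rows and keeps the first 40
    let second = RowSelection::from(vec![RowSelector::select(40), RowSelector::skip(60)]);

    // and_then maps the second (relative) selection back onto the original
    // row numbering: rows 100..140 of the row group remain selected
    let combined = first.and_then(&second);
    assert_eq!(combined.row_count(), 40);
}
```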
}

fn get_def_levels(&self) -> Option<&[i16]> {
    None // we don't allow nullable parent for now.
Nested columns are not supported yet.
😮 -- My brain is likely too fried at the moment to review this properly but it is on my list for first thing tomorrow
Thank you @XiangpengHao for the amazing work, I will try to review and test this PR!
TLDR is I think this is really clever - very nice @XiangpengHao . I left some structural comments / suggestions but nothing major.
I will run some more benchmarks, but it was showing very nice improvements for Q21 locally for me (129ms --> 90ms)
If that looks good I'll wire it up in DataFusion and run those benchmarks
Some thoughts:
- I would be happy to wire in the buffering limit / API
- As you say, there are many more improvements possible -- specifically I suspect the RowSelector representation is going to cause us pain and suffering for filters that have many short selections, when bitmaps would be a better choice
Buffering
I think buffering the intermediate filter results is unavoidable if we want to preserve the current behavior of minimizing the size of IO requests
If we want to reduce buffering I think we can only really do it by increasing the number of IO requests (so we can incrementally produce the final output). I think we should proceed with buffering and then tune if/when needed
CacheOptions {
    projection_mask: &cache_projection,
    cache: row_group_cache.clone(),
    role: crate::arrow::array_reader::CacheRole::Producer,
},
Structurally, both here and below, it might help to move the creation of the CacheOptions into the cache itself, so a reader of this code doesn't have to understand the innards of the cache:
CacheOptions {
    projection_mask: &cache_projection,
    cache: row_group_cache.clone(),
    role: crate::arrow::array_reader::CacheRole::Producer,
},
row_group_cache.producer_options(projection, predicate.projection())
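A hypothetical sketch of that suggestion, using stand-in types (neither `SharedRowGroupCache` nor `producer_options` exists in the crate; the field names follow the snippet above):

```rust
use std::sync::{Arc, Mutex};

// Stand-ins for the PR's types, just to show the shape of the suggestion
struct ProjectionMask;
struct RowGroupCache;

#[derive(Clone, Copy)]
enum CacheRole {
    Producer,
    Consumer,
}

struct CacheOptions<'a> {
    projection_mask: &'a ProjectionMask,
    cache: Arc<Mutex<RowGroupCache>>,
    role: CacheRole,
}

/// A handle that owns the shared cache and knows how to build its own options,
/// so call sites don't construct CacheOptions field-by-field.
struct SharedRowGroupCache(Arc<Mutex<RowGroupCache>>);

impl SharedRowGroupCache {
    /// Options for the filter phase, which populates the cache. A real version
    /// might intersect the output and predicate projections internally.
    fn producer_options<'a>(&self, cache_projection: &'a ProjectionMask) -> CacheOptions<'a> {
        CacheOptions {
            projection_mask: cache_projection,
            cache: Arc::clone(&self.0),
            role: CacheRole::Producer,
        }
    }

    /// Options for the output phase, which reads from the cache
    fn consumer_options<'a>(&self, cache_projection: &'a ProjectionMask) -> CacheOptions<'a> {
        CacheOptions {
            projection_mask: cache_projection,
            cache: Arc::clone(&self.0),
            role: CacheRole::Consumer,
        }
    }
}
```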
let start_position = self.outer_position - row_count;

let selection_buffer = row_selection_to_boolean_buffer(row_count, self.selections.iter());
this is clever -- though it will likely suffer from the same "RowSelection is a crappy representation for small selection runs" problem
Yes, this is to alleviate that problem: if we have multiple small selection runs on the same cached batch, we first combine them into a boolean buffer and then do the boolean selection once.
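A sketch of that combine-then-filter-once idea (the helper name is illustrative, not the PR's exact function):

```rust
use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array};
use arrow_buffer::BooleanBufferBuilder;
use parquet::arrow::arrow_reader::RowSelector;
use std::sync::Arc;

/// Build a single boolean mask covering `row_count` rows from a run of
/// selectors, so a cached batch can be filtered once instead of sliced
/// once per RowSelector.
fn selection_to_mask<'a>(
    row_count: usize,
    selectors: impl Iterator<Item = &'a RowSelector>,
) -> BooleanArray {
    let mut builder = BooleanBufferBuilder::new(row_count);
    for s in selectors {
        builder.append_n(s.row_count, !s.skip);
    }
    BooleanArray::new(builder.finish(), None)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cached: ArrayRef = Arc::new(Int32Array::from_iter_values(0..10));
    let runs = vec![
        RowSelector::select(2),
        RowSelector::skip(3),
        RowSelector::select(1),
        RowSelector::skip(4),
    ];
    let mask = selection_to_mask(10, runs.iter());
    // One filter call instead of one slice per RowSelector
    let filtered = arrow_select::filter::filter(&cached, &mask)?;
    assert_eq!(filtered.len(), 3);
    Ok(())
}
```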
.expect("data must be already cached in the read_records call, this is a bug"); | ||
let cached = cached.slice(overlap_start - batch_start, selection_length); | ||
let filtered = arrow_select::filter::filter(&cached, &mask_array)?; | ||
selected_arrays.push(filtered); |
You can probably use the new BatchCoalescer here instead: https://docs.rs/arrow/latest/arrow/compute/struct.BatchCoalescer.html
It is definitely faster for primitive arrays and will save intermediate memory usage.
It might have some trouble with StringView as it also tries to gc internally -- we may need to optimize the output to avoid gc'ing if we see the same buffer from call to call.
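A minimal usage sketch of that suggestion (assuming the `arrow::compute::BatchCoalescer` API linked above; the sizes are illustrative):

```rust
use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use arrow::compute::BatchCoalescer;
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));

    // Push many small filtered pieces and let the coalescer assemble them into
    // target-sized output batches, instead of collecting a Vec of arrays and
    // concatenating at the end.
    let mut coalescer = BatchCoalescer::new(schema.clone(), 4096);
    let mut output_rows = 0;
    for start in (0..10_000i64).step_by(100) {
        let col: ArrayRef = Arc::new(Int64Array::from_iter_values(start..start + 100));
        coalescer.push_batch(RecordBatch::try_new(schema.clone(), vec![col])?)?;
        while let Some(batch) = coalescer.next_completed_batch() {
            output_rows += batch.num_rows();
        }
    }
    // Flush whatever is still buffered as a final (smaller) batch
    coalescer.finish_buffered_batch()?;
    while let Some(batch) = coalescer.next_completed_batch() {
        output_rows += batch.num_rows();
    }
    assert_eq!(output_rows, 10_000);
    Ok(())
}
```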
🤖: Benchmark completed
😎 -- very nice
Great result! I am curious about the performance compared with the no-filter-pushdown case, because the previous attempt also improved performance for this benchmark, but compared to the no-filter-pushdown case it had some regression.
I will try and run this experiment later today
Thank you @alamb. If there is no regression, I believe this PR will also resolve the adaptive selection cases; if there is a regression, we can further combine it with adaptive selection for a final optimization.
I will do this once the above two PRs are merged
- Revert backwards incompatible changes to the Parquet reader API
- Clarify in documentation that cache is only for async decoder
I really like #8000, thank you @alamb for writing it up! I'll think about it over the next couple of days.
Thank you @XiangpengHao -- I think we should proceed with this PR
I broke out some of the infrastructure into a new PR in case that is easier for other reviewers
What I think we should do is wait until after we cut the next release (eta early next week) and then merge it in
# Which issue does this PR close?
We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax.
- related to #7850

# Rationale for this change
While reviewing #7850 from @XiangpengHao I found myself wanting even more comments (or maybe I was doing this as an exercise to load the state back into my head). In any case, I wrote up some comments that I think would make the code easier to understand.

# What changes are included in this PR?
Add some more docs

# Are these changes tested?
By CI

# Are there any user-facing changes?
No -- this is documentation of internal interfaces. There is no code or functional change.
Now that we have released 56.0.0 and we have a story for why we won't do predicate result caching for the sync reader (namely #7983), I think we are ready to merge this PR. I merged up from main, and I am going to take one more look to make sure there are no breaking API changes.
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

/// This enum represents the state of Arrow reader metrics collection.
I also think the addition of metrics will be very helpful for other use cases (as recently mentioned by @mapleFU and @steveloughran)
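Purely as an illustration of the shape implied by the snippet above (the actual variant and counter names in the PR may differ), the metrics handle could look something like:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical sketch: metrics are either disabled (all updates are no-ops)
/// or share a set of atomic counters between the reader and the caller.
#[derive(Clone, Default)]
enum ReaderMetricsSketch {
    #[default]
    Disabled,
    Enabled(Arc<Counters>),
}

#[derive(Default)]
struct Counters {
    records_read_from_cache: AtomicUsize,
}

impl ReaderMetricsSketch {
    fn add_records_read_from_cache(&self, n: usize) {
        if let Self::Enabled(c) = self {
            c.records_read_from_cache.fetch_add(n, Ordering::Relaxed);
        }
    }
}

fn main() {
    let metrics = ReaderMetricsSketch::Enabled(Arc::new(Counters::default()));
    metrics.add_records_read_from_cache(8192);
}
```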
I took another look at this PR and I think we need to fix the test before merging. Otherwise we are good to go.
I'll follow up with @XiangpengHao and either he or I will fix it
@@ -1832,6 +1882,7 @@ mod tests {
        assert_eq!(total_rows, 730);
    }

    #[ignore]
I don't think we can merge this PR without un-ignoring this test
I think it is showing a regression. When I looked into it more, it seems like the new cache, even when supposedly disabled, is changing the behavior and fetching more pages.
I think we need to ensure that if the cache is disabled, then the IO behavior is the same as before
Specifically, it looks like we now fetch all the pages, even those that are supposed to be skipped:
Expected page requests: [
113..222,
331..440,
573..682,
791..900,
1033..1142,
1251..1360,
...
Actual page requests: [
4..113,
113..222,
222..331,
331..440,
440..573,
573..682,
682..791,
791..900,
900..1033,
1033..1142,
1142..1251,
1251..1360,
...
Here is the diff I was using to investigate:
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 843ad766e9..b3da39c48e 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1884,7 +1884,6 @@ mod tests {
assert_eq!(total_rows, 730);
}
- #[ignore]
#[tokio::test]
async fn test_in_memory_row_group_sparse() {
let testdata = arrow::util::test_util::parquet_test_data();
@@ -1925,8 +1924,6 @@ mod tests {
)
.unwrap();
- let _schema_desc = metadata.file_metadata().schema_descr();
-
let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
let reader_factory = ReaderFactory {
@@ -1946,19 +1943,25 @@ mod tests {
// Setup `RowSelection` so that we can skip every other page, selecting the last page
let mut selectors = vec![];
let mut expected_page_requests: Vec<Range<usize>> = vec![];
+ let mut page_idx = 0;
while let Some(page) = pages.next() {
+
let num_rows = if let Some(next_page) = pages.peek() {
next_page.first_row_index - page.first_row_index
} else {
num_rows - page.first_row_index
};
+ println!("page {page_idx}: first_row_index={} offset={} compressed_page_size={}, num_rows={num_rows}, skip={skip}", page.first_row_index, page.offset, page.compressed_page_size);
+ page_idx += 1;
+ let start = page.offset as usize;
+ let end = start + page.compressed_page_size as usize;
if skip {
selectors.push(RowSelector::skip(num_rows as usize));
+ println!(" skipping page with {num_rows} rows : {start}..{end}");
} else {
selectors.push(RowSelector::select(num_rows as usize));
- let start = page.offset as usize;
- let end = start + page.compressed_page_size as usize;
+ println!(" selecting page with {num_rows} rows: {start}..{end}");
expected_page_requests.push(start..end);
}
skip = !skip;
@@ -1973,7 +1976,13 @@ mod tests {
let requests = requests.lock().unwrap();
- assert_eq!(&requests[..], &expected_page_requests)
+ println!("Expected page requests: {:#?}", &expected_page_requests);
+ println!("Actual page requests: {:#?}", &requests[..]);
+
+ assert_eq!(
+ format!("{:#?}",&expected_page_requests),
+ format!("{:#?}", &requests[..]),
+ );
}
#[tokio::test]
yes, will take a look soon
I have a few more things I'd like to change, will update here once they're ready
I've finished my pass with two new changes, can you take a look? @alamb
No test is ignored now.
}

/// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
New change 1: exclude nested columns from the cache.
Previous behavior: panic.
It's not impossible, but it is very hard to support caching nested columns, and we don't support it yet.
With this change, nested columns fall back to the old implementation, i.e., they are decoded twice, but at least we no longer panic.
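A self-contained sketch of the idea (not the PR's exact implementation; the schema and helper name are illustrative):

```rust
use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;
use std::sync::Arc;

/// Drop any leaf whose root column owns more than one parquet leaf (i.e. a
/// nested column), returning None if nothing cacheable remains.
fn exclude_nested_leaves(schema: &SchemaDescriptor, mask: &ProjectionMask) -> Option<ProjectionMask> {
    // Count how many leaves each root column owns
    let mut leaves_per_root = vec![0usize; schema.root_schema().get_fields().len()];
    for leaf in 0..schema.num_columns() {
        leaves_per_root[schema.get_column_root_idx(leaf)] += 1;
    }
    // Keep only flat (single-leaf) roots that the mask already includes
    let keep: Vec<usize> = (0..schema.num_columns())
        .filter(|&leaf| mask.leaf_included(leaf) && leaves_per_root[schema.get_column_root_idx(leaf)] == 1)
        .collect();
    if keep.is_empty() {
        None
    } else {
        Some(ProjectionMask::leaves(schema, keep))
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let message = "message schema {
        REQUIRED INT64 flat;
        REQUIRED GROUP nested { REQUIRED INT64 a; REQUIRED INT64 b; }
    }";
    let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message)?));
    let cacheable = exclude_nested_leaves(&schema, &ProjectionMask::all()).expect("flat column remains");
    assert!(cacheable.leaf_included(0));  // `flat` can be cached
    assert!(!cacheable.leaf_included(1)); // `nested.a` falls back to re-decoding
    Ok(())
}
```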
@@ -924,7 +1014,15 @@ impl InMemoryRowGroup<'_> {
            _ => (),
        }

        ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
        // Expand selection to batch boundaries only for cached columns
        let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
Change 2: only expand the selection to batch boundaries for the cached columns, not the other columns. This should improve the IO.
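A small sketch of the per-leaf decision (helper name illustrative):

```rust
use parquet::arrow::arrow_reader::RowSelection;
use parquet::arrow::ProjectionMask;

/// Only columns in the cache mask use the batch-aligned selection (so whole
/// batches land in the cache); every other column keeps the tight selection
/// and therefore fetches no extra pages.
fn selection_for_leaf<'a>(
    leaf_idx: usize,
    cache_mask: Option<&ProjectionMask>,
    tight: &'a RowSelection,
    batch_aligned: &'a RowSelection,
) -> &'a RowSelection {
    let cache_this_column = cache_mask.map(|m| m.leaf_included(leaf_idx)).unwrap_or(false);
    if cache_this_column {
        batch_aligned
    } else {
        tight
    }
}

fn main() {
    let tight = RowSelection::from(vec![]);
    let aligned = RowSelection::from(vec![]);
    // No cache mask: every column keeps its tight selection
    let chosen = selection_for_leaf(0, None, &tight, &aligned);
    assert_eq!(chosen.row_count(), 0);
}
```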
Thank you @XiangpengHao -- I am starting to check this out
Thank you @XiangpengHao
I looked at the code in the latest commits and it looks good to me. I am testing this PR here
- Rerunning the performance tests in DataFusion (see apache/datafusion#16711 (comment))
- Rerunning benchmarks on this PR #7850 (comment)
- Running this PR with the new tests from #7971 (see #8096)
Assuming everything looks good I'll merge it in
@@ -1920,7 +1930,6 @@ mod tests {
        assert_eq!(total_rows, 730);
    }

    #[ignore]
❤️
🤖: Benchmark completed
Ok, I think we have bikeshed this one enough and let's go!
# Which issue does this PR close?
- Part of #8000
- Related to #7850

# Rationale for this change
There is quite a bit of code in the current Parquet sync and async readers related to IO patterns that I do not think is covered by existing tests. As I refactor the guts of the readers into the PushDecoder, I would like to ensure we don't introduce regressions in existing functionality, so I would like to add tests that cover the IO patterns of the Parquet Reader so I don't break it.

# What changes are included in this PR?
Add tests which:
1. Create a temporary parquet file with a known row group structure
2. Read data from that file using the Arrow Parquet Reader, recording the IO operations
3. Assert the expected IO patterns based on the read operations, in a human-understandable form

This is done for both the sync and async readers. I am sorry this is such a massive PR, but it is entirely tests and I think it is quite important. I could break the sync or async tests into their own PR, but this seems unnecessary.

# Are these changes tested?
Yes, indeed the entire PR is only tests

# Are there any user-facing changes?
This is my latest attempt to make pushdown faster. Prior art: #6921
cc @alamb @zhuqi-lucas
- Enable parquet filter pushdown (filter_pushdown) by default datafusion#3463

Problems of #6921
This PR takes a different approach: it does not change the decoding pipeline, so we avoid problem 1. It also caches the arrow record batches, so we avoid problem 2.
But this means we need to use more memory to cache data.
How it works?
- It wraps the array_readers with a transparent cached_array_reader.
- The cached_array_reader first consults the RowGroupCache to look for a batch, and only reads from the underlying reader on a cache miss. In a concurrent setup, not all readers may reach their peak at the same time, so the peak system memory usage might be lower.
- If the needed batch is not in the cache (for example because the memory budget was exceeded), the cached_array_reader falls back to reading and decoding from Parquet.
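A stand-in sketch of that wrapping idea (not the PR's actual types or signatures; the real cached_array_reader implements the ArrayReader trait, and the cache keying differs):

```rust
use arrow_array::{Array, ArrayRef};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Stand-in for the PR's cache: decoded batches keyed by
/// (leaf column index, batch start row).
#[derive(Default)]
struct RowGroupCacheSketch {
    batches: HashMap<(usize, usize), ArrayRef>,
}

/// Transparent wrapper: the filter phase (producer) populates the cache while
/// it decodes, and the output phase (consumer) reuses the decoded batch
/// instead of decoding the column a second time.
struct CachedArrayReaderSketch {
    column_idx: usize,
    cache: Arc<Mutex<RowGroupCacheSketch>>,
}

impl CachedArrayReaderSketch {
    fn read_batch(&self, batch_start: usize, decode: impl FnOnce() -> ArrayRef) -> ArrayRef {
        let mut cache = self.cache.lock().unwrap();
        cache
            .batches
            .entry((self.column_idx, batch_start))
            .or_insert_with(decode)
            .clone()
    }
}

fn main() {
    use arrow_array::Int32Array;
    let reader = CachedArrayReaderSketch {
        column_idx: 0,
        cache: Arc::new(Mutex::new(RowGroupCacheSketch::default())),
    };
    let decode = || -> ArrayRef { Arc::new(Int32Array::from(vec![1, 2, 3])) };
    // First call decodes and populates the cache
    let first = reader.read_batch(0, decode);
    // Second call for the same batch is served from the cache
    let second = reader.read_batch(0, || unreachable!("already cached"));
    assert_eq!(first.len(), second.len());
}
```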
Other benefits
How does it perform?
My criterion somehow won't produce a result from --save-baseline, so I asked an LLM to generate a table from this benchmark:
- Baseline is the implementation on the current main branch.
- New Unlimited is the new pushdown with an unlimited memory budget.
- New 100MB is the new pushdown with a 100MB memory budget for row group caching.

Limitations
Next steps?
This PR is largely a proof of concept; I want to collect some feedback before sending a multi-thousand-line PR :)
Some items I can think of: