# ARROW-11300: [Rust][DataFusion] Further performance improvements on hash aggregation with small groups #9271
## Conversation
---

@jorgecarleitao while the change makes hash aggregates a bit faster already, it seems …

---

Yes, slicing is suboptimal atm. Also, IMO it should not be the … That analysis is really cool, though. How did you come up with that????? 😮 Any blog post or readme??? :P

---

Thanks @jorgecarleitao, makes sense.

---

This PR in itself is ready for review now. The change in approach will help us in the future, even more with …
---

Codecov Report

```
@@           Coverage Diff           @@
##           master    #9271   +/-   ##
=======================================
  Coverage   81.89%   81.89%
=======================================
  Files         215      215
  Lines       52988    53003     +15
=======================================
+ Hits        43392    43407     +15
  Misses       9596     9596
```

Continue to review the full report at Codecov.
---
@jorgecarleitao I found the "offending" code is this `slice` function on `ArrayData`:

```rust
/// Creates a zero-copy slice of itself. This creates a new [ArrayData]
/// with a different offset, len and a shifted null bitmap.
///
/// # Panics
///
/// Panics if `offset + length > self.len()`.
pub fn slice(&self, offset: usize, length: usize) -> ArrayData {
    assert!((offset + length) <= self.len());
    let mut new_data = self.clone();
    new_data.len = length;
    new_data.offset = offset + self.offset;
    new_data.null_count =
        count_nulls(new_data.null_buffer(), new_data.offset, new_data.len);
    new_data
}
```
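For context, a minimal sketch of how this path is hit from the public API (assuming the `arrow` crate; `Array::slice` goes through the `ArrayData::slice` above):

```rust
use arrow::array::{Array, Int32Array};

fn main() {
    let a = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);

    // Zero-copy: the slice shares `a`'s buffers; only the offset, length
    // and null count of the new ArrayData differ.
    let s = a.slice(1, 2); // rows 1..3 -> [None, Some(3)]

    assert_eq!(s.len(), 2);
    assert_eq!(s.null_count(), 1);
}
```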
---

Isn't the data contained in a buffer …

---

@jorgecarleitao I think the clone on the …

---
Cloning the vector of Buffers and child ArrayData has some overhead. Incrementing the reference counts should be relatively cheap unless there are concurrent threads accessing the same Arc. I tried replacing the vectors inside ArrayData with SmallVec some time ago. That made the slice benchmarks faster, but several other benchmarks slowed down because of it. Might be worth revisiting that.
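A rough sketch of the SmallVec idea, for reference; the types and field layout below are illustrative stand-ins, not the actual arrow-rs definitions:

```rust
use std::sync::Arc;
use smallvec::SmallVec; // smallvec crate

// Stand-in for arrow's Buffer type, just for illustration.
#[derive(Clone)]
struct Buffer(Arc<Vec<u8>>);

// Hypothetical ArrayData variant: most arrays carry at most two buffers
// (values + offsets) and usually no child data, so keeping them inline
// in a SmallVec avoids the heap allocation that cloning a Vec pays today.
#[derive(Clone)]
struct SmallArrayData {
    len: usize,
    offset: usize,
    null_count: usize,
    buffers: SmallVec<[Buffer; 2]>,                 // inline up to 2 buffers
    child_data: SmallVec<[Arc<SmallArrayData>; 1]>, // usually empty
}
```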
This relates to the other discussion that we had on how slicing an array does a clone without propagating offset information to `child_data` and buffers. I find it interesting, though, that … Is that not the case?

---
Indeed, I don't think it is very expensive on large arrays compared to the size / operations on the array, but it turns out to be expensive on very small arrays. For this PR I am using … I am wondering if, instead of trying to make a new array when doing …, … Something like this:
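A minimal sketch of what such an API could look like; `sum_window` is a hypothetical helper standing in for an accumulator that reads an offset/length window of the shared array instead of materializing a slice per group:

```rust
use arrow::array::{Array, Int64Array};

// Hypothetical: an accumulator update that consumes a window of a shared
// array. No new Array (and no ArrayData clone) is created per group.
fn sum_window(values: &Int64Array, offset: usize, len: usize) -> i64 {
    (offset..offset + len)
        .filter(|&i| values.is_valid(i))
        .map(|i| values.value(i))
        .sum()
}

fn main() {
    // Values already reordered by one big `take`; per-group offsets [0, 2, 6].
    let taken = Int64Array::from(vec![1, 2, 3, 4, 5, 6]);
    assert_eq!(sum_window(&taken, 0, 2), 3);  // group 0: rows 0..2
    assert_eq!(sum_window(&taken, 2, 4), 18); // group 1: rows 2..6
}
```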
---
This PR itself is ready for review. I think the performance of slicing small slices would be something to look at later.

---
Thanks a lot for your points. I am learning a lot! :)

Note that for small arrays, we are basically in the metadata problem, in which the "payload size" of transmitting 1 element is driven by its metadata, not the data itself. This will always be a problem, as the arrow format was designed to be performant for large arrays. For example, all our buffers are shared via an `Arc`.

With that said, we could consider replacing … Another idea is to use …
---
LGTM. cool idea :)
---

Thanks, that is a great summary of the situation! I think removing the `Arc` …
---

Force-pushed from b1ba192 to f578e4c
---

I rebased this PR @jorgecarleitao @alamb

---

Thanks @Dandandan. fmt missing :)
---
This looks good to me @Dandandan -- thank you. I read it fairly closely and it makes sense.
---

```rust
// Collect all indices + offsets based on keys in this vec
let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
```
I wonder if you could get any additional performance by using the knowledge of the size of `batch_keys`:
```diff
-let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
+let mut batch_indices: UInt32Builder = UInt32Builder::new(batch_keys.len());
```
Or maybe you have to scale it by the number of accumulators too.
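For illustration, a self-contained sketch of the pre-sizing idea, assuming the arrow `UInt32Builder` of this era (whose `append_value` returns a `Result`); the sizes and `n_accumulators` are made-up stand-ins for this PR's variables:

```rust
use arrow::array::UInt32Builder;

fn main() {
    // Hypothetical sizes: 100 distinct keys in the batch, 3 accumulators.
    let batch_keys_len = 100;
    let n_accumulators = 3;

    // Reserve the worst case up front so append_value never reallocates.
    let mut batch_indices = UInt32Builder::new(batch_keys_len * n_accumulators);
    for i in 0..batch_keys_len as u32 {
        batch_indices.append_value(i).unwrap();
    }
    assert_eq!(batch_indices.finish().len(), batch_keys_len);
}
```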
---

```rust
// Collect all indices + offsets based on keys in this vec
let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
let mut offsets = vec![0];
```
```diff
-let mut offsets = vec![0];
+let mut offsets = Vec::with_capacity(batch_keys.len());
+offsets.push(0);
```
---

…rays

# Rationale

Currently, all our arrays use an `Arc<ArrayData>`, which they expose via `Array::data` and `Array::data_ref`. This adds a level of indirection. Now, it happens that, afaik, in the current code base the `Arc<>` is not needed. On #9271 we are observing some performance issues with small arrays, and one of the ideas that came up was to get rid of the `Arc` and see what happens.

# This PR

Well, this PR replaces all `Arc<ArrayData>` by `ArrayData`. On the one hand, this means that cloning an array is a tad more expensive (`Arc` vs `ArrayData`). On the other hand, it means that the compiler can often optimize the copies out.

The gist of the benchmarks below is:

* ~10%-20% improvement over basically everything
* ~20%-100% improvement in `take`

There is some noise, as there are benches that are not expected to be affected and are being affected. Personally, I like this PR because it makes working with `ArrayData` and arrays simpler: no need for `Arc::new` or `as_ref` and company (besides the speed).

# Questions

* Does anyone know why we are using `Arc<ArrayData>` in all arrays?
* Do you envision an issue with removing the `Arc`?
* Would someone be so kind and run the benches independently, just to be sure?

# Benchmarks

```bash
# modify cargo.toml by adding `bench = false` to the section [lib]
git checkout master
cargo bench --benches -- --save-baseline `git branch --show-current`-`git rev-parse --short HEAD`
git checkout arcless
cargo bench --benches -- --save-baseline `git branch --show-current`-`git rev-parse --short HEAD`
```

```
Jorges-MacBook-Pro-2:arrow jorgecarleitao$ critcmp master-437c8c944 arcless-3dbcaca49 -t 10
group                                       arcless-3dbcaca49              master-437c8c944
-----                                       -----------------              ----------------
add 512                                     1.00 435.3±8.19ns ? B/sec      1.30 565.7±81.95ns ? B/sec
add_nulls_512                               1.00 451.9±15.41ns ? B/sec     1.29 581.9±98.83ns ? B/sec
and                                         1.00 1516.9±39.34ns ? B/sec    1.21 1842.4±190.71ns ? B/sec
array_from_vec 128                          1.00 932.0±35.13ns ? B/sec     1.51 1411.2±475.00ns ? B/sec
array_from_vec 256                          1.00 1125.6±24.63ns ? B/sec    1.23 1382.1±201.90ns ? B/sec
array_from_vec 512                          1.00 1519.9±52.92ns ? B/sec    1.24 1877.9±368.40ns ? B/sec
array_slice 128                             1.00 293.3±4.85ns ? B/sec      1.61 471.3±94.42ns ? B/sec
array_slice 2048                            1.00 319.7±8.25ns ? B/sec      1.50 478.2±293.70ns ? B/sec
array_slice 512                             1.00 293.8±5.91ns ? B/sec      1.69 496.1±145.06ns ? B/sec
array_string_from_vec 128                   1.00 3.2±0.10µs ? B/sec        1.35 4.3±1.68µs ? B/sec
array_string_from_vec 256                   1.00 4.1±0.13µs ? B/sec        1.18 4.9±0.94µs ? B/sec
array_string_from_vec 512                   1.00 5.9±0.11µs ? B/sec        1.26 7.4±5.14µs ? B/sec
bench_bool/bench_bool                       1.00 2.0±0.03ms 245.8 MB/sec   1.12 2.3±0.30ms 219.7 MB/sec
buffer_bit_ops and                          1.00 577.0±11.19ns ? B/sec     1.14 658.5±185.85ns ? B/sec
cast date32 to date64 512                   1.00 6.8±0.21µs ? B/sec        1.18 8.0±1.25µs ? B/sec
cast date64 to date32 512                   1.00 6.8±0.42µs ? B/sec        1.27 8.6±2.22µs ? B/sec
cast f32 to string 512                      1.00 52.2±1.37µs ? B/sec       1.17 61.0±9.24µs ? B/sec
cast float32 to int32 512                   1.00 2.9±0.07µs ? B/sec        1.20 3.4±0.52µs ? B/sec
cast float64 to float32 512                 1.00 3.0±0.07µs ? B/sec        1.18 3.6±0.47µs ? B/sec
cast float64 to uint64 512                  1.00 3.4±0.19µs ? B/sec        1.45 5.0±0.72µs ? B/sec
cast int32 to float32 512                   1.00 2.9±0.09µs ? B/sec        1.11 3.3±0.52µs ? B/sec
cast int32 to float64 512                   1.00 2.7±0.12µs ? B/sec        1.27 3.4±0.77µs ? B/sec
cast int32 to int64 512                     1.00 2.9±0.06µs ? B/sec        1.26 3.6±0.67µs ? B/sec
cast int32 to uint32 512                    1.00 2.7±0.08µs ? B/sec        1.23 3.3±0.32µs ? B/sec
cast int64 to int32 512                     1.00 2.7±0.05µs ? B/sec        1.23 3.3±0.89µs ? B/sec
cast time32s to time32ms 512                1.00 1487.8±52.94ns ? B/sec    1.11 1648.3±145.69ns ? B/sec
cast time32s to time64us 512                1.00 4.8±0.12µs ? B/sec        1.26 6.0±0.82µs ? B/sec
cast time64ns to time32s 512                1.00 9.9±0.24µs ? B/sec        1.24 12.3±1.87µs ? B/sec
cast timestamp_ms to i64 512                1.00 286.7±13.15ns ? B/sec     1.66 474.9±72.34ns ? B/sec
cast timestamp_ms to timestamp_ns 512       1.00 1961.6±38.52ns ? B/sec    1.41 2.8±4.01µs ? B/sec
cast timestamp_ns to timestamp_s 512        1.00 26.2±0.31ns ? B/sec       1.12 29.4±3.74ns ? B/sec
cast utf8 to date32 512                     1.00 43.9±1.05µs ? B/sec       1.15 50.7±10.33µs ? B/sec
cast utf8 to f32                            1.00 30.9±0.81µs ? B/sec       1.21 37.5±5.32µs ? B/sec
concat str 1024                             1.00 9.9±0.54µs ? B/sec        1.34 13.2±5.26µs ? B/sec
divide 512                                  1.00 1411.3±40.15ns ? B/sec    1.13 1599.1±200.00ns ? B/sec
divide_nulls_512                            1.00 1419.2±28.43ns ? B/sec    1.12 1587.9±142.46ns ? B/sec
eq Float32                                  1.00 103.3±3.33µs ? B/sec      1.25 129.4±19.72µs ? B/sec
equal_512                                   1.00 19.2±1.00ns ? B/sec       2.58 49.5±12.56ns ? B/sec
equal_bool_512                              1.00 19.1±1.39ns ? B/sec       2.17 41.5±2.29ns ? B/sec
equal_bool_513                              1.00 21.1±0.70ns ? B/sec       2.15 45.4±3.84ns ? B/sec
equal_nulls_512                             1.00 2.4±0.11µs ? B/sec        1.11 2.7±0.37µs ? B/sec
equal_string_512                            1.00 84.5±5.08ns ? B/sec       1.32 111.9±5.30ns ? B/sec
equal_string_nulls_512                      1.00 3.6±0.48µs ? B/sec        1.11 4.0±1.29µs ? B/sec
filter context f32 high selectivity         1.00 308.9±10.81µs ? B/sec     1.11 343.2±57.50µs ? B/sec
filter context f32 low selectivity          1.00 2.7±0.08µs ? B/sec        1.10 3.0±0.43µs ? B/sec
filter context string high selectivity      1.00 1091.3±26.04µs ? B/sec    1.13 1238.6±174.82µs ? B/sec
filter context u8                           1.00 231.6±4.05µs ? B/sec      1.21 281.0±112.36µs ? B/sec
filter context u8 w NULLs                   1.00 500.1±20.38µs ? B/sec     1.17 586.5±155.02µs ? B/sec
filter context u8 w NULLs high selectivity  1.00 295.2±5.91µs ? B/sec      1.17 344.2±63.42µs ? B/sec
filter context u8 w NULLs low selectivity   1.00 2.9±0.51µs ? B/sec        1.21 3.5±5.02µs ? B/sec
filter f32                                  1.00 797.4±33.25µs ? B/sec     1.22 971.4±135.07µs ? B/sec
filter u8 low selectivity                   1.00 7.9±0.73µs ? B/sec        1.12 8.9±1.47µs ? B/sec
from_slice prepared                         1.15 961.1±24.81µs ? B/sec     1.00 834.2±59.41µs ? B/sec
gt Float32                                  1.00 66.2±14.03µs ? B/sec      1.28 84.6±9.31µs ? B/sec
gt scalar Float32                           1.49 60.9±8.59µs ? B/sec       1.00 40.8±4.06µs ? B/sec
gt_eq Float32                               1.00 117.3±31.66µs ? B/sec     1.10 129.6±25.35µs ? B/sec
json_list_primitive_to_record_batch         1.00 63.3±2.09µs ? B/sec       1.16 73.4±11.01µs ? B/sec
json_primitive_to_record_batch              1.00 25.1±0.82µs ? B/sec       1.11 27.7±4.88µs ? B/sec
length                                      1.00 2.9±0.10µs ? B/sec        1.30 3.7±0.77µs ? B/sec
like_utf8 scalar ends with                  1.00 246.5±13.62µs ? B/sec     1.19 294.5±45.77µs ? B/sec
like_utf8 scalar equals                     1.17 98.3±7.08µs ? B/sec       1.00 83.7±10.13µs ? B/sec
limit 512, 512                              1.00 291.9±5.45ns ? B/sec      1.55 451.1±130.11ns ? B/sec
lt Float32                                  1.00 70.0±19.63µs ? B/sec      1.26 88.3±12.27µs ? B/sec
lt scalar Float32                           1.00 62.6±3.01µs ? B/sec       1.39 87.3±25.97µs ? B/sec
lt_eq Float32                               1.00 104.8±8.32µs ? B/sec      1.43 149.9±56.97µs ? B/sec
lt_eq scalar Float32                        1.00 82.1±3.59µs ? B/sec       1.11 91.3±12.27µs ? B/sec
max nulls 512                               1.00 1378.4±51.28ns ? B/sec    1.32 1820.6±260.46ns ? B/sec
min 512                                     1.00 1510.6±13.17ns ? B/sec    1.28 1938.2±515.72ns ? B/sec
min nulls 512                               1.00 1380.2±51.78ns ? B/sec    1.44 1980.7±259.55ns ? B/sec
min nulls string 512                        1.00 7.1±0.12µs ? B/sec        1.25 8.9±1.66µs ? B/sec
multiply 512                                1.00 461.5±42.95ns ? B/sec     1.30 601.0±102.46ns ? B/sec
mutable                                     1.00 475.9±15.21µs ? B/sec     1.19 566.7±77.31µs ? B/sec
mutable prepared                            1.00 530.8±21.36µs ? B/sec     1.17 621.0±60.61µs ? B/sec
mutable str 1024                            1.00 1493.7±56.88µs ? B/sec    1.19 1776.6±329.88µs ? B/sec
mutable str nulls 1024                      1.00 4.6±0.30ms ? B/sec        1.13 5.2±0.51ms ? B/sec
neq Float32                                 1.00 65.8±1.70µs ? B/sec       1.39 91.5±20.30µs ? B/sec
neq scalar Float32                          1.47 97.6±50.68µs ? B/sec      1.00 66.4±6.21µs ? B/sec
nlike_utf8 scalar contains                  1.00 2.4±0.03ms ? B/sec        1.16 2.8±0.41ms ? B/sec
nlike_utf8 scalar equals                    1.00 218.8±19.42µs ? B/sec     1.14 249.2±24.71µs ? B/sec
not                                         1.00 955.5±27.28ns ? B/sec     1.15 1102.4±153.08ns ? B/sec
or                                          1.00 1497.9±37.68ns ? B/sec    1.15 1726.6±88.42ns ? B/sec
sort 2^10                                   1.00 153.9±11.62µs ? B/sec     1.14 175.6±25.84µs ? B/sec
sort 2^12                                   1.00 748.9±51.51µs ? B/sec     1.13 849.8±109.51µs ? B/sec
struct_array_from_vec 256                   1.00 7.6±0.23µs ? B/sec        1.12 8.5±2.51µs ? B/sec
struct_array_from_vec 512                   1.00 10.0±0.73µs ? B/sec       1.24 12.4±3.51µs ? B/sec
subtract 512                                1.00 434.8±9.25ns ? B/sec      1.32 574.3±37.34ns ? B/sec
sum 512                                     1.00 524.8±12.74ns ? B/sec     1.19 626.2±156.66ns ? B/sec
take bool 1024                              1.00 3.6±0.27µs ? B/sec        1.45 5.2±3.11µs ? B/sec
take bool 512                               1.00 2.1±0.10µs ? B/sec        1.23 2.6±0.32µs ? B/sec
take bool nulls 1024                        1.00 5.1±2.09µs ? B/sec        1.60 8.1±2.08µs ? B/sec
take bool nulls 512                         1.00 2.1±0.14µs ? B/sec        1.82 3.9±0.77µs ? B/sec
take i32 1024                               1.00 1583.0±111.03ns ? B/sec   1.49 2.4±0.58µs ? B/sec
take i32 512                                1.00 1058.3±88.60ns ? B/sec    1.29 1364.4±97.02ns ? B/sec
take i32 nulls 1024                         1.00 1527.5±54.16ns ? B/sec    1.82 2.8±1.00µs ? B/sec
take i32 nulls 512                          1.00 1086.4±124.91ns ? B/sec   2.34 2.5±1.64µs ? B/sec
take str 1024                               1.00 5.9±0.51µs ? B/sec        1.20 7.1±1.30µs ? B/sec
take str 512                                1.00 3.8±0.36µs ? B/sec        1.21 4.6±0.96µs ? B/sec
take str null indices 1024                  1.00 5.6±0.16µs ? B/sec        1.27 7.2±1.63µs ? B/sec
take str null indices 512                   1.00 3.7±0.31µs ? B/sec        1.23 4.6±1.32µs ? B/sec
take str null values 1024                   1.00 5.7±0.14µs ? B/sec        1.25 7.1±1.02µs ? B/sec
take str null values null indices 1024      1.00 12.8±0.39µs ? B/sec       1.14 14.6±1.85µs ? B/sec
```

Closes #9329 from jorgecarleitao/arcless

Lead-authored-by: Jorge C. Leitao <[email protected]>
Co-authored-by: Neville Dipale <[email protected]>
Signed-off-by: Neville Dipale <[email protected]>
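To make the trade-off described in #9329 concrete, a toy illustration with stand-in types (not the actual arrow-rs definitions):

```rust
use std::sync::Arc;

// Stand-in for arrow's ArrayData; the real struct also holds buffers,
// a null bitmap, a data type, and child data.
#[derive(Clone)]
struct ArrayData {
    len: usize,
    offset: usize,
}

// Before: cloning bumps an atomic refcount, and every field access goes
// through a pointer.
#[derive(Clone)]
struct ArcArray {
    data: Arc<ArrayData>,
}

// After: cloning copies the struct itself; a few more bytes move, but
// there is no atomic operation or pointer chase, and the compiler can
// often optimize the copy away entirely.
#[derive(Clone)]
struct PlainArray {
    data: ArrayData,
}
```

---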

Based on #9234, this PR improves the situation described in https://issues.apache.org/jira/browse/ARROW-11300.
The current situation is that we call `take` on arrays, which is fine, but causes a lot of small `Array`s to be created / allocated when we have only a small number of rows in each group. This improves the results on the group by queries on db-benchmark:

PR:

#9234 (different results from that PR description as this has now partitioning enabled and a custom allocator)

The PR changes the algorithm to (see the sketch after this list):

* `take` the arrays based on indices in one go (so it only requires one bigger allocation for each array)
* `slice` based on the offsets to take values from the arrays and pass it to the accumulators.
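A minimal sketch of that flow with made-up data, using the `take` kernel from `arrow::compute` and zero-copy `Array::slice`:

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array, UInt32Array};
use arrow::compute::take;

fn main() -> arrow::error::Result<()> {
    // Hypothetical input: one column, and row indices already grouped by key.
    let values: ArrayRef = Arc::new(Int64Array::from(vec![10, 20, 30, 40, 50]));
    // Rows 1 and 3 belong to group 0; rows 0, 2, 4 to group 1.
    let indices = UInt32Array::from(vec![1, 3, 0, 2, 4]);
    let offsets = vec![0usize, 2, 5]; // group i owns offsets[i]..offsets[i+1]

    // One big `take` instead of one small allocation per group...
    let taken = take(&values, &indices, None)?;

    // ...then zero-copy `slice`s feed each group's accumulators.
    for w in offsets.windows(2) {
        let group_values = taken.slice(w[0], w[1] - w[0]);
        println!("group of {} rows", group_values.len());
    }
    Ok(())
}
```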