From 40e37c37ec3eae1d1312164408bf6c7c0b1a660a Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 11 Jul 2025 23:23:00 +0800 Subject: [PATCH 1/2] Poc code --- .../group_values/multi_group_by/bytes_view.rs | 75 +++++++++++-------- 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index 63018874a1e4..3d0c7c486be3 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -94,9 +94,7 @@ impl ByteViewGroupValueBuilder { self.do_equal_to_inner(lhs_row, array, rhs_row) } - fn append_val_inner(&mut self, array: &ArrayRef, row: usize) { - let arr = array.as_byte_view::(); - + fn append_val_inner(&mut self, arr: &GenericByteViewArray, row: usize) { // Null row case, set and return if arr.is_null(row) { self.nulls.append(true); @@ -148,15 +146,26 @@ impl ByteViewGroupValueBuilder { match all_null_or_non_null { None => { - for &row in rows { - self.append_val_inner(array, row); + if arr.data_buffers().is_empty() { + for &row in rows { + self.nulls.append(arr.is_null(row)); + } + self.views.extend(rows.iter().map(|&row| arr.views()[row])); + } else { + for &row in rows { + self.append_val_inner(arr, row); + } } } Some(true) => { self.nulls.append_n(rows.len(), false); - for &row in rows { - self.do_append_val_inner(arr, row); + if arr.data_buffers().is_empty() { + self.views.extend(rows.iter().map(|&row| arr.views()[row])); + } else { + for &row in rows { + self.do_append_val_inner(arr, row); + } } } @@ -220,12 +229,17 @@ impl ByteViewGroupValueBuilder { if let Some(result) = nulls_equal_to(exist_null, input_null) { return result; } - // Otherwise, we need to check their values let exist_view = self.views[lhs_row]; - let exist_view_len = exist_view as u32; - let input_view = array.views()[rhs_row]; + + // Fast path when data buffers are empty + if self.completed.is_empty() && self.in_progress.is_empty() && array.data_buffers().is_empty() { + // For eq case, we can directly compare the inlined bytes + return exist_view == input_view; + } + + let exist_view_len = exist_view as u32; let input_view_len = input_view as u32; // The check logic @@ -237,38 +251,32 @@ impl ByteViewGroupValueBuilder { return false; } - if exist_view_len <= 12 { - let exist_inline = unsafe { - GenericByteViewArray::::inline_value( - &exist_view, - exist_view_len as usize, - ) - }; - let input_inline = unsafe { - GenericByteViewArray::::inline_value( - &input_view, - input_view_len as usize, - ) - }; - exist_inline == input_inline - } else { - let exist_prefix = - unsafe { GenericByteViewArray::::inline_value(&exist_view, 4) }; - let input_prefix = - unsafe { GenericByteViewArray::::inline_value(&input_view, 4) }; + // Fast path for empty views + if exist_view_len == 0 && input_view_len == 0 { + return true; + } - if exist_prefix != input_prefix { - return false; + if exist_view_len <= 12 && input_view_len <= 12 { + // When all inlined, we can directly compare the views + exist_view == input_view + } else { + let byte_view = ByteView::from(exist_view); + // Fast path for 4 bytes prefix equality, change &[u8] to u32 + let pref_existed = byte_view.prefix.swap_bytes(); + let pref_input = ByteView::from(input_view).prefix.swap_bytes(); + if pref_existed != pref_input { + return false; } + // If the prefix is equal, we can check the value in buffer let exist_full = { - let byte_view = ByteView::from(exist_view); self.value( byte_view.buffer_index as usize, byte_view.offset as usize, byte_view.length as usize, ) }; + let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() }; exist_full == input_full } @@ -488,7 +496,8 @@ impl GroupColumn for ByteViewGroupValueBuilder { } fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> { - self.append_val_inner(array, row); + let arr = array.as_byte_view::(); + self.append_val_inner(arr, row); Ok(()) } From a148a7bfaf5e4cbc65e115c9dc10904bdf1dc021 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 11 Jul 2025 23:27:05 +0800 Subject: [PATCH 2/2] fmt --- .../aggregates/group_values/multi_group_by/bytes_view.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index 3d0c7c486be3..7c68226ab82d 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -234,7 +234,10 @@ impl ByteViewGroupValueBuilder { let input_view = array.views()[rhs_row]; // Fast path when data buffers are empty - if self.completed.is_empty() && self.in_progress.is_empty() && array.data_buffers().is_empty() { + if self.completed.is_empty() + && self.in_progress.is_empty() + && array.data_buffers().is_empty() + { // For eq case, we can directly compare the inlined bytes return exist_view == input_view; } @@ -256,7 +259,7 @@ impl ByteViewGroupValueBuilder { return true; } - if exist_view_len <= 12 && input_view_len <= 12 { + if exist_view_len <= 12 && input_view_len <= 12 { // When all inlined, we can directly compare the views exist_view == input_view } else { @@ -265,7 +268,7 @@ impl ByteViewGroupValueBuilder { let pref_existed = byte_view.prefix.swap_bytes(); let pref_input = ByteView::from(input_view).prefix.swap_bytes(); if pref_existed != pref_input { - return false; + return false; } // If the prefix is equal, we can check the value in buffer