Skip to content

Commit f870dcd

Browse files
kazantsev-maksimKazantsev Maksim
andauthored
fix: Support Dictionary[Int32, Binary] for bitmap count spark function (#18273)
## Which issue does this PR close? Closes #18058 ## Rationale for this change When adding the bitmap_count function to Comet, we get the following error - org.apache.comet.CometNativeException: Error from DataFusion: bitmap_count expects Binary/BinaryView/FixedSizeBinary/LargeBinary as argument, got Dictionary(Int32, Binary). ## Are these changes tested? Added new UT --------- Co-authored-by: Kazantsev Maksim <[email protected]>
1 parent 0a8f154 commit f870dcd

File tree

2 files changed

+91
-6
lines changed

2 files changed

+91
-6
lines changed

datafusion/spark/src/function/bitmap/bitmap_count.rs

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ use std::any::Any;
1919
use std::sync::Arc;
2020

2121
use arrow::array::{
22-
Array, ArrayRef, BinaryArray, BinaryViewArray, FixedSizeBinaryArray, Int64Array,
23-
LargeBinaryArray,
22+
as_dictionary_array, Array, ArrayRef, BinaryArray, BinaryViewArray,
23+
FixedSizeBinaryArray, Int64Array, LargeBinaryArray,
2424
};
25-
use arrow::datatypes::DataType;
2625
use arrow::datatypes::DataType::{
27-
Binary, BinaryView, FixedSizeBinary, Int64, LargeBinary,
26+
Binary, BinaryView, Dictionary, FixedSizeBinary, LargeBinary,
2827
};
28+
use arrow::datatypes::{DataType, Int16Type, Int32Type, Int64Type, Int8Type};
2929
use datafusion_common::utils::take_function_args;
3030
use datafusion_common::{internal_err, Result};
3131
use datafusion_expr::{
@@ -71,7 +71,7 @@ impl ScalarUDFImpl for BitmapCount {
7171
}
7272

7373
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
74-
Ok(Int64)
74+
Ok(DataType::Int64)
7575
}
7676

7777
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
@@ -90,6 +90,17 @@ macro_rules! downcast_and_count_ones {
9090
}};
9191
}
9292

93+
macro_rules! downcast_dict_and_count_ones {
94+
($input_dict:expr, $key_array_type:ident) => {{
95+
let dict_array = as_dictionary_array::<$key_array_type>($input_dict);
96+
let array = dict_array.downcast_dict::<BinaryArray>().unwrap();
97+
Ok(array
98+
.into_iter()
99+
.map(binary_count_ones)
100+
.collect::<Int64Array>())
101+
}};
102+
}
103+
93104
pub fn bitmap_count_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
94105
let [input_array] = take_function_args("bitmap_count", arg)?;
95106

@@ -100,6 +111,17 @@ pub fn bitmap_count_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
100111
FixedSizeBinary(_size) => {
101112
downcast_and_count_ones!(input_array, FixedSizeBinaryArray)
102113
}
114+
Dictionary(k, v) if v.as_ref() == &Binary => match k.as_ref() {
115+
DataType::Int8 => downcast_dict_and_count_ones!(input_array, Int8Type),
116+
DataType::Int16 => downcast_dict_and_count_ones!(input_array, Int16Type),
117+
DataType::Int32 => downcast_dict_and_count_ones!(input_array, Int32Type),
118+
DataType::Int64 => downcast_dict_and_count_ones!(input_array, Int64Type),
119+
data_type => {
120+
internal_err!(
121+
"bitmap_count does not support Dictionary({data_type}, Binary)"
122+
)
123+
}
124+
},
103125
data_type => {
104126
internal_err!("bitmap_count does not support {data_type}")
105127
}
@@ -114,8 +136,12 @@ mod tests {
114136
use crate::function::utils::test::test_scalar_function;
115137
use arrow::array::{Array, Int64Array};
116138
use arrow::datatypes::DataType::Int64;
139+
use arrow::datatypes::{DataType, Field};
140+
use datafusion_common::config::ConfigOptions;
117141
use datafusion_common::{Result, ScalarValue};
118-
use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
142+
use datafusion_expr::ColumnarValue::Scalar;
143+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
144+
use std::sync::Arc;
119145

120146
macro_rules! test_bitmap_count_binary_invoke {
121147
($INPUT:expr, $EXPECTED:expr) => {
@@ -171,4 +197,31 @@ mod tests {
171197
);
172198
Ok(())
173199
}
200+
201+
#[test]
202+
fn test_dictionary_encoded_bitmap_count_invoke() -> Result<()> {
203+
let dict = Scalar(ScalarValue::Dictionary(
204+
Box::new(DataType::Int32),
205+
Box::new(ScalarValue::Binary(Some(vec![0xFFu8, 0xFFu8]))),
206+
));
207+
208+
let arg_fields = vec![Field::new(
209+
"a",
210+
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
211+
true,
212+
)
213+
.into()];
214+
let args = ScalarFunctionArgs {
215+
args: vec![dict.clone()],
216+
arg_fields,
217+
number_rows: 1,
218+
return_field: Field::new("f", Int64, true).into(),
219+
config_options: Arc::new(ConfigOptions::default()),
220+
};
221+
let udf = BitmapCount::new();
222+
let actual = udf.invoke_with_args(args)?;
223+
let expect = Scalar(ScalarValue::Int64(Some(16)));
224+
assert_eq!(*actual.into_array(1)?, *expect.into_array(1)?);
225+
Ok(())
226+
}
174227
}

datafusion/sqllogictest/test_files/spark/bitmap/bitmap_count.slt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,35 @@ SELECT bitmap_count(arrow_cast(a, 'FixedSizeBinary(2)')) FROM (VALUES (X'1010'),
5959
5
6060
16
6161
NULL
62+
63+
query I
64+
SELECT bitmap_count(arrow_cast(a, 'Dictionary(Int32, Binary)')) FROM (VALUES (X'1010'), (X'0AB0'), (X'FFFF'), (NULL)) AS t(a);
65+
----
66+
2
67+
5
68+
16
69+
NULL
70+
71+
query I
72+
SELECT bitmap_count(arrow_cast(a, 'Dictionary(Int8, Binary)')) FROM (VALUES (X'1010'), (X'0AB0'), (X'FFFF'), (NULL)) AS t(a);
73+
----
74+
2
75+
5
76+
16
77+
NULL
78+
79+
query I
80+
SELECT bitmap_count(arrow_cast(a, 'Dictionary(Int16, Binary)')) FROM (VALUES (X'1010'), (X'0AB0'), (X'FFFF'), (NULL)) AS t(a);
81+
----
82+
2
83+
5
84+
16
85+
NULL
86+
87+
query I
88+
SELECT bitmap_count(arrow_cast(a, 'Dictionary(Int64, Binary)')) FROM (VALUES (X'1010'), (X'0AB0'), (X'FFFF'), (NULL)) AS t(a);
89+
----
90+
2
91+
5
92+
16
93+
NULL

0 commit comments

Comments
 (0)