Skip to content

Commit ea83c26

Browse files
pepijnve and alamb authored
#17982 Make nvl a thin wrapper for coalesce (#17991)
## Which issue does this PR close?

- Closes #17982

## Rationale for this change

By making `NVLFunc` a wrapper for `CoalesceFunc` with a more restrictive signature, the implementation automatically benefits from any optimisation work related to `coalesce`.

## What changes are included in this PR?

- Make `NVLFunc` a thin wrapper of `CoalesceFunc`. This seemed like the simplest way to reuse the coalesce logic, but keep the stricter signature of `nvl`.
- Add `ScalarUDF::conditional_arguments` as a more precise complement to `ScalarUDF::short_circuits`. By letting each function expose which arguments are eager and which are lazy, we provide more precise information to the optimizer which may enable better optimisation.

## Are these changes tested?

Assumed to be covered by sql logic tests. Unit tests for the custom implementation were removed since those are no longer relevant.

## Are there any user-facing changes?

The rewriting of `nvl` to `case when ... then ... else ... end` is visible in the physical query plan.

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 4e03c92 commit ea83c26

File tree

7 files changed

+141
-210
lines changed

7 files changed

+141
-210
lines changed

datafusion/expr/src/udf.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,21 @@ impl ScalarUDF {
252252
Ok(result)
253253
}
254254

255-
/// Get the circuits of inner implementation
255+
/// Determines which of the arguments passed to this function are evaluated eagerly
256+
/// and which may be evaluated lazily.
257+
///
258+
/// See [ScalarUDFImpl::conditional_arguments] for more information.
259+
pub fn conditional_arguments<'a>(
260+
&self,
261+
args: &'a [Expr],
262+
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
263+
self.inner.conditional_arguments(args)
264+
}
265+
266+
/// Returns true if some of this function's argument subexpressions may not be evaluated
267+
/// and thus any side effects (like divide by zero) may not be encountered.
268+
///
269+
/// See [ScalarUDFImpl::short_circuits] for more information.
256270
pub fn short_circuits(&self) -> bool {
257271
self.inner.short_circuits()
258272
}
@@ -656,10 +670,42 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
656670
///
657671
/// Setting this to true prevents certain optimizations such as common
658672
/// subexpression elimination
673+
///
674+
/// When overriding this function to return `true`, [ScalarUDFImpl::conditional_arguments] can also be
675+
/// overridden to report more accurately which arguments are eagerly evaluated and which ones
676+
/// lazily.
659677
fn short_circuits(&self) -> bool {
660678
false
661679
}
662680

681+
/// Determines which of the arguments passed to this function are evaluated eagerly
682+
/// and which may be evaluated lazily.
683+
///
684+
/// If this function returns `None`, all arguments are eagerly evaluated.
685+
/// Returning `None` is a micro optimization that saves a needless `Vec`
686+
/// allocation.
687+
///
688+
/// If the function returns `Some`, returns (`eager`, `lazy`) where `eager`
689+
/// are the arguments that are always evaluated, and `lazy` are the
690+
/// arguments that may be evaluated lazily (i.e. may not be evaluated at all
691+
/// in some cases).
692+
///
693+
/// Implementations must ensure that the two returned `Vec`s are disjoint,
694+
/// and that each argument from `args` is present in one of the two `Vec`s.
695+
///
696+
/// When overriding this function, [ScalarUDFImpl::short_circuits] must
697+
/// be overridden to return `true`.
698+
fn conditional_arguments<'a>(
699+
&self,
700+
args: &'a [Expr],
701+
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
702+
if self.short_circuits() {
703+
Some((vec![], args.iter().collect()))
704+
} else {
705+
None
706+
}
707+
}
708+
663709
/// Computes the output [`Interval`] for a [`ScalarUDFImpl`], given the input
664710
/// intervals.
665711
///
@@ -845,6 +891,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
845891
self.inner.simplify(args, info)
846892
}
847893

894+
fn conditional_arguments<'a>(
895+
&self,
896+
args: &'a [Expr],
897+
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
898+
self.inner.conditional_arguments(args)
899+
}
900+
848901
fn short_circuits(&self) -> bool {
849902
self.inner.short_circuits()
850903
}

datafusion/functions/src/core/coalesce.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use std::any::Any;
4747
)]
4848
#[derive(Debug, PartialEq, Eq, Hash)]
4949
pub struct CoalesceFunc {
50-
signature: Signature,
50+
pub(super) signature: Signature,
5151
}
5252

5353
impl Default for CoalesceFunc {
@@ -126,6 +126,15 @@ impl ScalarUDFImpl for CoalesceFunc {
126126
internal_err!("coalesce should have been simplified to case")
127127
}
128128

129+
fn conditional_arguments<'a>(
130+
&self,
131+
args: &'a [Expr],
132+
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
133+
let eager = vec![&args[0]];
134+
let lazy = args[1..].iter().collect();
135+
Some((eager, lazy))
136+
}
137+
129138
fn short_circuits(&self) -> bool {
130139
true
131140
}

datafusion/functions/src/core/nvl.rs

Lines changed: 38 additions & 202 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,19 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use arrow::array::Array;
19-
use arrow::compute::is_not_null;
20-
use arrow::compute::kernels::zip::zip;
21-
use arrow::datatypes::DataType;
22-
use datafusion_common::{utils::take_function_args, Result};
18+
use crate::core::coalesce::CoalesceFunc;
19+
use arrow::datatypes::{DataType, FieldRef};
20+
use datafusion_common::Result;
21+
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
2322
use datafusion_expr::{
24-
ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
25-
Volatility,
23+
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
24+
ScalarUDFImpl, Signature, Volatility,
2625
};
2726
use datafusion_macros::user_doc;
28-
use std::sync::Arc;
2927

3028
#[user_doc(
3129
doc_section(label = "Conditional Functions"),
32-
description = "Returns _expression2_ if _expression1_ is NULL otherwise it returns _expression1_.",
30+
description = "Returns _expression2_ if _expression1_ is NULL otherwise it returns _expression1_ and _expression2_ is not evaluated. This function can be used to substitute a default value for NULL values.",
3331
syntax_example = "nvl(expression1, expression2)",
3432
sql_example = r#"```sql
3533
> select nvl(null, 'a');
@@ -57,7 +55,7 @@ use std::sync::Arc;
5755
)]
5856
#[derive(Debug, PartialEq, Eq, Hash)]
5957
pub struct NVLFunc {
60-
signature: Signature,
58+
coalesce: CoalesceFunc,
6159
aliases: Vec<String>,
6260
}
6361

@@ -90,11 +88,13 @@ impl Default for NVLFunc {
9088
impl NVLFunc {
9189
pub fn new() -> Self {
9290
Self {
93-
signature: Signature::uniform(
94-
2,
95-
SUPPORTED_NVL_TYPES.to_vec(),
96-
Volatility::Immutable,
97-
),
91+
coalesce: CoalesceFunc {
92+
signature: Signature::uniform(
93+
2,
94+
SUPPORTED_NVL_TYPES.to_vec(),
95+
Volatility::Immutable,
96+
),
97+
},
9898
aliases: vec![String::from("ifnull")],
9999
}
100100
}
@@ -110,209 +110,45 @@ impl ScalarUDFImpl for NVLFunc {
110110
}
111111

112112
fn signature(&self) -> &Signature {
113-
&self.signature
113+
&self.coalesce.signature
114114
}
115115

116116
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
117-
Ok(arg_types[0].clone())
117+
self.coalesce.return_type(arg_types)
118118
}
119119

120-
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
121-
nvl_func(&args.args)
122-
}
123-
124-
fn aliases(&self) -> &[String] {
125-
&self.aliases
126-
}
127-
128-
fn documentation(&self) -> Option<&Documentation> {
129-
self.doc()
120+
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
121+
self.coalesce.return_field_from_args(args)
130122
}
131-
}
132-
133-
fn nvl_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
134-
let [lhs, rhs] = take_function_args("nvl/ifnull", args)?;
135-
let (lhs_array, rhs_array) = match (lhs, rhs) {
136-
(ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => {
137-
(Arc::clone(lhs), rhs.to_array_of_size(lhs.len())?)
138-
}
139-
(ColumnarValue::Array(lhs), ColumnarValue::Array(rhs)) => {
140-
(Arc::clone(lhs), Arc::clone(rhs))
141-
}
142-
(ColumnarValue::Scalar(lhs), ColumnarValue::Array(rhs)) => {
143-
(lhs.to_array_of_size(rhs.len())?, Arc::clone(rhs))
144-
}
145-
(ColumnarValue::Scalar(lhs), ColumnarValue::Scalar(rhs)) => {
146-
let mut current_value = lhs;
147-
if lhs.is_null() {
148-
current_value = rhs;
149-
}
150-
return Ok(ColumnarValue::Scalar(current_value.clone()));
151-
}
152-
};
153-
let to_apply = is_not_null(&lhs_array)?;
154-
let value = zip(&to_apply, &lhs_array, &rhs_array)?;
155-
Ok(ColumnarValue::Array(value))
156-
}
157-
158-
#[cfg(test)]
159-
mod tests {
160-
use std::sync::Arc;
161-
162-
use arrow::array::*;
163123

164-
use super::*;
165-
use datafusion_common::ScalarValue;
166-
167-
#[test]
168-
fn nvl_int32() -> Result<()> {
169-
let a = Int32Array::from(vec![
170-
Some(1),
171-
Some(2),
172-
None,
173-
None,
174-
Some(3),
175-
None,
176-
None,
177-
Some(4),
178-
Some(5),
179-
]);
180-
let a = ColumnarValue::Array(Arc::new(a));
181-
182-
let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(6i32)));
183-
184-
let result = nvl_func(&[a, lit_array])?;
185-
let result = result.into_array(0).expect("Failed to convert to array");
186-
187-
let expected = Arc::new(Int32Array::from(vec![
188-
Some(1),
189-
Some(2),
190-
Some(6),
191-
Some(6),
192-
Some(3),
193-
Some(6),
194-
Some(6),
195-
Some(4),
196-
Some(5),
197-
])) as ArrayRef;
198-
assert_eq!(expected.as_ref(), result.as_ref());
199-
Ok(())
124+
fn simplify(
125+
&self,
126+
args: Vec<Expr>,
127+
info: &dyn SimplifyInfo,
128+
) -> Result<ExprSimplifyResult> {
129+
self.coalesce.simplify(args, info)
200130
}
201131

202-
#[test]
203-
// Ensure that arrays with no nulls can also invoke nvl() correctly
204-
fn nvl_int32_non_nulls() -> Result<()> {
205-
let a = Int32Array::from(vec![1, 3, 10, 7, 8, 1, 2, 4, 5]);
206-
let a = ColumnarValue::Array(Arc::new(a));
207-
208-
let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(20i32)));
209-
210-
let result = nvl_func(&[a, lit_array])?;
211-
let result = result.into_array(0).expect("Failed to convert to array");
212-
213-
let expected = Arc::new(Int32Array::from(vec![
214-
Some(1),
215-
Some(3),
216-
Some(10),
217-
Some(7),
218-
Some(8),
219-
Some(1),
220-
Some(2),
221-
Some(4),
222-
Some(5),
223-
])) as ArrayRef;
224-
assert_eq!(expected.as_ref(), result.as_ref());
225-
Ok(())
132+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
133+
self.coalesce.invoke_with_args(args)
226134
}
227135

228-
#[test]
229-
fn nvl_boolean() -> Result<()> {
230-
let a = BooleanArray::from(vec![Some(true), Some(false), None]);
231-
let a = ColumnarValue::Array(Arc::new(a));
232-
233-
let lit_array = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));
234-
235-
let result = nvl_func(&[a, lit_array])?;
236-
let result = result.into_array(0).expect("Failed to convert to array");
237-
238-
let expected = Arc::new(BooleanArray::from(vec![
239-
Some(true),
240-
Some(false),
241-
Some(false),
242-
])) as ArrayRef;
243-
244-
assert_eq!(expected.as_ref(), result.as_ref());
245-
Ok(())
136+
fn conditional_arguments<'a>(
137+
&self,
138+
args: &'a [Expr],
139+
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
140+
self.coalesce.conditional_arguments(args)
246141
}
247142

248-
#[test]
249-
fn nvl_string() -> Result<()> {
250-
let a = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
251-
let a = ColumnarValue::Array(Arc::new(a));
252-
253-
let lit_array = ColumnarValue::Scalar(ScalarValue::from("bax"));
254-
255-
let result = nvl_func(&[a, lit_array])?;
256-
let result = result.into_array(0).expect("Failed to convert to array");
257-
258-
let expected = Arc::new(StringArray::from(vec![
259-
Some("foo"),
260-
Some("bar"),
261-
Some("bax"),
262-
Some("baz"),
263-
])) as ArrayRef;
264-
265-
assert_eq!(expected.as_ref(), result.as_ref());
266-
Ok(())
143+
fn short_circuits(&self) -> bool {
144+
self.coalesce.short_circuits()
267145
}
268146

269-
#[test]
270-
fn nvl_literal_first() -> Result<()> {
271-
let a = Int32Array::from(vec![Some(1), Some(2), None, None, Some(3), Some(4)]);
272-
let a = ColumnarValue::Array(Arc::new(a));
273-
274-
let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
275-
276-
let result = nvl_func(&[lit_array, a])?;
277-
let result = result.into_array(0).expect("Failed to convert to array");
278-
279-
let expected = Arc::new(Int32Array::from(vec![
280-
Some(2),
281-
Some(2),
282-
Some(2),
283-
Some(2),
284-
Some(2),
285-
Some(2),
286-
])) as ArrayRef;
287-
assert_eq!(expected.as_ref(), result.as_ref());
288-
Ok(())
147+
fn aliases(&self) -> &[String] {
148+
&self.aliases
289149
}
290150

291-
#[test]
292-
fn nvl_scalar() -> Result<()> {
293-
let a_null = ColumnarValue::Scalar(ScalarValue::Int32(None));
294-
let b_null = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
295-
296-
let result_null = nvl_func(&[a_null, b_null])?;
297-
let result_null = result_null
298-
.into_array(1)
299-
.expect("Failed to convert to array");
300-
301-
let expected_null = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef;
302-
303-
assert_eq!(expected_null.as_ref(), result_null.as_ref());
304-
305-
let a_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
306-
let b_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));
307-
308-
let result_nnull = nvl_func(&[a_nnull, b_nnull])?;
309-
let result_nnull = result_nnull
310-
.into_array(1)
311-
.expect("Failed to convert to array");
312-
313-
let expected_nnull = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef;
314-
assert_eq!(expected_nnull.as_ref(), result_nnull.as_ref());
315-
316-
Ok(())
151+
fn documentation(&self) -> Option<&Documentation> {
152+
self.doc()
317153
}
318154
}

0 commit comments

Comments
 (0)