Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions datafusion/core/tests/dataframe/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,33 @@ async fn test_nvl2() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_nvl2_short_circuit() -> Result<()> {
let expr = nvl2(
col("a"),
arrow_cast(lit("1"), lit("Int32")),
arrow_cast(col("a"), lit("Int32")),
);

let batches = get_batches(expr).await?;

assert_snapshot!(
batches_to_string(&batches),
@r#"
+-----------------------------------------------------------------------------------+
| nvl2(test.a,arrow_cast(Utf8("1"),Utf8("Int32")),arrow_cast(test.a,Utf8("Int32"))) |
+-----------------------------------------------------------------------------------+
| 1 |
| 1 |
| 1 |
| 1 |
+-----------------------------------------------------------------------------------+
"#
);

Ok(())
}
#[tokio::test]
async fn test_fn_arrow_typeof() -> Result<()> {
let expr = arrow_typeof(col("l"));
Expand Down
31 changes: 31 additions & 0 deletions datafusion/core/tests/expr_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,37 @@ async fn test_create_physical_expr() {
create_simplified_expr_test(lit(1i32) + lit(2i32), "3");
}

#[test]
fn test_create_physical_expr_nvl2() {
#[rustfmt::skip]
evaluate_expr_test(
nvl2(col("i"), lit(1i64), lit(0i64)),
vec![
"+------+",
"| expr |",
"+------+",
"| 1 |",
"| 0 |",
"| 1 |",
"+------+",
],
);

#[rustfmt::skip]
evaluate_expr_test(
nvl2(lit(1i64), col("i"), lit(0i64)),
vec![
"+------+",
"| expr |",
"+------+",
"| 10 |",
"| |",
"| 5 |",
"+------+",
],
);
}

#[tokio::test]
async fn test_create_physical_expr_coercion() {
// create_physical_expr does apply type coercion and unwrapping in cast
Expand Down
117 changes: 71 additions & 46 deletions datafusion/functions/src/core/nvl2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::Array;
use arrow::compute::is_not_null;
use arrow::compute::kernels::zip::zip;
use arrow::datatypes::DataType;
use datafusion_common::{internal_err, utils::take_function_args, Result};
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::{internal_err, plan_err, utils::take_function_args, Result};
use datafusion_expr::{
type_coercion::binary::comparison_coercion, ColumnarValue, Documentation,
ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
conditional_expressions::CaseBuilder,
simplify::{ExprSimplifyResult, SimplifyInfo},
type_coercion::binary::comparison_coercion,
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
ScalarUDFImpl, Signature, Volatility,
};
use datafusion_macros::user_doc;
use std::sync::Arc;

#[user_doc(
doc_section(label = "Conditional Functions"),
Expand Down Expand Up @@ -95,8 +96,71 @@ impl ScalarUDFImpl for NVL2Func {
Ok(arg_types[1].clone())
}

fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
let nullable =
args.arg_fields[1].is_nullable() || args.arg_fields[2].is_nullable();
let return_type = args.arg_fields[1].data_type().clone();
Ok(Field::new(self.name(), return_type, nullable).into())
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
nvl2_func(&args.args)
let [test, if_non_null, if_null] = take_function_args(self.name(), args.args)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #17357 the author chose to make invoke_with_args return an internal error instead of retaining the implementation. Would we want to do the same here?

I'm a bit on the fence myself. On the one hand, this is effectively dead code for most users. On the other hand, raising an error here may cause breakage for users who have customised their optimiser passes and are not doing simplification. No idea if anyone actually does that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point 🤔

Personally I'm of the mind to remove this impl and have it return error; part of the benefit of this PR is reducing the amount of code we'd need to maintain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make invoke_with_args return an internal error instead of retaining the implementation.

The new fallback evaluator is exercised directly in test_create_physical_expr_nvl2, which builds physical expressions without running the simplifier. Returning an error here would regress those non-simplified code paths.

It isn’t just about satisfying a unit test, it's about preserving a supported API surface. SessionContext::create_physical_expr explicitly states that it performs coercion and rewrite passes but does not run the expression simplifier, so any expression handed directly to that API must still execute correctly without being rewritten to a CASE statement first.
The test_create_physical_expr_nvl2 fixture exercises exactly that public workflow by building a physical expression through SessionContext::create_physical_expr and evaluating it without simplification.
If we changed invoke_with_args to return an error, that flow would regress for library users in the same way it would fail for the test.

Rather than removing or rewriting the test, I think we should keep it to guard this behavior; it’s effectively documenting that nvl2 continues to work for consumers who rely on the non-simplifying physical-expr builder, which the function implementation currently supports.

I recommend keeping the implementation so those tests—and any downstream consumers that bypass simplification—continue to work.

Copy link
Contributor

@pepijnve pepijnve Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change in coalesce (and now indirectly also nvl/ifnull) already broke this though. If unsimplified execution is desirable, perhaps nvl should be restored too because to not have arbitrary behaviour depending on the used UDF. In other words, I think you have to be consistent about this. Either all physical exprs should work or you shouldn’t bother with this. Cherry picking is a bit pointless in my opinion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any expression handed directly to that API must still execute correctly without being rewritten to a CASE statement first.

One subtlety here is that there is a change in semantics before and after simplification. nvl2(1, 1, 1 / 0) will fail pre simplification but will work correctly once simplified due to the switch from eager to lazy evaluation. I think I would prefer a clear failure over a subtle difference in behaviour.

If we do want to keep the invoke_with_args implementations, one option could be to consider #17997 (or some variant of that idea) so that it can also be implemented lazily.

Regarding code maintenance/duplication, nvl2 is an instance of the ExpressionOrExpression evaluation method from CaseExpr. Perhaps a slightly modified version of CaseExpr::expr_or_expr could be made so that nvl and nvl2 could call that? I think what I'm trying to say is that maybe code reuse via simplify is maybe not the best idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #17357 the author chose to make invoke_with_args return an internal error instead of retaining the implementation. Would we want to do the same here?

I amended invoke_with_args to return internal_err for consistency and also to reduce code.


match test {
ColumnarValue::Scalar(test_scalar) => {
if test_scalar.is_null() {
Ok(if_null)
} else {
Ok(if_non_null)
}
}
ColumnarValue::Array(test_array) => {
let len = test_array.len();

let if_non_null_array = match if_non_null {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len)?,
};

let if_null_array = match if_null {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len)?,
};

let mask = is_not_null(&test_array)?;
let result = zip(&mask, &if_non_null_array, &if_null_array)?;
Ok(ColumnarValue::Array(result))
}
}
}

fn simplify(
&self,
args: Vec<Expr>,
_info: &dyn SimplifyInfo,
) -> Result<ExprSimplifyResult> {
if args.len() != 3 {
return plan_err!("nvl2 must have exactly three arguments");
}

let mut args = args.into_iter();
let test = args.next().unwrap();
let if_non_null = args.next().unwrap();
let if_null = args.next().unwrap();

let expr = CaseBuilder::new(
None,
vec![test.is_not_null()],
vec![if_non_null],
Some(Box::new(if_null)),
)
.end()?;

Ok(ExprSimplifyResult::Simplified(expr))
}

fn short_circuits(&self) -> bool {
true
}

fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
Expand All @@ -123,42 +187,3 @@ impl ScalarUDFImpl for NVL2Func {
self.doc()
}
}

fn nvl2_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let mut len = 1;
let mut is_array = false;
for arg in args {
if let ColumnarValue::Array(array) = arg {
len = array.len();
is_array = true;
break;
}
}
if is_array {
let args = args
.iter()
.map(|arg| match arg {
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len),
ColumnarValue::Array(array) => Ok(Arc::clone(array)),
})
.collect::<Result<Vec<_>>>()?;
let [tested, if_non_null, if_null] = take_function_args("nvl2", args)?;
let to_apply = is_not_null(&tested)?;
let value = zip(&to_apply, &if_non_null, &if_null)?;
Ok(ColumnarValue::Array(value))
} else {
let [tested, if_non_null, if_null] = take_function_args("nvl2", args)?;
match &tested {
ColumnarValue::Array(_) => {
internal_err!("except Scalar value, but got Array")
}
ColumnarValue::Scalar(scalar) => {
if scalar.is_null() {
Ok(if_null.clone())
} else {
Ok(if_non_null.clone())
}
}
}
}
}