Skip to content

Commit b0c8dd6

Browse files
Make AsyncScalarUDFImpl::invoke_async_with_args consistent with ScalarUDFImpl::invoke_with_args (#16902)
* Change AsyncScalarUDFImpl::invoke_async_with_args return type to ColumnarValue * fix docs * cargo fmt * cargo clippy * Add a note in the upgrade guide * Fix merge blunder --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 71b92bc commit b0c8dd6

File tree

5 files changed

+58
-12
lines changed

5 files changed

+58
-12
lines changed

datafusion-examples/examples/async_udf.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,10 @@ impl AsyncScalarUDFImpl for AskLLM {
194194
/// is processing the query, so you may wish to make actual network requests
195195
/// on a different `Runtime`, as explained in the `thread_pools.rs` example
196196
/// in this directory.
197-
async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result<ArrayRef> {
197+
async fn invoke_async_with_args(
198+
&self,
199+
args: ScalarFunctionArgs,
200+
) -> Result<ColumnarValue> {
198201
// in a real UDF you would likely want to special case constant
199202
// arguments to improve performance, but this example converts the
200203
// arguments to arrays for simplicity.
@@ -229,6 +232,6 @@ impl AsyncScalarUDFImpl for AskLLM {
229232
})
230233
.collect();
231234

232-
Ok(Arc::new(result_array))
235+
Ok(ColumnarValue::from(Arc::new(result_array) as ArrayRef))
233236
}
234237
}

datafusion/expr/src/async_udf.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use crate::utils::{arc_ptr_eq, arc_ptr_hash};
1919
use crate::{
2020
udf_equals_hash, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
2121
};
22-
use arrow::array::ArrayRef;
2322
use arrow::datatypes::{DataType, FieldRef};
2423
use async_trait::async_trait;
2524
use datafusion_common::error::Result;
@@ -48,7 +47,10 @@ pub trait AsyncScalarUDFImpl: ScalarUDFImpl {
4847
}
4948

5049
/// Invoke the function asynchronously with the async arguments
51-
async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result<ArrayRef>;
50+
async fn invoke_async_with_args(
51+
&self,
52+
args: ScalarFunctionArgs,
53+
) -> Result<ColumnarValue>;
5254
}
5355

5456
/// A scalar UDF that must be invoked using async methods
@@ -95,7 +97,7 @@ impl AsyncScalarUDF {
9597
pub async fn invoke_async_with_args(
9698
&self,
9799
args: ScalarFunctionArgs,
98-
) -> Result<ArrayRef> {
100+
) -> Result<ColumnarValue> {
99101
self.inner.invoke_async_with_args(args).await
100102
}
101103
}

datafusion/physical-expr/src/async_scalar_function.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl AsyncFuncExpr {
192192
);
193193
}
194194

195-
let datas = result_batches
195+
let datas = ColumnarValue::values_to_arrays(&result_batches)?
196196
.iter()
197197
.map(|b| b.to_data())
198198
.collect::<Vec<_>>();

docs/source/library-user-guide/functions/adding-udfs.md

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -444,12 +444,11 @@ impl AsyncScalarUDFImpl for AsyncUpper {
444444
}
445445

446446
/// This method is called to execute the async UDF and is similar
447-
/// to the normal `invoke_with_args` except it returns an `ArrayRef`
448-
/// instead of `ColumnarValue` and is `async`.
447+
/// to the normal `invoke_with_args` except it is `async`.
449448
async fn invoke_async_with_args(
450449
&self,
451450
args: ScalarFunctionArgs,
452-
) -> Result<ArrayRef> {
451+
) -> Result<ColumnarValue> {
453452
let value = &args.args[0];
454453
// This function simply implements a simple string to uppercase conversion
455454
// but can be used for any async operation such as network calls.
@@ -464,7 +463,7 @@ impl AsyncScalarUDFImpl for AsyncUpper {
464463
}
465464
_ => return internal_err!("Expected a string argument, got {:?}", value),
466465
};
467-
Ok(result)
466+
Ok(ColumnarValue::from(result))
468467
}
469468
}
470469
```
@@ -548,7 +547,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf`
548547
# async fn invoke_async_with_args(
549548
# &self,
550549
# args: ScalarFunctionArgs,
551-
# ) -> Result<ArrayRef> {
550+
# ) -> Result<ColumnarValue> {
552551
# trace!("Invoking async_upper with args: {:?}", args);
553552
# let value = &args.args[0];
554553
# let result = match value {
@@ -562,7 +561,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf`
562561
# }
563562
# _ => return internal_err!("Expected a string argument, got {:?}", value),
564563
# };
565-
# Ok(result)
564+
# Ok(ColumnarValue::from(result))
566565
# }
567566
# }
568567
use datafusion::execution::context::SessionContext;

docs/source/library-user-guide/upgrading.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,48 @@
2424
**Note:** DataFusion `50.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version.
2525
You can see the current [status of the `50.0.0` release here](https://github.com/apache/datafusion/issues/16799)
2626

27+
### `AsyncScalarUDFImpl::invoke_async_with_args` returns `ColumnarValue`
28+
29+
In order to enable single value optimizations and be consistent with other
30+
user defined function APIs, the `AsyncScalarUDFImpl::invoke_async_with_args` method now
31+
returns a `ColumnarValue` instead of an `ArrayRef`.
32+
33+
To upgrade, change the return type of your implementation. Old code:
34+
35+
```rust
36+
# /* comment to avoid running
37+
impl AsyncScalarUDFImpl for AskLLM {
38+
async fn invoke_async_with_args(
39+
&self,
40+
args: ScalarFunctionArgs,
41+
_option: &ConfigOptions,
42+
) -> Result<ArrayRef> {
43+
..
44+
return array_ref; // old code
45+
}
46+
}
47+
# */
48+
```
49+
50+
To return a `ColumnarValue`, wrap the array with `ColumnarValue::from`:
51+
52+
```rust
53+
# /* comment to avoid running
54+
impl AsyncScalarUDFImpl for AskLLM {
55+
async fn invoke_async_with_args(
56+
&self,
57+
args: ScalarFunctionArgs,
58+
_option: &ConfigOptions,
59+
) -> Result<ColumnarValue> {
60+
..
61+
return ColumnarValue::from(array_ref); // new code
62+
}
63+
}
64+
# */
65+
```
66+
67+
See [#16896](https://github.com/apache/datafusion/issues/16896) for more details.
68+
2769
### `SessionState`, `SessionConfig`, and `OptimizerConfig` returns `&Arc<ConfigOptions>` instead of `&ConfigOptions`
2870

2971
To provide broader access to `ConfigOptions` and reduce required clones, some

0 commit comments

Comments
 (0)