Skip to content
7 changes: 5 additions & 2 deletions datafusion-examples/examples/async_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ impl AsyncScalarUDFImpl for AskLLM {
/// is processing the query, so you may wish to make actual network requests
/// on a different `Runtime`, as explained in the `thread_pools.rs` example
/// in this directory.
async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result<ArrayRef> {
async fn invoke_async_with_args(
&self,
args: ScalarFunctionArgs,
) -> Result<ColumnarValue> {
// in a real UDF you would likely want to special case constant
// arguments to improve performance, but this example converts the
// arguments to arrays for simplicity.
Expand Down Expand Up @@ -229,6 +232,6 @@ impl AsyncScalarUDFImpl for AskLLM {
})
.collect();

Ok(Arc::new(result_array))
Ok(ColumnarValue::from(Arc::new(result_array) as ArrayRef))
}
}
8 changes: 5 additions & 3 deletions datafusion/expr/src/async_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::utils::{arc_ptr_eq, arc_ptr_hash};
use crate::{
udf_equals_hash, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, FieldRef};
use async_trait::async_trait;
use datafusion_common::error::Result;
Expand Down Expand Up @@ -48,7 +47,10 @@ pub trait AsyncScalarUDFImpl: ScalarUDFImpl {
}

/// Invoke the function asynchronously with the async arguments
async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result<ArrayRef>;
async fn invoke_async_with_args(
&self,
args: ScalarFunctionArgs,
) -> Result<ColumnarValue>;
}

/// A scalar UDF that must be invoked using async methods
Expand Down Expand Up @@ -95,7 +97,7 @@ impl AsyncScalarUDF {
pub async fn invoke_async_with_args(
&self,
args: ScalarFunctionArgs,
) -> Result<ArrayRef> {
) -> Result<ColumnarValue> {
self.inner.invoke_async_with_args(args).await
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/async_scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl AsyncFuncExpr {
);
}

let datas = result_batches
let datas = ColumnarValue::values_to_arrays(&result_batches)?
.iter()
.map(|b| b.to_data())
.collect::<Vec<_>>();
Expand Down
11 changes: 5 additions & 6 deletions docs/source/library-user-guide/functions/adding-udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,11 @@ impl AsyncScalarUDFImpl for AsyncUpper {
}

/// This method is called to execute the async UDF and is similar
/// to the normal `invoke_with_args` except it returns an `ArrayRef`
/// instead of `ColumnarValue` and is `async`.
/// to the normal `invoke_with_args` except it is `async`.
async fn invoke_async_with_args(
&self,
args: ScalarFunctionArgs,
) -> Result<ArrayRef> {
) -> Result<ColumnarValue> {
let value = &args.args[0];
// This function implements a simple string-to-uppercase conversion
// but can be used for any async operation such as network calls.
Expand All @@ -464,7 +463,7 @@ impl AsyncScalarUDFImpl for AsyncUpper {
}
_ => return internal_err!("Expected a string argument, got {:?}", value),
};
Ok(result)
Ok(ColumnarValue::from(result))
}
}
```
Expand Down Expand Up @@ -548,7 +547,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf`
# async fn invoke_async_with_args(
# &self,
# args: ScalarFunctionArgs,
# ) -> Result<ArrayRef> {
# ) -> Result<ColumnarValue> {
# trace!("Invoking async_upper with args: {:?}", args);
# let value = &args.args[0];
# let result = match value {
Expand All @@ -562,7 +561,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf`
# }
# _ => return internal_err!("Expected a string argument, got {:?}", value),
# };
# Ok(result)
# Ok(ColumnarValue::from(result))
# }
# }
use datafusion::execution::context::SessionContext;
Expand Down
42 changes: 42 additions & 0 deletions docs/source/library-user-guide/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,48 @@
**Note:** DataFusion `50.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version.
You can see the current [status of the `50.0.0` release here](https://github.com/apache/datafusion/issues/16799)

### `AsyncScalarUDFImpl::invoke_async_with_args` returns `ColumnarValue`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this note here


In order to enable single value optimizations and be consistent with other
user defined function APIs, the `AsyncScalarUDFImpl::invoke_async_with_args` method now
returns a `ColumnarValue` instead of an `ArrayRef`.

To upgrade, change the return type of your implementation. For example, change code that returns an `ArrayRef`:

```rust
# /* comment to avoid running
impl AsyncScalarUDFImpl for AskLLM {
async fn invoke_async_with_args(
&self,
args: ScalarFunctionArgs,
) -> Result<ArrayRef> {
..
return array_ref; // old code
}
}
# */
```

To return a `ColumnarValue`:

```rust
# /* comment to avoid running
impl AsyncScalarUDFImpl for AskLLM {
async fn invoke_async_with_args(
&self,
args: ScalarFunctionArgs,
) -> Result<ColumnarValue> {
..
return ColumnarValue::from(array_ref); // new code
}
}
# */
```

See [#16896](https://github.com/apache/datafusion/issues/16896) for more details.

### `SessionState`, `SessionConfig`, and `OptimizerConfig` return `&Arc<ConfigOptions>` instead of `&ConfigOptions`

To provide broader access to `ConfigOptions` and reduce required clones, some
Expand Down