Skip to content
4 changes: 2 additions & 2 deletions datafusion-examples/examples/async_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl AsyncScalarUDFImpl for AskLLM {
&self,
args: ScalarFunctionArgs,
_option: &ConfigOptions,
) -> Result<ArrayRef> {
) -> Result<ColumnarValue> {
// in a real UDF you would likely want to special case constant
// arguments to improve performance, but this example converts the
// arguments to arrays for simplicity.
Expand Down Expand Up @@ -234,6 +234,6 @@ impl AsyncScalarUDFImpl for AskLLM {
})
.collect();

Ok(Arc::new(result_array))
Ok(ColumnarValue::from(Arc::new(result_array) as ArrayRef))
}
}
5 changes: 2 additions & 3 deletions datafusion/expr/src/async_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use crate::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, FieldRef};
use async_trait::async_trait;
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -50,7 +49,7 @@ pub trait AsyncScalarUDFImpl: ScalarUDFImpl {
&self,
args: ScalarFunctionArgs,
option: &ConfigOptions,
) -> Result<ArrayRef>;
) -> Result<ColumnarValue>;
}

/// A scalar UDF that must be invoked using async methods
Expand Down Expand Up @@ -83,7 +82,7 @@ impl AsyncScalarUDF {
&self,
args: ScalarFunctionArgs,
option: &ConfigOptions,
) -> Result<ArrayRef> {
) -> Result<ColumnarValue> {
self.inner.invoke_async_with_args(args, option).await
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/async_scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl AsyncFuncExpr {
);
}

let datas = result_batches
let datas = ColumnarValue::values_to_arrays(&result_batches)?
.iter()
.map(|b| b.to_data())
.collect::<Vec<_>>();
Expand Down
11 changes: 5 additions & 6 deletions docs/source/library-user-guide/functions/adding-udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,12 @@ impl AsyncScalarUDFImpl for AsyncUpper {
}

/// This method is called to execute the async UDF and is similar
/// to the normal `invoke_with_args` except it returns an `ArrayRef`
/// instead of `ColumnarValue` and is `async`.
/// to the normal `invoke_with_args` except it is `async`.
async fn invoke_async_with_args(
&self,
args: ScalarFunctionArgs,
_option: &ConfigOptions,
) -> Result<ArrayRef> {
) -> Result<ColumnarValue> {
let value = &args.args[0];
// This function simply implements a simple string to uppercase conversion
// but can be used for any async operation such as network calls.
Expand All @@ -465,7 +464,7 @@ impl AsyncScalarUDFImpl for AsyncUpper {
}
_ => return internal_err!("Expected a string argument, got {:?}", value),
};
Ok(result)
Ok(ColumnarValue::from(result))
}
}
```
Expand Down Expand Up @@ -550,7 +549,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf`
# &self,
# args: ScalarFunctionArgs,
# _option: &ConfigOptions,
# ) -> Result<ArrayRef> {
# ) -> Result<ColumnarValue> {
# trace!("Invoking async_upper with args: {:?}", args);
# let value = &args.args[0];
# let result = match value {
Expand All @@ -564,7 +563,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf`
# }
# _ => return internal_err!("Expected a string argument, got {:?}", value),
# };
# Ok(result)
# Ok(ColumnarValue::from(result))
# }
# }
use datafusion::execution::context::SessionContext;
Expand Down