From 9bc0f7927ee6a6c6e27034d846563ebb8811dd13 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 23 Jun 2025 15:25:03 -0400 Subject: [PATCH 1/4] Simplify AsyncScalarUdfImpl so it extends ScalarUdfImpl --- datafusion-examples/examples/async_udf.rs | 40 ++++++++++++----- datafusion/core/src/physical_planner.rs | 10 +++-- datafusion/expr/src/async_udf.rs | 44 +++---------------- .../src/async_scalar_function.rs | 31 +++++++++---- datafusion/physical-plan/src/async_func.rs | 2 + 5 files changed, 67 insertions(+), 60 deletions(-) diff --git a/datafusion-examples/examples/async_udf.rs b/datafusion-examples/examples/async_udf.rs index 57925a0980a1..f7c781aa4aee 100644 --- a/datafusion-examples/examples/async_udf.rs +++ b/datafusion-examples/examples/async_udf.rs @@ -20,16 +20,14 @@ use arrow::compute::kernels::cmp::eq; use arrow_schema::{DataType, Field, Schema}; use async_trait::async_trait; use datafusion::common::error::Result; -use datafusion::common::internal_err; +use datafusion::common::{internal_err, not_impl_err}; use datafusion::common::types::{logical_int64, logical_string}; use datafusion::common::utils::take_function_args; use datafusion::config::ConfigOptions; use datafusion::logical_expr::async_udf::{ - AsyncScalarFunctionArgs, AsyncScalarUDF, AsyncScalarUDFImpl, -}; -use datafusion::logical_expr::{ - ColumnarValue, Signature, TypeSignature, TypeSignatureClass, Volatility, + AsyncScalarUDF, AsyncScalarUDFImpl, }; +use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, TypeSignatureClass, Volatility}; use datafusion::logical_expr_common::signature::Coercion; use datafusion::physical_expr_common::datum::apply_cmp; use datafusion::prelude::SessionContext; @@ -153,7 +151,7 @@ impl AsyncUpper { } #[async_trait] -impl AsyncScalarUDFImpl for AsyncUpper { +impl ScalarUDFImpl for AsyncUpper { fn as_any(&self) -> &dyn Any { self } @@ -170,13 +168,24 @@ impl AsyncScalarUDFImpl for AsyncUpper { Ok(DataType::Utf8) } - fn ideal_batch_size(&self) -> Option { + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> Result { + not_impl_err!("AsyncUpper can only be called from async contexts") + } +} + +#[async_trait] +impl AsyncScalarUDFImpl for AsyncUpper { + +fn ideal_batch_size(&self) -> Option { Some(10) } async fn invoke_async_with_args( &self, - args: AsyncScalarFunctionArgs, + args: ScalarFunctionArgs, _option: &ConfigOptions, ) -> Result { trace!("Invoking async_upper with args: {:?}", args); @@ -226,7 +235,7 @@ impl AsyncEqual { } #[async_trait] -impl AsyncScalarUDFImpl for AsyncEqual { +impl ScalarUDFImpl for AsyncEqual { fn as_any(&self) -> &dyn Any { self } @@ -243,9 +252,20 @@ impl AsyncScalarUDFImpl for AsyncEqual { Ok(DataType::Boolean) } + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> Result { + not_impl_err!("AsyncEqual can only be called from async contexts") + } +} + +#[async_trait] + impl AsyncScalarUDFImpl for AsyncEqual { + async fn invoke_async_with_args( &self, - args: AsyncScalarFunctionArgs, + args: ScalarFunctionArgs, _option: &ConfigOptions, ) -> Result { let [arg1, arg2] = take_function_args(self.name(), &args.args)?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 8bf513a55a66..90cc0b572fef 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -779,9 +779,11 @@ impl DefaultPhysicalPlanner { let runtime_expr = self.create_physical_expr(predicate, input_dfschema, session_state)?; + let input_schema = input.schema(); let filter = match self.try_plan_async_exprs( - input.schema().fields().len(), + input_schema.fields().len(), PlannedExprResult::Expr(vec![runtime_expr]), + input_schema.as_arrow(), )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)? @@ -2082,6 +2084,7 @@ impl DefaultPhysicalPlanner { match self.try_plan_async_exprs( num_input_columns, PlannedExprResult::ExprWithName(physical_exprs), + input_physical_schema.as_ref(), )? { PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => Ok( Arc::new(ProjectionExec::try_new(physical_exprs, input_exec)?), @@ -2104,18 +2107,19 @@ impl DefaultPhysicalPlanner { &self, num_input_columns: usize, physical_expr: PlannedExprResult, + schema: &Schema, ) -> Result { let mut async_map = AsyncMapper::new(num_input_columns); match &physical_expr { PlannedExprResult::ExprWithName(exprs) => { exprs .iter() - .try_for_each(|(expr, _)| async_map.find_references(expr))?; + .try_for_each(|(expr, _)| async_map.find_references(expr, schema))?; } PlannedExprResult::Expr(exprs) => { exprs .iter() - .try_for_each(|expr| async_map.find_references(expr))?; + .try_for_each(|expr| async_map.find_references(expr, schema))?; } } diff --git a/datafusion/expr/src/async_udf.rs b/datafusion/expr/src/async_udf.rs index 4f2b593b421a..94d5d90c83b5 100644 --- a/datafusion/expr/src/async_udf.rs +++ b/datafusion/expr/src/async_udf.rs @@ -17,7 +17,7 @@ use crate::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl}; use arrow::array::ArrayRef; -use arrow::datatypes::{DataType, Field, FieldRef, SchemaRef}; +use arrow::datatypes::{DataType, FieldRef}; use async_trait::async_trait; use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; @@ -35,34 +35,7 @@ use std::sync::Arc; /// /// The name is chosen to mirror ScalarUDFImpl #[async_trait] -pub trait AsyncScalarUDFImpl: Debug + Send + Sync { - /// the function cast as any - fn as_any(&self) -> &dyn Any; - - /// The name of the function - fn name(&self) -> &str; - - /// The signature of the function - fn signature(&self) -> &Signature; - - /// The return type of the function - fn return_type(&self, _arg_types: &[DataType]) -> Result; - - /// What type will be returned by this function, given the arguments? - /// - /// By default, this function calls [`Self::return_type`] with the - /// types of each argument. - fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { - let data_types = args - .arg_fields - .iter() - .map(|f| f.data_type()) - .cloned() - .collect::>(); - let return_type = self.return_type(&data_types)?; - Ok(Arc::new(Field::new(self.name(), return_type, true))) - } - +pub trait AsyncScalarUDFImpl: ScalarUDFImpl { /// The ideal batch size for this function. /// /// This is used to determine what size of data to be evaluated at once. @@ -74,7 +47,7 @@ pub trait AsyncScalarUDFImpl: Debug + Send + Sync { /// Invoke the function asynchronously with the async arguments async fn invoke_async_with_args( &self, - args: AsyncScalarFunctionArgs, + args: ScalarFunctionArgs, option: &ConfigOptions, ) -> Result; } @@ -107,7 +80,7 @@ impl AsyncScalarUDF { /// Invoke the function asynchronously with the async arguments pub async fn invoke_async_with_args( &self, - args: AsyncScalarFunctionArgs, + args: ScalarFunctionArgs, option: &ConfigOptions, ) -> Result { self.inner.invoke_async_with_args(args, option).await @@ -144,11 +117,4 @@ impl Display for AsyncScalarUDF { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "AsyncScalarUDF: {}", self.inner.name()) } -} - -#[derive(Debug)] -pub struct AsyncScalarFunctionArgs { - pub args: Vec, - pub number_rows: usize, - pub schema: SchemaRef, -} +} \ No newline at end of file diff --git a/datafusion/physical-expr/src/async_scalar_function.rs b/datafusion/physical-expr/src/async_scalar_function.rs index 97ba642b06e3..6839784fecb2 100644 --- a/datafusion/physical-expr/src/async_scalar_function.rs +++ b/datafusion/physical-expr/src/async_scalar_function.rs @@ -17,17 +17,18 @@ use crate::ScalarFunctionExpr; use arrow::array::{make_array, MutableArrayData, RecordBatch}; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::Result; use datafusion_common::{internal_err, not_impl_err}; -use datafusion_expr::async_udf::{AsyncScalarFunctionArgs, AsyncScalarUDF}; +use datafusion_expr::async_udf::{AsyncScalarUDF}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use std::any::Any; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::sync::Arc; +use datafusion_expr::ScalarFunctionArgs; /// Wrapper around a scalar function that can be evaluated asynchronously #[derive(Debug, Clone, Eq)] @@ -36,6 +37,8 @@ pub struct AsyncFuncExpr { pub name: String, /// The actual function (always `ScalarFunctionExpr`) pub func: Arc, + /// The field that this function will return + return_field: FieldRef, } impl Display for AsyncFuncExpr { @@ -59,7 +62,8 @@ impl Hash for AsyncFuncExpr { impl AsyncFuncExpr { /// create a new AsyncFuncExpr - pub fn try_new(name: impl Into, func: Arc) -> Result { + pub fn try_new(name: impl Into, func: Arc, schema: &Schema, + ) -> Result { let Some(_) = func.as_any().downcast_ref::() else { return internal_err!( "unexpected function type, expected ScalarFunctionExpr, got: {:?}", @@ -67,9 +71,11 @@ impl AsyncFuncExpr { ); }; + let return_field = func.return_field(schema)?; Ok(Self { name: name.into(), func, + return_field, }) } @@ -128,6 +134,12 @@ impl AsyncFuncExpr { ); }; + let arg_fields = scalar_function_expr + .args() + .iter() + .map(|e| e.return_field(batch.schema_ref())) + .collect::>>()?; + let mut result_batches = vec![]; if let Some(ideal_batch_size) = self.ideal_batch_size()? { let mut remainder = batch.clone(); @@ -148,10 +160,11 @@ impl AsyncFuncExpr { result_batches.push( async_udf .invoke_async_with_args( - AsyncScalarFunctionArgs { - args: args.to_vec(), + ScalarFunctionArgs { + args, + arg_fields: arg_fields.clone(), number_rows: current_batch.num_rows(), - schema: current_batch.schema(), + return_field: Arc::clone(&self.return_field), }, option, ) @@ -168,10 +181,11 @@ impl AsyncFuncExpr { result_batches.push( async_udf .invoke_async_with_args( - AsyncScalarFunctionArgs { + ScalarFunctionArgs { args: args.to_vec(), + arg_fields, number_rows: batch.num_rows(), - schema: batch.schema(), + return_field: Arc::clone(&self.return_field), }, option, ) @@ -223,6 +237,7 @@ impl PhysicalExpr for AsyncFuncExpr { Ok(Arc::new(AsyncFuncExpr { name: self.name.clone(), func: new_func, + return_field: Arc::clone(&self.return_field), })) } diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index c808c5711755..7e9ae827d5d1 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -245,6 +245,7 @@ impl AsyncMapper { pub fn find_references( &mut self, physical_expr: &Arc, + schema: &Schema, ) -> Result<()> { // recursively look for references to async functions physical_expr.apply(|expr| { @@ -256,6 +257,7 @@ impl AsyncMapper { self.async_exprs.push(Arc::new(AsyncFuncExpr::try_new( next_name, Arc::clone(expr), + schema, )?)); } } From f2e738d3c001500bf742f23c3e4100280bda8d0b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 23 Jun 2025 15:37:18 -0400 Subject: [PATCH 2/4] Update one example --- .../functions/adding-udfs.md | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 66ffd69b4545..e6d15268d5ef 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -347,7 +347,9 @@ async fn main() { ## Adding a Scalar Async UDF -A Scalar Async UDF allows you to implement user-defined functions that support asynchronous execution, such as performing network or I/O operations within the UDF. +A Scalar Async UDF allows you to implement user-defined functions that support +asynchronous execution, such as performing network or I/O operations within the +UDF. To add a Scalar Async UDF, you need to: @@ -361,14 +363,13 @@ use arrow::array::{ArrayIter, ArrayRef, AsArray, StringArray}; use arrow_schema::DataType; use async_trait::async_trait; use datafusion::common::error::Result; -use datafusion::common::internal_err; +use datafusion::common::{internal_err, not_impl_err}; use datafusion::common::types::logical_string; use datafusion::config::ConfigOptions; -use datafusion::logical_expr::async_udf::{ - AsyncScalarFunctionArgs, AsyncScalarUDFImpl, -}; +use datafusion_expr::ScalarUDFImpl; +use datafusion::logical_expr::async_udf::AsyncScalarUDFImpl; use datafusion::logical_expr::{ - ColumnarValue, Signature, TypeSignature, TypeSignatureClass, Volatility, + ColumnarValue, Signature, TypeSignature, TypeSignatureClass, Volatility, ScalarFunctionArgs }; use datafusion::logical_expr_common::signature::Coercion; use log::trace; @@ -399,8 +400,9 @@ impl AsyncUpper { } } +/// Implement the normal ScalarUDFImpl trait for AsyncUpper #[async_trait] -impl AsyncScalarUDFImpl for AsyncUpper { +impl ScalarUDFImpl for AsyncUpper { fn as_any(&self) -> &dyn Any { self } @@ -417,13 +419,24 @@ impl AsyncScalarUDFImpl for AsyncUpper { Ok(DataType::Utf8) } + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> Result { + not_impl_err!("AsyncUpper can only be called from async contexts") + } +} + +/// The actual implementation of the async UDF +#[async_trait] +impl AsyncScalarUDFImpl for AsyncUpper { fn ideal_batch_size(&self) -> Option { Some(10) } async fn invoke_async_with_args( &self, - args: AsyncScalarFunctionArgs, + args: ScalarFunctionArgs, _option: &ConfigOptions, ) -> Result { trace!("Invoking async_upper with args: {:?}", args); From f8f21955854a6dc962603f82ee189d8b5a15e3bc Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 23 Jun 2025 15:39:25 -0400 Subject: [PATCH 3/4] Update one example --- .../functions/adding-udfs.md | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index e6d15268d5ef..31ddcb27c52b 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -464,14 +464,14 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf` # use arrow_schema::DataType; # use async_trait::async_trait; # use datafusion::common::error::Result; -# use datafusion::common::internal_err; +# use datafusion::common::{internal_err, not_impl_err}; # use datafusion::common::types::logical_string; # use datafusion::config::ConfigOptions; -# use datafusion::logical_expr::async_udf::{ -# AsyncScalarFunctionArgs, AsyncScalarUDFImpl, -# }; +use datafusion_expr::ScalarUDFImpl; + +# use datafusion::logical_expr::async_udf::AsyncScalarUDFImpl; # use datafusion::logical_expr::{ -# ColumnarValue, Signature, TypeSignature, TypeSignatureClass, Volatility, +# ColumnarValue, Signature, TypeSignature, TypeSignatureClass, Volatility, ScalarFunctionArgs # }; # use datafusion::logical_expr_common::signature::Coercion; # use log::trace; @@ -503,7 +503,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf` # } # # #[async_trait] -# impl AsyncScalarUDFImpl for AsyncUpper { +# impl ScalarUDFImpl for AsyncUpper { # fn as_any(&self) -> &dyn Any { # self # } @@ -519,7 +519,17 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf` # fn return_type(&self, _arg_types: &[DataType]) -> Result { # Ok(DataType::Utf8) # } +# +# fn invoke_with_args( +# &self, +# _args: ScalarFunctionArgs, +# ) -> Result { +# not_impl_err!("AsyncUpper can only be called from async contexts") +# } +# } # +# #[async_trait] +# impl AsyncScalarUDFImpl for AsyncUpper { # fn ideal_batch_size(&self) -> Option { # Some(10) # } From db73bc583af04ca89676635a3348bfed506b251f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 23 Jun 2025 15:40:34 -0400 Subject: [PATCH 4/4] prettier --- datafusion-examples/examples/async_udf.rs | 25 +++++++------------ datafusion/expr/src/async_udf.rs | 4 +-- .../src/async_scalar_function.rs | 13 ++++++---- .../functions/adding-udfs.md | 7 +++--- 4 files changed, 22 insertions(+), 27 deletions(-) diff --git a/datafusion-examples/examples/async_udf.rs b/datafusion-examples/examples/async_udf.rs index f7c781aa4aee..3037a971dfd9 100644 --- a/datafusion-examples/examples/async_udf.rs +++ b/datafusion-examples/examples/async_udf.rs @@ -20,14 +20,15 @@ use arrow::compute::kernels::cmp::eq; use arrow_schema::{DataType, Field, Schema}; use async_trait::async_trait; use datafusion::common::error::Result; -use datafusion::common::{internal_err, not_impl_err}; use datafusion::common::types::{logical_int64, logical_string}; use datafusion::common::utils::take_function_args; +use datafusion::common::{internal_err, not_impl_err}; use datafusion::config::ConfigOptions; -use datafusion::logical_expr::async_udf::{ - AsyncScalarUDF, AsyncScalarUDFImpl, +use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl}; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, + TypeSignatureClass, Volatility, }; -use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, TypeSignatureClass, Volatility}; use datafusion::logical_expr_common::signature::Coercion; use datafusion::physical_expr_common::datum::apply_cmp; use datafusion::prelude::SessionContext; @@ -168,18 +169,14 @@ impl ScalarUDFImpl for AsyncUpper { Ok(DataType::Utf8) } - fn invoke_with_args( - &self, - _args: ScalarFunctionArgs, - ) -> Result { + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { not_impl_err!("AsyncUpper can only be called from async contexts") } } #[async_trait] impl AsyncScalarUDFImpl for AsyncUpper { - -fn ideal_batch_size(&self) -> Option { + fn ideal_batch_size(&self) -> Option { Some(10) } @@ -252,17 +249,13 @@ impl ScalarUDFImpl for AsyncEqual { Ok(DataType::Boolean) } - fn invoke_with_args( - &self, - _args: ScalarFunctionArgs, - ) -> Result { + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { not_impl_err!("AsyncEqual can only be called from async contexts") } } #[async_trait] - impl AsyncScalarUDFImpl for AsyncEqual { - +impl AsyncScalarUDFImpl for AsyncEqual { async fn invoke_async_with_args( &self, args: ScalarFunctionArgs, diff --git a/datafusion/expr/src/async_udf.rs b/datafusion/expr/src/async_udf.rs index 94d5d90c83b5..d900c1634523 100644 --- a/datafusion/expr/src/async_udf.rs +++ b/datafusion/expr/src/async_udf.rs @@ -17,7 +17,7 @@ use crate::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl}; use arrow::array::ArrayRef; -use arrow::datatypes::{DataType, FieldRef}; +use arrow::datatypes::{DataType, FieldRef}; use async_trait::async_trait; use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; @@ -117,4 +117,4 @@ impl Display for AsyncScalarUDF { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "AsyncScalarUDF: {}", self.inner.name()) } -} \ No newline at end of file +} diff --git a/datafusion/physical-expr/src/async_scalar_function.rs b/datafusion/physical-expr/src/async_scalar_function.rs index 6839784fecb2..547b9c13da62 100644 --- a/datafusion/physical-expr/src/async_scalar_function.rs +++ b/datafusion/physical-expr/src/async_scalar_function.rs @@ -21,14 +21,14 @@ use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::Result; use datafusion_common::{internal_err, not_impl_err}; -use datafusion_expr::async_udf::{AsyncScalarUDF}; +use datafusion_expr::async_udf::AsyncScalarUDF; +use datafusion_expr::ScalarFunctionArgs; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use std::any::Any; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use datafusion_expr::ScalarFunctionArgs; /// Wrapper around a scalar function that can be evaluated asynchronously #[derive(Debug, Clone, Eq)] @@ -62,7 +62,10 @@ impl Hash for AsyncFuncExpr { impl AsyncFuncExpr { /// create a new AsyncFuncExpr - pub fn try_new(name: impl Into, func: Arc, schema: &Schema, + pub fn try_new( + name: impl Into, + func: Arc, + schema: &Schema, ) -> Result { let Some(_) = func.as_any().downcast_ref::() else { return internal_err!( @@ -71,7 +74,7 @@ impl AsyncFuncExpr { ); }; - let return_field = func.return_field(schema)?; + let return_field = func.return_field(schema)?; Ok(Self { name: name.into(), func, @@ -134,7 +137,7 @@ impl AsyncFuncExpr { ); }; - let arg_fields = scalar_function_expr + let arg_fields = scalar_function_expr .args() .iter() .map(|e| e.return_field(batch.schema_ref())) diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 31ddcb27c52b..3cbfbb1e3333 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -467,8 +467,7 @@ We can now transfer the async UDF into the normal scalar using `into_scalar_udf` # use datafusion::common::{internal_err, not_impl_err}; # use datafusion::common::types::logical_string; # use datafusion::config::ConfigOptions; -use datafusion_expr::ScalarUDFImpl; - +# use datafusion_expr::ScalarUDFImpl; # use datafusion::logical_expr::async_udf::AsyncScalarUDFImpl; # use datafusion::logical_expr::{ # ColumnarValue, Signature, TypeSignature, TypeSignatureClass, Volatility, ScalarFunctionArgs @@ -519,7 +518,7 @@ use datafusion_expr::ScalarUDFImpl; # fn return_type(&self, _arg_types: &[DataType]) -> Result { # Ok(DataType::Utf8) # } -# +# # fn invoke_with_args( # &self, # _args: ScalarFunctionArgs, @@ -536,7 +535,7 @@ use datafusion_expr::ScalarUDFImpl; # # async fn invoke_async_with_args( # &self, -# args: AsyncScalarFunctionArgs, +# args: ScalarFunctionArgs, # _option: &ConfigOptions, # ) -> Result { # trace!("Invoking async_upper with args: {:?}", args);