
Conversation

goldmedal
Contributor

Which issue does this PR close?

Rationale for this change

I have been working with @alamb to implement support for async UDFs.

It introduces the following trait:

#[async_trait]
pub trait AsyncScalarUDFImpl: Debug + Send + Sync {
    /// Returns the function as `Any` for downcasting
    fn as_any(&self) -> &dyn Any;

    /// The name of the function
    fn name(&self) -> &str;

    /// The signature of the function
    fn signature(&self) -> &Signature;

    /// The return type of the function
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>;

    /// The ideal batch size for this function.
    ///
    /// This determines how much data is evaluated at once.
    /// If `None`, the whole batch is evaluated in a single call.
    fn ideal_batch_size(&self) -> Option<usize> {
        None
    }

    /// Invoke the function asynchronously with the async arguments
    async fn invoke_async_with_args(
        &self,
        args: AsyncScalarFunctionArgs,
        option: &ConfigOptions,
    ) -> Result<ArrayRef>;
}
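
As a rough illustration of what `ideal_batch_size` implies (a hypothetical helper, not code from this PR; plain `Vec<i64>` stands in for Arrow's `ArrayRef`), the caller can split a batch into chunks of at most that size and invoke the function once per chunk:

```rust
// Hypothetical sketch: honor `ideal_batch_size` by splitting the input into
// chunks of at most that size, invoking the function once per chunk, and
// concatenating the results. `Vec<i64>` stands in for an Arrow array.
fn invoke_chunked<F>(batch: &[i64], ideal_batch_size: Option<usize>, f: F) -> Vec<i64>
where
    F: Fn(&[i64]) -> Vec<i64>,
{
    match ideal_batch_size {
        // No preference: evaluate the whole batch at once.
        None => f(batch),
        // Otherwise invoke once per chunk (chunk size must be non-zero).
        Some(n) => batch.chunks(n).flat_map(|chunk| f(chunk)).collect(),
    }
}

fn main() {
    // A stand-in "remote" function that doubles every value.
    let double = |xs: &[i64]| xs.iter().map(|x| x * 2).collect::<Vec<_>>();
    let batch: Vec<i64> = (1..=10).collect();
    // With ideal_batch_size = 4, the function is invoked on chunks of 4, 4, 2.
    let out = invoke_chunked(&batch, Some(4), double);
    println!("{:?}", out);
}
```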

It allows users to implement UDFs that invoke an external remote function from a query.
Given an async UDF async_equal, the plan looks like:

> explain select async_equal(a.id, 1) from animal a
+---------------+----------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                   |
+---------------+----------------------------------------------------------------------------------------+
| logical_plan  | Projection: async_equal(a.id, Int64(1))                                                |
|               |   SubqueryAlias: a                                                                     |
|               |     TableScan: animal projection=[id]                                                  |
| physical_plan | ProjectionExec: expr=[__async_fn_0@1 as async_equal(a.id,Int64(1))]                    |
|               |   AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_equal(id@0, 1))] |
|               |     CoalesceBatchesExec: target_batch_size=8192                                        |
|               |       DataSourceExec: partitions=1, partition_sizes=[1]                                |
|               |                                                                                        |
+---------------+----------------------------------------------------------------------------------------+

To reduce the number of async function invocations, the CoalesceAsyncExecInput rule coalesces the input batches of AsyncFuncExec.

See the example for detailed usage.
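
For intuition, the effect CoalesceAsyncExecInput aims for can be sketched like this (hypothetical simplified code, with `Vec<i64>` standing in for record batches): buffer small incoming batches and emit merged ones of roughly `target` rows, so the downstream async function is invoked once per large batch instead of once per small one:

```rust
// Hypothetical sketch of batch coalescing: accumulate rows from small
// incoming batches and emit a merged batch once at least `target` rows
// have been buffered, flushing any remainder at the end.
fn coalesce(batches: Vec<Vec<i64>>, target: usize) -> Vec<Vec<i64>> {
    let mut out = Vec::new();
    let mut buf: Vec<i64> = Vec::new();
    for b in batches {
        buf.extend(b);
        if buf.len() >= target {
            // Emit the buffered rows as one merged batch.
            out.push(std::mem::take(&mut buf));
        }
    }
    if !buf.is_empty() {
        out.push(buf); // flush the remainder
    }
    out
}

fn main() {
    let small = vec![vec![1, 2], vec![3], vec![4, 5, 6], vec![7]];
    // With target = 4, the seven rows arrive in two batches instead of four.
    let merged = coalesce(small, 4);
    println!("{:?}", merged);
}
```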

What changes are included in this PR?

Remaining Work

  • Support for ProjectExec
  • Support for FilterExec
  • Support for Join Expression

Maybe implement in a follow-up PR

  • Async aggregation function
  • Async window function
  • Async table function (?)

Are these changes tested?

Are there any user-facing changes?

@alamb
Contributor

alamb commented Feb 24, 2025

😮 -- thanks @goldmedal -- I'll put this on my list of things to review

@goldmedal goldmedal marked this pull request as ready for review March 12, 2025 02:54
@goldmedal
Contributor Author

@alamb Sorry for the delay. This PR is ready for review now.
I want to focus on Projection and Filter, which can currently invoke async UDFs. After ensuring the approach makes sense, I'll create follow-up PRs for the other plans.

@alamb
Contributor

alamb commented Mar 12, 2025

Thanks I'll put it on my list

@berkaysynnada
Contributor

What's the status of this PR?

@goldmedal
Contributor Author

> What's the status of this PR?

It's ready for review. I'm still waiting for someone to help review it.

@berkaysynnada
Contributor

> What's the status of this PR?

> It's ready for review. I'm still waiting for someone to help review it.

Thanks @goldmedal. We'll need this as well, so let's revive it. I'm putting this on my review list.

Contributor

@berkaysynnada berkaysynnada left a comment


Hi again @goldmedal. I finally found some time to look into this. First of all, thank you for your work. This PR is in very good shape overall, and the idea is easy to follow.

However, when I first imagined the design of this feature, I was thinking of approaching the problem from a different angle, which I believe could simplify things quite a bit:

What if we just added a new method to the PhysicalExpr trait, like evaluate_async()? We could then call this from streams that might involve async work. The default implementation would delegate to evaluate(), but in the case of ScalarFunctionExpr, we could branch depending on the function type.

This way, we wouldn't need to introduce a new physical rule or operator, which add overhead to both planning and execution. As I mentioned below, the special handling in the planner doesn't scale well IMO.

I'd love to hear your thoughts on my suggestion.

@@ -775,12 +776,44 @@ impl DefaultPhysicalPlanner {

let runtime_expr =
self.create_physical_expr(predicate, input_dfschema, session_state)?;

let filter = match self.try_plan_async_exprs(
Contributor


Do we need to apply this pattern to every operator that has PhysicalExprs inside it which need to be evaluated at runtime? I think we can figure out another way that doesn't require modifying the planner code for every such operator

Contributor


I think at a really high level this pattern is basically the same as the "Common Subexpression Elimination" and many of the other optimizer passes -- that is pulling some subset of the expressions into a new node, and rewriting the others.

If we want to avoid that, I think we could follow the model of some of the other recent optimizer passes and add a method to ExecutionPlan -- something like this perhaps

trait ExecutionPlan {
  /// Factor all async expressions in this ExecutionPlan out of any internal expressions,
  /// returning a list of such async expressions and the rewritten plan
  ///
  /// The async expression values will be provided to the rewritten plan after all the existing
  /// input columns
  fn rewrite_async(&self) -> Transformed<(Vec<AsyncExpr>, Arc<dyn ExecutionPlan>)> {
    // default to not supporting async functions
    Transformed::no()
  }
}

Contributor


> something like this perhaps

rewritten plan is (async_exec + original plan)?

> I think at a really high level this pattern is basically the same as the "Common Subexpression Elimination" and many of the other optimizer passes -- that is pulling some subset of the expressions into a new node, and rewriting the others.

I see the pattern now, but IMO for this async evaluation, adding a new operator for each async fn in the query seems a bit unnatural to me. I feel like we should encapsulate this feature at the PhysicalExpr level.

Contributor


I agree it feels unnatural

The downside, in my mind, of trying to put it in PhysicalExpr is that it complicates implementing PhysicalExpr even when most PhysicalExprs don't need to worry about it

Thus I think treating async UDFs specially, while not ideal, will make it easier to understand how different they are

@adriangb
Contributor

> What if we just added a new method to the PhysicalExpr trait, like evaluate_async()? We could then call this from streams that might involve async work. The default implementation would delegate to evaluate(), but in the case of ScalarFunctionExpr, we could branch depending on the function type.

How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?
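
To make the coloring problem concrete, here is a toy sketch (not DataFusion code): `call_llm_model_async` is a stand-in for the remote call, and `block_on` is a minimal noop-waker executor that can only drive futures which never return `Pending`. A sync `evaluate()` can only reach the async result by blocking like this, which is exactly what is dangerous inside a real async runtime and why a runtime handle would otherwise have to be passed around:

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Build a waker that does nothing; enough for futures that complete
// without ever needing to be woken.
fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

// Minimal blocking executor: polls the future in a busy loop. A real
// runtime would park the thread; blocking like this inside an async
// runtime's worker thread is the hazard discussed above.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

// Toy stand-in for the remote model call; completes immediately.
async fn call_llm_model_async() -> i64 {
    1
}

// A sync evaluate() for `1 = 2 OR 1 = call_llm_model_async()`: once we are
// in the sync world, the only way back into async is to block on the future.
fn evaluate() -> bool {
    let rhs = block_on(call_llm_model_async());
    (1 == 2) || (1 == rhs)
}

fn main() {
    println!("{}", evaluate()); // prints "true"
}
```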

Contributor

@alamb alamb left a comment


Thank you @goldmedal -- I am sorry I missed this PR for so long. I think it is a great extension for DataFusion and will make using DataFusion with various new LLMs / services easier

I am approving this PR as I think it follows the existing patterns for optimizers and adds some key functionality

However, note I am quite biased as I had something to do with this pattern here goldmedal/datafusion-llm-function#1. Thus I believe that we should address @berkaysynnada and @adriangb 's concerns prior to merging

I think we should file some follow on tickets to

  1. Add support for the remaining nodes
  2. Add some more documentation / examples


@alamb alamb mentioned this pull request May 11, 2025
24 tasks
@berkaysynnada
Contributor

> How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?

The easy answer is converting the original evaluate()'s to async and moving all evaluate() impls to evaluate_sync(), but I cannot fully estimate the effects and challenges. Does anything come to your mind?

@adriangb
Contributor

> How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?

> The easy answer is converting the original evaluate()'s to async and moving all evaluate() impls to evaluate_sync(), but I cannot fully estimate the effects and challenges. Does anything come to your mind?

I mean, that makes sense, but it sounds like a lot of churn? I'm not sure tbh; sync/async coloring is always a pain and I don't know of any good solutions :(

@berkaysynnada
Contributor

> How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?

> The easy answer is converting the original evaluate()'s to async and moving all evaluate() impls to evaluate_sync(), but I cannot fully estimate the effects and challenges. Does anything come to your mind?

> I mean, that makes sense, but it sounds like a lot of churn? I'm not sure tbh; sync/async coloring is always a pain and I don't know of any good solutions :(

I'll try a POC when I find some time, and I'd like to hear @alamb's opinion

@alamb
Contributor

alamb commented May 11, 2025

> How would that work going from sync -> async? For example: 1 = 2 OR 1 = call_llm_model_async(). I imagine this would build something like BinaryExpr(BinaryExpr(1, Eq, 2), Or, ScalarFunc(call_llm_model_async)). If we call evaluate_async on the outer BinaryExpr it would call evaluate() by default so now you're in sync world. How do you break back into async world? Do we pass around a handle to the tokio runtime?

> The easy answer is converting the original evaluate()'s to async and moving all evaluate() impls to evaluate_sync(), but I cannot fully estimate the effects and challenges. Does anything come to your mind?

> I mean, that makes sense, but it sounds like a lot of churn? I'm not sure tbh; sync/async coloring is always a pain and I don't know of any good solutions :(

> I'll try a POC when I find some time, and I'd like to hear @alamb's opinion

My feeling (without any solid data) is that using async functions is not ideal because:

  1. The async overhead (e.g. what it takes to await vs call a normal function) could be noticeable, but maybe not that big a deal
  2. The fact that everything that calls a UDF would have to be async (as only async functions can call other async functions) -- the so-called "what color is your function" problem -- would be quite disruptive.

Another benefit of the approach in this PR is that it requires no changes to any existing functions or APIs (in fact, the original POC can be implemented entirely as a DataFusion user-defined optimizer extension)

@goldmedal
Contributor Author

Hi @alamb
I have fixed the conflicts. If there are no more comments, I think we can merge it.

@alamb
Contributor

alamb commented Jun 23, 2025

Thanks @goldmedal -- I will file some follow on tickets and then merge

Contributor

@alamb alamb left a comment


I think it looks like a good start -- let's merge it and those who want to use it can iterate on it as we go

Thanks @goldmedal


fn signature(&self) -> &Signature;

/// The return type of the function
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>;
Contributor


Since this PR was originally created, the UDF APIs have changed again (to use Field/FieldRef rather than DataType)...

@alamb alamb merged commit cdaaef7 into apache:main Jun 23, 2025
30 checks passed
@goldmedal goldmedal deleted the epic/async-udf branch June 24, 2025 01:49
@goldmedal
Contributor Author

Thanks @alamb @berkaysynnada @kylebarron @ozankabak @Omega359 @paleolimbot for reviewing and suggestions 🚀

@alamb
Contributor

alamb commented Jul 21, 2025

I made a follow-on PR to update the docs a bit:

This is so exciting

Successfully merging this pull request may close these issues.

Async User Defined Functions (UDF)