Disallow pushdown of volatile PhysicalExprs #16861

Open
wants to merge 2 commits into
base: main

adriangb
Contributor

Closes #16545

@github-actions github-actions bot added optimizer Optimizer rules core Core DataFusion crate labels Jul 23, 2025
@adriangb
Contributor Author

@theirix could you take a look? Are there any other expressions we should disallow?

@alamb are there any existing APIs to get "volatility" from a PhysicalExpr? If not, should we add some?

@theirix
Contributor

theirix commented Jul 23, 2025

Thank you! I'll check my cases.

@theirix
Contributor

theirix commented Jul 23, 2025

Thank you, @adriangb! I can confirm that it works great with table sampling, since I use the random function (matched by name):

query TT
EXPLAIN SELECT COUNT(*) from t TABLESAMPLE 42 WHERE a < 10;
----
logical_plan
01)Projection: count(Int64(1)) AS count(*)
02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
03)----Projection:
04)------Filter: t.a < Int32(10) AND random() < Float64(0.42)
05)--------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------ProjectionExec: expr=[]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: a@0 < 10 AND random() < 0.42
09)----------------DataSourceExec: partitions=1, partition_sizes=[1]

The volatile filter is not pushed down to the data source. Without this patch, the scan carried a pushed-down predicate that looked like predicate=random() < 0.1.

I agree it'd be more scalable to have an abstract way to specify UDF volatility.

Contributor

@alamb alamb left a comment

Thank you @adriangb and @theirix -- this looks good to me. I think we need to change the volatility check, but otherwise this is good to go.

It is probably reasonable to add a pub fn volatility() method to PhysicalExpr as well, so that user-defined expressions that are volatile (and so shouldn't be pushed down) can be excluded too.
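To make the suggestion concrete, here is a minimal sketch of what a volatility hook could look like. It does not use the real DataFusion types: `Expr`, `Column`, `Random`, `LessThan`, and `is_volatile` are hypothetical stand-ins, and the `Volatility` enum only mirrors the variant names used for logical UDFs. The idea is a default method that reports "immutable", plus a recursive check over children.

```rust
use std::sync::Arc;

/// Hypothetical classification, mirroring the Immutable/Stable/Volatile split.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Volatility {
    Immutable,
    Stable,
    Volatile,
}

/// Simplified stand-in for `PhysicalExpr` (NOT the real trait).
pub trait Expr {
    /// Volatility of this node alone. Defaulting to `Immutable` means
    /// existing implementations would not have to change.
    fn volatility(&self) -> Volatility {
        Volatility::Immutable
    }

    /// Direct child expressions of this node.
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        Vec::new()
    }
}

/// True if this node or any node below it is volatile.
pub fn is_volatile(expr: &dyn Expr) -> bool {
    expr.volatility() == Volatility::Volatile
        || expr.children().iter().any(|child| is_volatile(&**child))
}

/// Hypothetical leaf: a column reference (immutable by default).
pub struct Column;
impl Expr for Column {}

/// Hypothetical leaf: a `random()` call, which opts in to `Volatile`.
pub struct Random;
impl Expr for Random {
    fn volatility(&self) -> Volatility {
        Volatility::Volatile
    }
}

/// Hypothetical binary node: itself immutable, but exposes its children.
pub struct LessThan {
    pub left: Arc<dyn Expr>,
    pub right: Arc<dyn Expr>,
}
impl Expr for LessThan {
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        vec![Arc::clone(&self.left), Arc::clone(&self.right)]
    }
}
```

With a hook like this, a pushdown rule could call `is_volatile` on each predicate instead of matching function names, and user-defined expressions could opt out of pushdown by overriding `volatility()`.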

    expr.as_any()
        .downcast_ref::<datafusion_physical_expr::ScalarFunctionExpr>()
{
    let name = scalar_function.fun().name();

@@ -485,21 +497,32 @@ fn push_down_filters(
// currently. `self_filters` are the predicates which are provided by the current node,
// and tried to be pushed down over the child similarly.

let num_self_filters = self_filters.len();
let mut all_predicates = self_filters.clone();
// Filter out self_filters that contain volatile expressions and track indices
Contributor

All the bookkeeping for volatile functions got complicated (though I can't really see a better way to do it).

Maybe we could encapsulate the bookkeeping in some structure that would make it easier to track 🤔
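One possible shape for such a structure, as a rough sketch only: `PushdownPartition` and its methods are hypothetical names, not code from this PR, and predicates are modeled as a generic `T` rather than real physical expressions. It keeps the index mapping in one place: volatile predicates are held back, the rest go to the child, and the child's per-predicate answers are expanded back to the original positions.

```rust
/// Hypothetical helper: splits predicates into a pushable subset while
/// remembering where each pushable predicate sat in the original list.
pub struct PushdownPartition<T> {
    /// Predicates that may be offered to the child.
    pushable: Vec<T>,
    /// `pushable[i]` came from original position `pushable_indices[i]`.
    pushable_indices: Vec<usize>,
    /// Number of predicates in the original list.
    total: usize,
}

impl<T> PushdownPartition<T> {
    /// Partition `predicates`, holding back any for which `is_volatile`
    /// returns true.
    pub fn new(predicates: Vec<T>, is_volatile: impl Fn(&T) -> bool) -> Self {
        let total = predicates.len();
        let mut pushable = Vec::new();
        let mut pushable_indices = Vec::new();
        for (idx, predicate) in predicates.into_iter().enumerate() {
            if !is_volatile(&predicate) {
                pushable_indices.push(idx);
                pushable.push(predicate);
            }
        }
        Self { pushable, pushable_indices, total }
    }

    /// The predicates to hand to the child, in original relative order.
    pub fn pushable(&self) -> &[T] {
        &self.pushable
    }

    /// Map the child's answers (one per pushable predicate) back onto the
    /// original list. Held-back predicates are reported as not pushed down.
    pub fn expand_results(&self, child_results: &[bool]) -> Vec<bool> {
        assert_eq!(child_results.len(), self.pushable.len());
        let mut expanded = vec![false; self.total];
        for (&idx, &pushed) in self.pushable_indices.iter().zip(child_results) {
            expanded[idx] = pushed;
        }
        expanded
    }
}
```

Because the type is generic over the predicate and takes the volatility check as a closure, the index logic can be unit-tested with plain strings, independent of the optimizer rule.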

Contributor Author

Yes agreed. I'll see what I can come up with...

Contributor Author

Claude and I came up with pydantic#34. It's not exactly simple or pretty, but it does put the indexing logic in one place and makes it testable in isolation. Curious what you folks think.

Successfully merging this pull request may close these issues.

Physical plan pushdown for volatile predicates
3 participants