
Commit 94e8548

adriangb and alamb authored
disallow pushdown of volatile functions (#16861)
* disallow pushdown of volatile PhysicalExprs
* fix
* add FilteredVec helper to handle filter / remap pattern (#34)
* checkpoint: Address PR feedback in https://github.com/apach...
* add FilteredVec to consolidate handling of filter / remap pattern
* lint
* Add slt test for pushing volatile predicates down (#35)

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent f2086f3 commit 94e8548
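
For context: a volatile function such as random() can return a different value on every invocation, so evaluating the predicate at a different point in the plan (for example inside the scan, where it may be applied per file or per row group) can change query results. The sketch below illustrates the kind of check a pushdown rule needs to make; it is not the code from this commit, and the exact accessors (children(), fun(), signature().volatility) are assumptions based on recent DataFusion APIs.

// Illustrative sketch only: recursively look for a volatile scalar UDF inside
// a physical predicate. Accessor names are assumed, not copied from this
// commit's implementation.
use std::sync::Arc;

use datafusion_expr::Volatility;
use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};

fn contains_volatile(expr: &Arc<dyn PhysicalExpr>) -> bool {
    // A ScalarFunctionExpr wraps a ScalarUDF whose Signature declares its
    // volatility; random() declares Volatility::Volatile.
    if let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
        if func.fun().signature().volatility == Volatility::Volatile {
            return true;
        }
    }
    // Otherwise keep looking in the children of the expression tree.
    expr.children().into_iter().any(contains_volatile)
}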

4 files changed, +380 -38 lines changed


datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 39 additions & 1 deletion
@@ -34,8 +34,12 @@ use datafusion::{
 };
 use datafusion_common::config::ConfigOptions;
 use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_expr::ScalarUDF;
+use datafusion_functions::math::random::RandomFunc;
 use datafusion_functions_aggregate::count::count_udaf;
-use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning};
+use datafusion_physical_expr::{
+    aggregate::AggregateExprBuilder, Partitioning, ScalarFunctionExpr,
+};
 use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr};
 use datafusion_physical_optimizer::{
     filter_pushdown::FilterPushdown, PhysicalOptimizerRule,
@@ -76,6 +80,40 @@ fn test_pushdown_into_scan() {
     );
 }
 
+#[test]
+fn test_pushdown_volatile_functions_not_allowed() {
+    // Test that we do not push down filters with volatile functions
+    // Use random() as an example of a volatile function
+    let scan = TestScanBuilder::new(schema()).with_support(true).build();
+    let predicate = Arc::new(BinaryExpr::new(
+        Arc::new(Column::new_with_schema("a", &schema()).unwrap()),
+        Operator::Eq,
+        Arc::new(
+            ScalarFunctionExpr::try_new(
+                Arc::new(ScalarUDF::from(RandomFunc::new())),
+                vec![],
+                &schema(),
+            )
+            .unwrap(),
+        ),
+    )) as Arc<dyn PhysicalExpr>;
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
+    // expect the filter to not be pushed down
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = random()
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: a@0 = random()
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    ",
+    );
+}
+
 /// Show that we can use config options to determine how to do pushdown.
 #[test]
 fn test_pushdown_into_scan_with_config_options() {
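
The new test above builds the physical predicate a@0 = random() by hand and uses an insta inline snapshot to assert that the plan is unchanged: the FilterExec stays in place and the DataSourceExec does not absorb the filter even though pushdown_supported=true. The property the rule relies on is that random() advertises itself as volatile through its ScalarUDF signature; a minimal standalone check of that (assuming the signature()/volatility accessors of recent DataFusion releases) would look like:

// Hypothetical check, not part of this commit: confirm that random()
// declares Volatility::Volatile via its ScalarUDF signature.
use datafusion_expr::{ScalarUDF, Volatility};
use datafusion_functions::math::random::RandomFunc;

#[test]
fn random_is_volatile() {
    let udf = ScalarUDF::from(RandomFunc::new());
    assert_eq!(udf.signature().volatility, Volatility::Volatile);
}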

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 3 additions & 2 deletions
@@ -26,7 +26,6 @@ use datafusion_datasource::{
     file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
     schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile,
 };
-use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase, PushedDown};
@@ -224,7 +223,9 @@ impl FileSource for TestSource {
             filters.push(Arc::clone(internal));
         }
         let new_node = Arc::new(TestSource {
-            predicate: Some(conjunction(filters.clone())),
+            predicate: datafusion_physical_expr::utils::conjunction_opt(
+                filters.clone(),
+            ),
             ..self.clone()
         });
         Ok(FilterPushdownPropagation::with_parent_pushdown_result(
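
The util.rs change above swaps conjunction, whose result had to be wrapped in Some(...), for conjunction_opt, which already yields an Option, presumably so that an empty filter list leaves predicate as None rather than a constant-true expression. The underlying pattern is just folding a possibly empty list of predicates with AND; a hypothetical helper (not the DataFusion function itself) could look like:

// Sketch of the conjunction-as-Option pattern, using the physical BinaryExpr
// that also appears in the new test. `and_all` is a hypothetical helper, not
// datafusion_physical_expr::utils::conjunction_opt.
use std::sync::Arc;

use datafusion_expr::Operator;
use datafusion_physical_expr::{expressions::BinaryExpr, PhysicalExpr};

fn and_all(predicates: Vec<Arc<dyn PhysicalExpr>>) -> Option<Arc<dyn PhysicalExpr>> {
    // Fold the list with AND; an empty list yields None instead of a
    // constant-true placeholder.
    predicates.into_iter().reduce(|acc, p| {
        Arc::new(BinaryExpr::new(acc, Operator::And, p)) as Arc<dyn PhysicalExpr>
    })
}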
