Skip to content

Commit d1ecceb

Browse files
committed
Add more tests
1 parent 794f121 commit d1ecceb

File tree

2 files changed

+122
-5
lines changed
  • datafusion
    • core/tests/physical_optimizer/filter_pushdown
    • physical-plan/src/aggregates

2 files changed

+122
-5
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1957,10 +1957,10 @@ async fn test_aggregate_filter_pushdown() {
19571957
}
19581958

19591959
#[tokio::test]
1960-
async fn test_no_pushdown_aggregate_filter_on_non_grouping_column() {
1961-
// Test that filters on non-grouping columns (like aggregate results) are NOT pushed through
1962-
// Simulates: SELECT a, COUNT(b) as cnt FROM table GROUP BY a HAVING cnt > 5
1963-
// The filter on 'cnt' cannot be pushed down because it's an aggregate result, not a grouping column
1960+
async fn test_no_pushdown_filter_on_aggregate_result() {
1961+
// Test that filters on aggregate results (not grouping columns) are NOT pushed through
1962+
// SELECT a, COUNT(b) as cnt FROM table GROUP BY a HAVING cnt > 5
1963+
// The filter on 'cnt' cannot be pushed down because it's an aggregate result
19641964

19651965
let batches =
19661966
vec![
@@ -2009,7 +2009,7 @@ async fn test_no_pushdown_aggregate_filter_on_non_grouping_column() {
20092009
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap())
20102010
as Arc<dyn ExecutionPlan>;
20112011

2012-
// The filter should NOT be pushed through the aggregate since it references a non-grouping column
2012+
// The filter should NOT be pushed through the aggregate since it's on an aggregate result
20132013
insta::assert_snapshot!(
20142014
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
20152015
@r"
@@ -2165,3 +2165,118 @@ fn test_pushdown_grouping_sets_filter_on_common_column() {
21652165
"
21662166
);
21672167
}
2168+
2169+
#[test]
2170+
fn test_pushdown_with_empty_group_by() {
2171+
// Verifies that a parent filter is pushed through an aggregate whose GROUP BY is empty (no grouping columns).
2172+
// Plan under test: FilterExec(a = 'foo') -> AggregateExec(count(c) AS cnt, gby=[]) -> scan with pushdown support.
2173+
// Expected (see the snapshot below): the FilterExec is removed and its predicate is attached to the scan.
2174+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
2175+
2176+
let aggregate_expr =
2177+
vec![
2178+
AggregateExprBuilder::new(count_udaf(), vec![col("c", &schema()).unwrap()])
2179+
.schema(schema())
2180+
.alias("cnt")
2181+
.build()
2182+
.map(Arc::new)
2183+
.unwrap(),
2184+
];
2185+
2186+
// Empty GROUP BY - no grouping columns
2187+
let group_by = PhysicalGroupBy::new_single(vec![]);
2188+
2189+
let aggregate = Arc::new(
2190+
AggregateExec::try_new(
2191+
AggregateMode::Final,
2192+
group_by,
2193+
aggregate_expr.clone(),
2194+
vec![None],
2195+
scan,
2196+
schema(),
2197+
)
2198+
.unwrap(),
2199+
);
2200+
2201+
// Filter `a = 'foo'`, placed on top of the aggregate. NOTE(review): the predicate is built against `schema()` (the scan schema), not the aggregate's output schema - confirm this is intended.
2202+
let predicate = col_lit_predicate("a", "foo", &schema());
2203+
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
2204+
2205+
// The filter should be pushed down to the scan even though the aggregate has an empty GROUP BY
2206+
insta::assert_snapshot!(
2207+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2208+
@r"
2209+
OptimizationTest:
2210+
input:
2211+
- FilterExec: a@0 = foo
2212+
- AggregateExec: mode=Final, gby=[], aggr=[cnt]
2213+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2214+
output:
2215+
Ok:
2216+
- AggregateExec: mode=Final, gby=[], aggr=[cnt]
2217+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2218+
"
2219+
);
2220+
}
2221+
2222+
#[test]
2223+
fn test_pushdown_with_computed_grouping_key() {
2224+
// Verifies filter pushdown in a plan whose aggregate groups by a computed expression (c + 1.0).
2225+
// Roughly: SELECT (c + 1.0) AS c_plus_1, count(a) AS cnt FROM table WHERE c > 5.0 GROUP BY (c + 1.0)
2226+
2227+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
2228+
2229+
let predicate = Arc::new(BinaryExpr::new(
2230+
col("c", &schema()).unwrap(),
2231+
Operator::Gt,
2232+
Arc::new(Literal::new(ScalarValue::Float64(Some(5.0)))),
2233+
)) as Arc<dyn PhysicalExpr>;
2234+
let filter = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
2235+
2236+
let aggregate_expr =
2237+
vec![
2238+
AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema()).unwrap()])
2239+
.schema(schema())
2240+
.alias("cnt")
2241+
.build()
2242+
.map(Arc::new)
2243+
.unwrap(),
2244+
];
2245+
2246+
let c_plus_one = Arc::new(BinaryExpr::new(
2247+
col("c", &schema()).unwrap(),
2248+
Operator::Plus,
2249+
Arc::new(Literal::new(ScalarValue::Float64(Some(1.0)))),
2250+
)) as Arc<dyn PhysicalExpr>;
2251+
2252+
let group_by =
2253+
PhysicalGroupBy::new_single(vec![(c_plus_one, "c_plus_1".to_string())]);
2254+
2255+
let plan = Arc::new(
2256+
AggregateExec::try_new(
2257+
AggregateMode::Final,
2258+
group_by,
2259+
aggregate_expr.clone(),
2260+
vec![None],
2261+
filter,
2262+
schema(),
2263+
)
2264+
.unwrap(),
2265+
);
2266+
2267+
// The FilterExec already sits below the aggregate, so its predicate is expected to be absorbed by the scan; the aggregate keyed on (c + 1.0) stays in place (see the snapshot).
2268+
insta::assert_snapshot!(
2269+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2270+
@r"
2271+
OptimizationTest:
2272+
input:
2273+
- AggregateExec: mode=Final, gby=[c@2 + 1 as c_plus_1], aggr=[cnt]
2274+
- FilterExec: c@2 > 5
2275+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2276+
output:
2277+
Ok:
2278+
- AggregateExec: mode=Final, gby=[c@2 + 1 as c_plus_1], aggr=[cnt]
2279+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=c@2 > 5
2280+
"
2281+
);
2282+
}

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,8 @@ impl ExecutionPlan for AggregateExec {
10321032
CardinalityEffect::LowerEqual
10331033
}
10341034

1035+
/// Push down parent filters when possible (see implementation comment for details),
1036+
/// but do not introduce any new self filters.
10351037
fn gather_filters_for_pushdown(
10361038
&self,
10371039
_phase: FilterPushdownPhase,

0 commit comments

Comments
 (0)