Skip to content

Commit a858b21

Browse files
committed
Add more tests
1 parent 794f121 commit a858b21

File tree

2 files changed

+175
-5
lines changed
  • datafusion
    • core/tests/physical_optimizer/filter_pushdown
    • physical-plan/src/aggregates

2 files changed

+175
-5
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 173 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1957,10 +1957,10 @@ async fn test_aggregate_filter_pushdown() {
19571957
}
19581958

19591959
#[tokio::test]
1960-
async fn test_no_pushdown_aggregate_filter_on_non_grouping_column() {
1961-
// Test that filters on non-grouping columns (like aggregate results) are NOT pushed through
1962-
// Simulates: SELECT a, COUNT(b) as cnt FROM table GROUP BY a HAVING cnt > 5
1963-
// The filter on 'cnt' cannot be pushed down because it's an aggregate result, not a grouping column
1960+
async fn test_no_pushdown_filter_on_aggregate_result() {
1961+
// Test that filters on aggregate results (not grouping columns) are NOT pushed through
1962+
// SELECT a, COUNT(b) as cnt FROM table GROUP BY a HAVING cnt > 5
1963+
// The filter on 'cnt' cannot be pushed down because it's an aggregate result
19641964

19651965
let batches =
19661966
vec![
@@ -2009,7 +2009,7 @@ async fn test_no_pushdown_aggregate_filter_on_non_grouping_column() {
20092009
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap())
20102010
as Arc<dyn ExecutionPlan>;
20112011

2012-
// The filter should NOT be pushed through the aggregate since it references a non-grouping column
2012+
// The filter should NOT be pushed through the aggregate since it's on an aggregate result
20132013
insta::assert_snapshot!(
20142014
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
20152015
@r"
@@ -2027,6 +2027,59 @@ async fn test_no_pushdown_aggregate_filter_on_non_grouping_column() {
20272027
);
20282028
}
20292029

2030+
#[test]
2031+
fn test_pushdown_filter_on_non_first_grouping_column() {
2032+
// Test that filters on non-first grouping columns are still pushed down
2033+
// SELECT a, b, count(*) as cnt FROM table GROUP BY a, b HAVING b = 'bar'
2034+
// The filter is on 'b' (the second grouping column), so it should still be pushed down
2035+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
2036+
2037+
let aggregate_expr =
2038+
vec![
2039+
AggregateExprBuilder::new(count_udaf(), vec![col("c", &schema()).unwrap()])
2040+
.schema(schema())
2041+
.alias("cnt")
2042+
.build()
2043+
.map(Arc::new)
2044+
.unwrap(),
2045+
];
2046+
2047+
let group_by = PhysicalGroupBy::new_single(vec![
2048+
(col("a", &schema()).unwrap(), "a".to_string()),
2049+
(col("b", &schema()).unwrap(), "b".to_string()),
2050+
]);
2051+
2052+
let aggregate = Arc::new(
2053+
AggregateExec::try_new(
2054+
AggregateMode::Final,
2055+
group_by,
2056+
aggregate_expr.clone(),
2057+
vec![None],
2058+
scan,
2059+
schema(),
2060+
)
2061+
.unwrap(),
2062+
);
2063+
2064+
let predicate = col_lit_predicate("b", "bar", &schema());
2065+
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
2066+
2067+
insta::assert_snapshot!(
2068+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2069+
@r"
2070+
OptimizationTest:
2071+
input:
2072+
- FilterExec: b@1 = bar
2073+
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
2074+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2075+
output:
2076+
Ok:
2077+
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([1])
2078+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar
2079+
"
2080+
);
2081+
}
2082+
20302083
#[test]
20312084
fn test_no_pushdown_grouping_sets_filter_on_missing_column() {
20322085
// Test that filters on columns missing from some grouping sets are NOT pushed through
@@ -2165,3 +2218,118 @@ fn test_pushdown_grouping_sets_filter_on_common_column() {
21652218
"
21662219
);
21672220
}
2221+
2222+
#[test]
2223+
fn test_pushdown_with_empty_group_by() {
2224+
// Test that filters can be pushed down when GROUP BY is empty (no grouping columns)
2225+
// SELECT count(*) as cnt FROM table WHERE a = 'foo'
2226+
// There are no grouping columns, so the filter should still push down
2227+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
2228+
2229+
let aggregate_expr =
2230+
vec![
2231+
AggregateExprBuilder::new(count_udaf(), vec![col("c", &schema()).unwrap()])
2232+
.schema(schema())
2233+
.alias("cnt")
2234+
.build()
2235+
.map(Arc::new)
2236+
.unwrap(),
2237+
];
2238+
2239+
// Empty GROUP BY - no grouping columns
2240+
let group_by = PhysicalGroupBy::new_single(vec![]);
2241+
2242+
let aggregate = Arc::new(
2243+
AggregateExec::try_new(
2244+
AggregateMode::Final,
2245+
group_by,
2246+
aggregate_expr.clone(),
2247+
vec![None],
2248+
scan,
2249+
schema(),
2250+
)
2251+
.unwrap(),
2252+
);
2253+
2254+
// Filter on 'a'
2255+
let predicate = col_lit_predicate("a", "foo", &schema());
2256+
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
2257+
2258+
// The filter should be pushed down even with empty GROUP BY
2259+
insta::assert_snapshot!(
2260+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2261+
@r"
2262+
OptimizationTest:
2263+
input:
2264+
- FilterExec: a@0 = foo
2265+
- AggregateExec: mode=Final, gby=[], aggr=[cnt]
2266+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2267+
output:
2268+
Ok:
2269+
- AggregateExec: mode=Final, gby=[], aggr=[cnt]
2270+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2271+
"
2272+
);
2273+
}
2274+
2275+
#[test]
2276+
fn test_pushdown_with_computed_grouping_key() {
2277+
// Test filter pushdown with computed grouping expression
2278+
// SELECT (c + 1.0) as c_plus_1, count(*) FROM table WHERE c > 5.0 GROUP BY (c + 1.0)
2279+
2280+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
2281+
2282+
let predicate = Arc::new(BinaryExpr::new(
2283+
col("c", &schema()).unwrap(),
2284+
Operator::Gt,
2285+
Arc::new(Literal::new(ScalarValue::Float64(Some(5.0)))),
2286+
)) as Arc<dyn PhysicalExpr>;
2287+
let filter = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
2288+
2289+
let aggregate_expr =
2290+
vec![
2291+
AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema()).unwrap()])
2292+
.schema(schema())
2293+
.alias("cnt")
2294+
.build()
2295+
.map(Arc::new)
2296+
.unwrap(),
2297+
];
2298+
2299+
let c_plus_one = Arc::new(BinaryExpr::new(
2300+
col("c", &schema()).unwrap(),
2301+
Operator::Plus,
2302+
Arc::new(Literal::new(ScalarValue::Float64(Some(1.0)))),
2303+
)) as Arc<dyn PhysicalExpr>;
2304+
2305+
let group_by =
2306+
PhysicalGroupBy::new_single(vec![(c_plus_one, "c_plus_1".to_string())]);
2307+
2308+
let plan = Arc::new(
2309+
AggregateExec::try_new(
2310+
AggregateMode::Final,
2311+
group_by,
2312+
aggregate_expr.clone(),
2313+
vec![None],
2314+
filter,
2315+
schema(),
2316+
)
2317+
.unwrap(),
2318+
);
2319+
2320+
// The FilterExec already sits below the aggregate, so its predicate on 'c' should be pushed into the scan while the aggregate on the computed key (c + 1.0) stays unchanged above it
2321+
insta::assert_snapshot!(
2322+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2323+
@r"
2324+
OptimizationTest:
2325+
input:
2326+
- AggregateExec: mode=Final, gby=[c@2 + 1 as c_plus_1], aggr=[cnt]
2327+
- FilterExec: c@2 > 5
2328+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2329+
output:
2330+
Ok:
2331+
- AggregateExec: mode=Final, gby=[c@2 + 1 as c_plus_1], aggr=[cnt]
2332+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=c@2 > 5
2333+
"
2334+
);
2335+
}

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,8 @@ impl ExecutionPlan for AggregateExec {
10321032
CardinalityEffect::LowerEqual
10331033
}
10341034

1035+
/// Push down parent filters when possible (see implementation comment for details),
1036+
/// but do not introduce any new self filters.
10351037
fn gather_filters_for_pushdown(
10361038
&self,
10371039
_phase: FilterPushdownPhase,

0 commit comments

Comments
 (0)