Skip to content

Commit 794f121

Browse files
committed
Avoid blocking all filters when one is unsafe
1 parent 66140ed commit 794f121

File tree

1 file changed

+50
-40
lines changed
  • datafusion/physical-plan/src/aggregates

1 file changed

+50
-40
lines changed

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 50 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,60 +1046,70 @@ impl ExecutionPlan for AggregateExec {
10461046
// This optimization is NOT safe for filters on aggregated columns (like filtering on
10471047
// the result of SUM or COUNT), as those require computing all groups first.
10481048

1049-
let filter_columns: HashSet<_> =
1050-
parent_filters.iter().flat_map(collect_columns).collect();
1051-
10521049
let grouping_columns: HashSet<_> = self
10531050
.group_by
10541051
.expr()
10551052
.iter()
10561053
.flat_map(|(expr, _)| collect_columns(expr))
10571054
.collect();
10581055

1059-
// Check if filters reference non-grouping columns
1060-
if !grouping_columns.is_empty() && !filter_columns.is_subset(&grouping_columns) {
1061-
let unsupported_filters = parent_filters
1062-
.into_iter()
1063-
.map(PushedDownPredicate::unsupported)
1064-
.collect();
1065-
return Ok(FilterDescription::new().with_child(ChildFilterDescription {
1066-
parent_filters: unsupported_filters,
1067-
self_filters: vec![],
1068-
}));
1069-
}
1056+
// Analyze each filter separately to determine if it can be pushed down
1057+
let mut safe_filters = Vec::new();
1058+
let mut unsafe_filters = Vec::new();
10701059

1071-
// For GROUPING SETS, verify filtered columns appear in all grouping sets
1072-
if self.group_by.groups().len() > 1 {
1073-
let filter_column_indices: HashSet<usize> = filter_columns
1074-
.iter()
1075-
.filter_map(|filter_col| {
1076-
self.group_by
1077-
.expr()
1078-
.iter()
1079-
.position(|(expr, _)| collect_columns(expr).contains(filter_col))
1080-
})
1081-
.collect();
1060+
for filter in parent_filters {
1061+
let filter_columns: HashSet<_> =
1062+
collect_columns(&filter).into_iter().collect();
10821063

1083-
// Check if any filtered column is missing from any grouping set
1084-
let has_missing_column = self.group_by.groups().iter().any(|null_mask| {
1085-
filter_column_indices
1086-
.iter()
1087-
.any(|&idx| null_mask.get(idx) == Some(&true))
1088-
});
1064+
// Check if this filter references non-grouping columns
1065+
let references_non_grouping = !grouping_columns.is_empty()
1066+
&& !filter_columns.is_subset(&grouping_columns);
1067+
1068+
if references_non_grouping {
1069+
unsafe_filters.push(filter);
1070+
continue;
1071+
}
10891072

1090-
if has_missing_column {
1091-
let unsupported_filters = parent_filters
1092-
.into_iter()
1093-
.map(PushedDownPredicate::unsupported)
1073+
// For GROUPING SETS, verify this filter's columns appear in all grouping sets
1074+
if self.group_by.groups().len() > 1 {
1075+
let filter_column_indices: HashSet<usize> = filter_columns
1076+
.iter()
1077+
.filter_map(|filter_col| {
1078+
self.group_by.expr().iter().position(|(expr, _)| {
1079+
collect_columns(expr).contains(filter_col)
1080+
})
1081+
})
10941082
.collect();
1095-
return Ok(FilterDescription::new().with_child(ChildFilterDescription {
1096-
parent_filters: unsupported_filters,
1097-
self_filters: vec![],
1098-
}));
1083+
1084+
// Check if any of this filter's columns are missing from any grouping set
1085+
let has_missing_column = self.group_by.groups().iter().any(|null_mask| {
1086+
filter_column_indices
1087+
.iter()
1088+
.any(|&idx| null_mask.get(idx) == Some(&true))
1089+
});
1090+
1091+
if has_missing_column {
1092+
unsafe_filters.push(filter);
1093+
continue;
1094+
}
10991095
}
1096+
1097+
// This filter is safe to push down
1098+
safe_filters.push(filter);
11001099
}
11011100

1102-
FilterDescription::from_children(parent_filters, &self.children())
1101+
// Build child filter description with both safe and unsafe filters
1102+
let child = self.children()[0];
1103+
let mut child_desc = ChildFilterDescription::from_child(&safe_filters, child)?;
1104+
1105+
// Add unsafe filters as unsupported
1106+
child_desc.parent_filters.extend(
1107+
unsafe_filters
1108+
.into_iter()
1109+
.map(PushedDownPredicate::unsupported),
1110+
);
1111+
1112+
Ok(FilterDescription::new().with_child(child_desc))
11031113
}
11041114
}
11051115

0 commit comments

Comments
 (0)