Skip to content

Commit 9f9f0ac

Browse files
committed
Revert "Implement inferred join predicate handling to maintain filters during optimization"
This reverts commit 8337f80.
1 parent 7468e61 commit 9f9f0ac

File tree

5 files changed

+12
-264
lines changed

5 files changed

+12
-264
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
1919
2020
use std::borrow::Cow;
21-
use std::collections::{HashMap, HashSet};
21+
use std::collections::HashMap;
2222
use std::sync::Arc;
2323

2424
use crate::datasource::file_format::file_type_to_format;
@@ -151,46 +151,6 @@ pub trait ExtensionPlanner {
151151
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
152152
}
153153

154-
fn expr_canonical_string(expr: &Arc<dyn PhysicalExpr>) -> String {
155-
format!("{}", expr)
156-
}
157-
158-
fn dedupe_join_key_pairs(
159-
left: Vec<Arc<dyn PhysicalExpr>>,
160-
right: Vec<Arc<dyn PhysicalExpr>>,
161-
) -> (Vec<Arc<dyn PhysicalExpr>>, Vec<Arc<dyn PhysicalExpr>>) {
162-
assert_eq!(left.len(), right.len());
163-
let mut seen = HashSet::new();
164-
let mut out_left = Vec::new();
165-
let mut out_right = Vec::new();
166-
for (l, r) in left.into_iter().zip(right.into_iter()) {
167-
let l_s = expr_canonical_string(&l);
168-
let r_s = expr_canonical_string(&r);
169-
let key = if l_s <= r_s {
170-
format!("{}={}", l_s, r_s)
171-
} else {
172-
format!("{}={}", r_s, l_s)
173-
};
174-
if seen.insert(key) {
175-
out_left.push(l);
176-
out_right.push(r);
177-
}
178-
}
179-
(out_left, out_right)
180-
}
181-
182-
fn dedupe_partition_exprs(exprs: Vec<Arc<dyn PhysicalExpr>>) -> Vec<Arc<dyn PhysicalExpr>> {
183-
let mut seen = HashSet::new();
184-
let mut out = Vec::new();
185-
for e in exprs.into_iter() {
186-
let key = expr_canonical_string(&e);
187-
if seen.insert(key) {
188-
out.push(e);
189-
}
190-
}
191-
out
192-
}
193-
194154
/// Default single node physical query planner that converts a
195155
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
196156
///
@@ -898,7 +858,6 @@ impl DefaultPhysicalPlanner {
898858
)
899859
})
900860
.collect::<Result<Vec<_>>>()?;
901-
let runtime_expr = dedupe_partition_exprs(runtime_expr);
902861
Partitioning::Hash(runtime_expr, *n)
903862
}
904863
LogicalPartitioning::DistributeBy(_) => {
@@ -1129,10 +1088,6 @@ impl DefaultPhysicalPlanner {
11291088
Ok((l, r))
11301089
})
11311090
.collect::<Result<join_utils::JoinOn>>()?;
1132-
let (left_keys, right_keys): (Vec<_>, Vec<_>) = join_on.into_iter().unzip();
1133-
let (left_keys, right_keys) = dedupe_join_key_pairs(left_keys, right_keys);
1134-
let join_on: join_utils::JoinOn =
1135-
left_keys.into_iter().zip(right_keys).collect();
11361091

11371092
let join_filter = match filter {
11381093
Some(expr) => {
@@ -2390,7 +2345,6 @@ impl<'a> OptimizationInvariantChecker<'a> {
23902345

23912346
Ok(())
23922347
}
2393-
23942348
}
23952349

23962350
impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
@@ -3566,29 +3520,4 @@ digraph {
35663520

35673521
Ok(())
35683522
}
3569-
3570-
#[test]
3571-
fn test_dedupe_join_key_pairs() {
3572-
let l1: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
3573-
let r1: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 0));
3574-
let l2: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 0));
3575-
let r2: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
3576-
let (left, right) = dedupe_join_key_pairs(vec![l1.clone(), l2], vec![r1.clone(), r2]);
3577-
assert_eq!(left.len(), 1);
3578-
assert_eq!(format!("{}", left[0]), "a@0");
3579-
assert_eq!(format!("{}", right[0]), "b@0");
3580-
}
3581-
3582-
#[test]
3583-
fn test_dedupe_partition_exprs() {
3584-
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
3585-
Arc::new(Column::new("a", 0)),
3586-
Arc::new(Column::new("a", 0)),
3587-
Arc::new(Column::new("b", 1)),
3588-
];
3589-
let result = dedupe_partition_exprs(exprs);
3590-
assert_eq!(result.len(), 2);
3591-
assert_eq!(format!("{}", result[0]), "a@0");
3592-
assert_eq!(format!("{}", result[1]), "b@1");
3593-
}
35943523
}

datafusion/expr/src/expr.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use datafusion_common::tree_node::{
3838
use datafusion_common::{
3939
Column, DFSchema, HashMap, Result, ScalarValue, Spans, TableReference,
4040
};
41-
use crate::utils::INFERRED_PREDICATE_ALIAS;
4241
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
4342
use sqlparser::ast::{
4443
display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
@@ -3184,13 +3183,7 @@ pub const UNNEST_COLUMN_PREFIX: &str = "UNNEST";
31843183
impl Display for Expr {
31853184
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
31863185
match self {
3187-
Expr::Alias(Alias { expr, name, .. }) => {
3188-
if name == INFERRED_PREDICATE_ALIAS {
3189-
write!(f, "{expr}")
3190-
} else {
3191-
write!(f, "{expr} AS {name}")
3192-
}
3193-
}
3186+
Expr::Alias(Alias { expr, name, .. }) => write!(f, "{expr} AS {name}"),
31943187
Expr::Column(c) => write!(f, "{c}"),
31953188
Expr::OuterReferenceColumn(_, c) => {
31963189
write!(f, "{OUTER_REFERENCE_COLUMN_PREFIX}({c})")

datafusion/expr/src/utils.rs

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,6 @@ use crate::{
2828
};
2929
use datafusion_expr_common::signature::{Signature, TypeSignature};
3030

31-
/// Alias used to mark inferred join predicates that should remain join filters
32-
/// and not be converted into additional equijoin keys during optimization.
33-
pub const INFERRED_PREDICATE_ALIAS: &str = "__datafusion_inferred_join_predicate";
34-
35-
/// Returns true if expr is an alias used to mark inferred join predicates.
36-
pub fn is_inferred_alias(expr: &Expr) -> bool {
37-
matches!(expr, Expr::Alias(Alias { name, .. }) if name == INFERRED_PREDICATE_ALIAS)
38-
}
39-
4031
use arrow::datatypes::{DataType, Field, Schema};
4132
use datafusion_common::tree_node::{
4233
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
@@ -955,9 +946,7 @@ fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&
955946
let exprs = split_conjunction_impl(left, exprs);
956947
split_conjunction_impl(right, exprs)
957948
}
958-
Expr::Alias(alias) if !is_inferred_alias(expr) => {
959-
split_conjunction_impl(&alias.expr, exprs)
960-
}
949+
Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs),
961950
other => {
962951
exprs.push(other);
963952
exprs
@@ -1074,13 +1063,8 @@ fn split_binary_owned_impl(
10741063
let exprs = split_binary_owned_impl(*left, operator, exprs);
10751064
split_binary_owned_impl(*right, operator, exprs)
10761065
}
1077-
Expr::Alias(alias) => {
1078-
if is_inferred_alias(&Expr::Alias(alias.clone())) {
1079-
exprs.push(Expr::Alias(alias));
1080-
exprs
1081-
} else {
1082-
split_binary_owned_impl(*alias.expr, operator, exprs)
1083-
}
1066+
Expr::Alias(Alias { expr, .. }) => {
1067+
split_binary_owned_impl(*expr, operator, exprs)
10841068
}
10851069
other => {
10861070
exprs.push(other);
@@ -1106,9 +1090,7 @@ fn split_binary_impl<'a>(
11061090
let exprs = split_binary_impl(left, operator, exprs);
11071091
split_binary_impl(right, operator, exprs)
11081092
}
1109-
Expr::Alias(alias) if !is_inferred_alias(expr) => {
1110-
split_binary_impl(&alias.expr, operator, exprs)
1111-
}
1093+
Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator, exprs),
11121094
other => {
11131095
exprs.push(other);
11141096
exprs
@@ -1666,17 +1648,6 @@ mod tests {
16661648
assert_eq!(result, vec![&expr1, &expr2]);
16671649
}
16681650

1669-
#[test]
1670-
fn test_split_conjunction_inferred_alias() {
1671-
let expr = col("a").eq(col("b")).alias(INFERRED_PREDICATE_ALIAS);
1672-
let result = split_conjunction(&expr);
1673-
assert_eq!(result.len(), 1);
1674-
match result[0] {
1675-
Expr::Alias(alias) => assert_eq!(alias.name, INFERRED_PREDICATE_ALIAS),
1676-
_ => panic!("expected alias"),
1677-
}
1678-
}
1679-
16801651
#[test]
16811652
fn test_split_conjunction_or() {
16821653
let expr = col("a").eq(lit(5)).or(col("b"));

datafusion/optimizer/src/extract_equijoin_predicate.rs

Lines changed: 5 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ use datafusion_common::tree_node::Transformed;
2222
use datafusion_common::DFSchema;
2323
use datafusion_common::Result;
2424
use datafusion_expr::utils::split_conjunction_owned;
25-
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair, is_inferred_alias};
25+
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
2626
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
27-
use std::collections::HashSet;
2827
// equijoin predicate
2928
type EquijoinPredicate = (Expr, Expr);
3029

@@ -82,17 +81,14 @@ impl OptimizerRule for ExtractEquijoinPredicate {
8281
let right_schema = right.schema();
8382
let (equijoin_predicates, non_equijoin_expr) =
8483
split_eq_and_noneq_join_predicate(expr, left_schema, right_schema)?;
85-
let has_new_keys = !equijoin_predicates.is_empty();
8684

87-
on.extend(equijoin_predicates);
88-
let filter = residual_minus_on(non_equijoin_expr, &on);
89-
90-
if has_new_keys {
85+
if !equijoin_predicates.is_empty() {
86+
on.extend(equijoin_predicates);
9187
Ok(Transformed::yes(LogicalPlan::Join(Join {
9288
left,
9389
right,
9490
on,
95-
filter,
91+
filter: non_equijoin_expr,
9692
join_type,
9793
join_constraint,
9894
schema,
@@ -103,7 +99,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
10399
left,
104100
right,
105101
on,
106-
filter,
102+
filter: non_equijoin_expr,
107103
join_type,
108104
join_constraint,
109105
schema,
@@ -126,10 +122,6 @@ fn split_eq_and_noneq_join_predicate(
126122
let mut accum_join_keys: Vec<(Expr, Expr)> = vec![];
127123
let mut accum_filters: Vec<Expr> = vec![];
128124
for expr in exprs {
129-
if is_inferred_alias(&expr) {
130-
accum_filters.push(expr);
131-
continue;
132-
}
133125
match expr {
134126
Expr::BinaryExpr(BinaryExpr {
135127
ref left,
@@ -160,77 +152,17 @@ fn split_eq_and_noneq_join_predicate(
160152
Ok((accum_join_keys, result_filter))
161153
}
162154

163-
fn residual_minus_on(filter: Option<Expr>, on: &[(Expr, Expr)]) -> Option<Expr> {
164-
let filter = filter?;
165-
let exprs = split_conjunction_owned(filter);
166-
167-
let mut on_set: HashSet<(String, String)> = HashSet::new();
168-
for (l, r) in on {
169-
let (a, b) = canonical_pair(l, r);
170-
on_set.insert((a.clone(), b.clone()));
171-
}
172-
173-
let remaining: Vec<Expr> = exprs
174-
.into_iter()
175-
.filter(|e| !is_self_equality(e))
176-
.filter(|e| {
177-
let inner = match e {
178-
Expr::Alias(alias) => alias.expr.as_ref(),
179-
_ => e,
180-
};
181-
if let Expr::BinaryExpr(BinaryExpr { left, op: Operator::Eq, right }) = inner {
182-
let (a, b) = canonical_pair(left, right);
183-
!on_set.contains(&(a, b))
184-
} else {
185-
true
186-
}
187-
})
188-
.collect();
189-
190-
remaining.into_iter().reduce(Expr::and)
191-
}
192-
193-
fn canonical_pair(left: &Expr, right: &Expr) -> (String, String) {
194-
let l = canonical_str(left);
195-
let r = canonical_str(right);
196-
if l <= r { (l, r) } else { (r, l) }
197-
}
198-
199-
fn canonical_str(expr: &Expr) -> String {
200-
match expr {
201-
Expr::Alias(alias) => canonical_str(&alias.expr),
202-
_ => format!("{}", expr),
203-
}
204-
}
205-
206-
fn is_self_equality(expr: &Expr) -> bool {
207-
let inner = match expr {
208-
Expr::Alias(alias) => alias.expr.as_ref(),
209-
_ => expr,
210-
};
211-
match inner {
212-
Expr::BinaryExpr(BinaryExpr { left, op: Operator::Eq, right }) => left.as_ref() == right.as_ref(),
213-
_ => false,
214-
}
215-
}
216-
217-
218155
#[cfg(test)]
219156
mod tests {
220157
use super::*;
221158
use crate::assert_optimized_plan_eq_display_indent_snapshot;
222159
use crate::test::*;
223-
use crate::{Optimizer, OptimizerContext};
224160
use arrow::datatypes::DataType;
225-
use datafusion_common::NullEquality;
226161
use datafusion_expr::{
227162
col, lit, logical_plan::builder::LogicalPlanBuilder, JoinType,
228-
logical_plan::JoinConstraint,
229163
};
230164
use std::sync::Arc;
231165

232-
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
233-
234166
macro_rules! assert_optimized_plan_equal {
235167
(
236168
$plan:expr,
@@ -287,31 +219,6 @@ mod tests {
287219
)
288220
}
289221

290-
#[test]
291-
fn residual_minus_on_removes_symmetric_dup() -> Result<()> {
292-
let left = test_table_scan_with_name("l")?;
293-
let right = test_table_scan_with_name("r")?;
294-
let on = vec![(col("l.a"), col("r.a"))];
295-
let filter = Some(col("r.a").eq(col("l.a")).and(col("l.a").eq(col("l.a"))));
296-
let join = Join::try_new(
297-
Arc::new(left),
298-
Arc::new(right),
299-
on,
300-
filter,
301-
JoinType::Inner,
302-
JoinConstraint::On,
303-
NullEquality::NullEqualsNull,
304-
)?;
305-
let optimizer = Optimizer::with_rules(vec![Arc::new(ExtractEquijoinPredicate::new())]);
306-
let optimized = optimizer.optimize(LogicalPlan::Join(join), &OptimizerContext::new(), observe)?;
307-
if let LogicalPlan::Join(j) = optimized {
308-
assert!(j.filter.is_none());
309-
} else {
310-
panic!("expected join");
311-
}
312-
Ok(())
313-
}
314-
315222
#[test]
316223
fn join_with_only_none_equi_predicate() -> Result<()> {
317224
let t1 = test_table_scan_with_name("t1")?;

0 commit comments

Comments
 (0)