Skip to content

Commit 5e4472d

Browse files
committed
Implement deduplication for join key pairs and partition expressions; add inferred predicate alias handling
1 parent c960bda commit 5e4472d

File tree

6 files changed

+296
-15
lines changed

6 files changed

+296
-15
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
1919
2020
use std::borrow::Cow;
21-
use std::collections::HashMap;
21+
use std::collections::{HashMap, HashSet};
2222
use std::sync::Arc;
2323

2424
use crate::datasource::file_format::file_type_to_format;
@@ -151,6 +151,46 @@ pub trait ExtensionPlanner {
151151
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
152152
}
153153

154+
fn expr_canonical_string(expr: &Arc<dyn PhysicalExpr>) -> String {
155+
format!("{}", expr)
156+
}
157+
158+
fn dedupe_join_key_pairs(
159+
left: Vec<Arc<dyn PhysicalExpr>>,
160+
right: Vec<Arc<dyn PhysicalExpr>>,
161+
) -> (Vec<Arc<dyn PhysicalExpr>>, Vec<Arc<dyn PhysicalExpr>>) {
162+
assert_eq!(left.len(), right.len());
163+
let mut seen = HashSet::new();
164+
let mut out_left = Vec::new();
165+
let mut out_right = Vec::new();
166+
for (l, r) in left.into_iter().zip(right.into_iter()) {
167+
let l_s = expr_canonical_string(&l);
168+
let r_s = expr_canonical_string(&r);
169+
let key = if l_s <= r_s {
170+
format!("{}={}", l_s, r_s)
171+
} else {
172+
format!("{}={}", r_s, l_s)
173+
};
174+
if seen.insert(key) {
175+
out_left.push(l);
176+
out_right.push(r);
177+
}
178+
}
179+
(out_left, out_right)
180+
}
181+
182+
fn dedupe_partition_exprs(exprs: Vec<Arc<dyn PhysicalExpr>>) -> Vec<Arc<dyn PhysicalExpr>> {
183+
let mut seen = HashSet::new();
184+
let mut out = Vec::new();
185+
for e in exprs.into_iter() {
186+
let key = expr_canonical_string(&e);
187+
if seen.insert(key) {
188+
out.push(e);
189+
}
190+
}
191+
out
192+
}
193+
154194
/// Default single node physical query planner that converts a
155195
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
156196
///
@@ -858,6 +898,7 @@ impl DefaultPhysicalPlanner {
858898
)
859899
})
860900
.collect::<Result<Vec<_>>>()?;
901+
let runtime_expr = dedupe_partition_exprs(runtime_expr);
861902
Partitioning::Hash(runtime_expr, *n)
862903
}
863904
LogicalPartitioning::DistributeBy(_) => {
@@ -1088,6 +1129,10 @@ impl DefaultPhysicalPlanner {
10881129
Ok((l, r))
10891130
})
10901131
.collect::<Result<join_utils::JoinOn>>()?;
1132+
let (left_keys, right_keys): (Vec<_>, Vec<_>) = join_on.into_iter().unzip();
1133+
let (left_keys, right_keys) = dedupe_join_key_pairs(left_keys, right_keys);
1134+
let join_on: join_utils::JoinOn =
1135+
left_keys.into_iter().zip(right_keys).collect();
10911136

10921137
let join_filter = match filter {
10931138
Some(expr) => {
@@ -2345,6 +2390,7 @@ impl<'a> OptimizationInvariantChecker<'a> {
23452390

23462391
Ok(())
23472392
}
2393+
23482394
}
23492395

23502396
impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
@@ -3520,4 +3566,29 @@ digraph {
35203566

35213567
Ok(())
35223568
}
3569+
3570+
#[test]
fn test_dedupe_join_key_pairs() {
    // The second pair is the first pair with its sides swapped, so only the
    // first pair is expected to survive.
    let left_keys: Vec<Arc<dyn PhysicalExpr>> = vec![
        Arc::new(Column::new("a", 0)),
        Arc::new(Column::new("b", 0)),
    ];
    let right_keys: Vec<Arc<dyn PhysicalExpr>> = vec![
        Arc::new(Column::new("b", 0)),
        Arc::new(Column::new("a", 0)),
    ];

    let (left, right) = dedupe_join_key_pairs(left_keys, right_keys);

    assert_eq!(left.len(), 1);
    assert_eq!(left[0].to_string(), "a@0");
    assert_eq!(right[0].to_string(), "b@0");
}
3581+
3582+
#[test]
fn test_dedupe_partition_exprs() {
    // Small helper so the fixture reads as a list of (name, index) columns.
    let column = |name: &str, index: usize| -> Arc<dyn PhysicalExpr> {
        Arc::new(Column::new(name, index))
    };

    let deduped =
        dedupe_partition_exprs(vec![column("a", 0), column("a", 0), column("b", 1)]);

    // The repeated `a@0` collapses to one entry and input order is kept.
    assert_eq!(deduped.len(), 2);
    assert_eq!(deduped[0].to_string(), "a@0");
    assert_eq!(deduped[1].to_string(), "b@1");
}
35233594
}

datafusion/expr/src/expr.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use datafusion_common::tree_node::{
3838
use datafusion_common::{
3939
Column, DFSchema, HashMap, Result, ScalarValue, Spans, TableReference,
4040
};
41+
use crate::utils::INFERRED_PREDICATE_ALIAS;
4142
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
4243
use sqlparser::ast::{
4344
display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
@@ -3183,7 +3184,13 @@ pub const UNNEST_COLUMN_PREFIX: &str = "UNNEST";
31833184
impl Display for Expr {
31843185
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
31853186
match self {
3186-
Expr::Alias(Alias { expr, name, .. }) => write!(f, "{expr} AS {name}"),
3187+
Expr::Alias(Alias { expr, name, .. }) => {
3188+
if name == INFERRED_PREDICATE_ALIAS {
3189+
write!(f, "{expr}")
3190+
} else {
3191+
write!(f, "{expr} AS {name}")
3192+
}
3193+
}
31873194
Expr::Column(c) => write!(f, "{c}"),
31883195
Expr::OuterReferenceColumn(_, c) => {
31893196
write!(f, "{OUTER_REFERENCE_COLUMN_PREFIX}({c})")

datafusion/expr/src/utils.rs

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ use crate::{
2828
};
2929
use datafusion_expr_common::signature::{Signature, TypeSignature};
3030

31+
/// Alias used to mark inferred join predicates that should remain join filters
32+
/// and not be converted into additional equijoin keys during optimization.
33+
pub const INFERRED_PREDICATE_ALIAS: &str = "__datafusion_inferred_join_predicate";
34+
35+
/// Returns true if expr is an alias used to mark inferred join predicates.
36+
pub fn is_inferred_alias(expr: &Expr) -> bool {
37+
matches!(expr, Expr::Alias(Alias { name, .. }) if name == INFERRED_PREDICATE_ALIAS)
38+
}
39+
3140
use arrow::datatypes::{DataType, Field, Schema};
3241
use datafusion_common::tree_node::{
3342
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
@@ -946,7 +955,9 @@ fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&
946955
let exprs = split_conjunction_impl(left, exprs);
947956
split_conjunction_impl(right, exprs)
948957
}
949-
Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs),
958+
Expr::Alias(alias) if !is_inferred_alias(expr) => {
959+
split_conjunction_impl(&alias.expr, exprs)
960+
}
950961
other => {
951962
exprs.push(other);
952963
exprs
@@ -970,7 +981,11 @@ pub fn iter_conjunction(expr: &Expr) -> impl Iterator<Item = &Expr> {
970981
stack.push(right);
971982
stack.push(left);
972983
}
973-
Expr::Alias(Alias { expr, .. }) => stack.push(expr),
984+
Expr::Alias(Alias { expr, name, .. })
985+
if name != INFERRED_PREDICATE_ALIAS =>
986+
{
987+
stack.push(expr);
988+
}
974989
other => return Some(other),
975990
}
976991
}
@@ -994,7 +1009,11 @@ pub fn iter_conjunction_owned(expr: Expr) -> impl Iterator<Item = Expr> {
9941009
stack.push(*right);
9951010
stack.push(*left);
9961011
}
997-
Expr::Alias(Alias { expr, .. }) => stack.push(*expr),
1012+
Expr::Alias(Alias { expr, name, .. })
1013+
if name != INFERRED_PREDICATE_ALIAS =>
1014+
{
1015+
stack.push(*expr);
1016+
}
9981017
other => return Some(other),
9991018
}
10001019
}
@@ -1063,8 +1082,13 @@ fn split_binary_owned_impl(
10631082
let exprs = split_binary_owned_impl(*left, operator, exprs);
10641083
split_binary_owned_impl(*right, operator, exprs)
10651084
}
1066-
Expr::Alias(Alias { expr, .. }) => {
1067-
split_binary_owned_impl(*expr, operator, exprs)
1085+
Expr::Alias(alias) => {
1086+
if is_inferred_alias(&Expr::Alias(alias.clone())) {
1087+
exprs.push(Expr::Alias(alias));
1088+
exprs
1089+
} else {
1090+
split_binary_owned_impl(*alias.expr, operator, exprs)
1091+
}
10681092
}
10691093
other => {
10701094
exprs.push(other);
@@ -1090,7 +1114,9 @@ fn split_binary_impl<'a>(
10901114
let exprs = split_binary_impl(left, operator, exprs);
10911115
split_binary_impl(right, operator, exprs)
10921116
}
1093-
Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator, exprs),
1117+
Expr::Alias(alias) if !is_inferred_alias(expr) => {
1118+
split_binary_impl(&alias.expr, operator, exprs)
1119+
}
10941120
other => {
10951121
exprs.push(other);
10961122
exprs
@@ -1648,6 +1674,17 @@ mod tests {
16481674
assert_eq!(result, vec![&expr1, &expr2]);
16491675
}
16501676

1677+
#[test]
fn test_split_conjunction_inferred_alias() {
    // An inferred-predicate alias must come back as a single, still-aliased
    // conjunct instead of being unwrapped.
    let marked = col("a").eq(col("b")).alias(INFERRED_PREDICATE_ALIAS);
    let parts = split_conjunction(&marked);
    assert_eq!(parts.len(), 1);
    if let Expr::Alias(alias) = parts[0] {
        assert_eq!(alias.name, INFERRED_PREDICATE_ALIAS);
    } else {
        panic!("expected alias");
    }
}
1687+
16511688
#[test]
16521689
fn test_split_conjunction_or() {
16531690
let expr = col("a").eq(lit(5)).or(col("b"));
@@ -1705,6 +1742,17 @@ mod tests {
17051742
);
17061743
}
17071744

1745+
#[test]
fn test_split_conjunction_owned_inferred_alias() {
    // Owned variant: the inferred-predicate alias must also survive intact.
    let marked = col("a").eq(col("b")).alias(INFERRED_PREDICATE_ALIAS);
    let parts = split_conjunction_owned(marked);
    assert_eq!(parts.len(), 1);
    if let Expr::Alias(alias) = &parts[0] {
        assert_eq!(alias.name, INFERRED_PREDICATE_ALIAS);
    } else {
        panic!("expected alias");
    }
}
1755+
17081756
#[test]
17091757
fn test_conjunction_empty() {
17101758
assert_eq!(conjunction(vec![]), None);

0 commit comments

Comments
 (0)