Skip to content

Commit 6dddfc6

Browse files
committed
Add INFERRED_PREDICATE_ALIAS to manage inferred join predicates and ensure they remain as filters during optimization
1 parent c960bda commit 6dddfc6

File tree

3 files changed

+66
-3
lines changed

3 files changed

+66
-3
lines changed

datafusion/expr/src/expr.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use datafusion_common::tree_node::{
3838
use datafusion_common::{
3939
Column, DFSchema, HashMap, Result, ScalarValue, Spans, TableReference,
4040
};
41+
use crate::utils::INFERRED_PREDICATE_ALIAS;
4142
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
4243
use sqlparser::ast::{
4344
display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
@@ -3183,7 +3184,13 @@ pub const UNNEST_COLUMN_PREFIX: &str = "UNNEST";
31833184
impl Display for Expr {
31843185
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
31853186
match self {
3186-
Expr::Alias(Alias { expr, name, .. }) => write!(f, "{expr} AS {name}"),
3187+
Expr::Alias(Alias { expr, name, .. }) => {
3188+
if name == INFERRED_PREDICATE_ALIAS {
3189+
write!(f, "{expr}")
3190+
} else {
3191+
write!(f, "{expr} AS {name}")
3192+
}
3193+
}
31873194
Expr::Column(c) => write!(f, "{c}"),
31883195
Expr::OuterReferenceColumn(_, c) => {
31893196
write!(f, "{OUTER_REFERENCE_COLUMN_PREFIX}({c})")

datafusion/expr/src/utils.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ use crate::{
2828
};
2929
use datafusion_expr_common::signature::{Signature, TypeSignature};
3030

31+
/// Alias used to mark inferred join predicates that should remain join filters
32+
/// and not be converted into additional equijoin keys during optimization.
33+
pub const INFERRED_PREDICATE_ALIAS: &str = "__datafusion_inferred_join_predicate";
34+
3135
use arrow::datatypes::{DataType, Field, Schema};
3236
use datafusion_common::tree_node::{
3337
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
@@ -1063,7 +1067,7 @@ fn split_binary_owned_impl(
10631067
let exprs = split_binary_owned_impl(*left, operator, exprs);
10641068
split_binary_owned_impl(*right, operator, exprs)
10651069
}
1066-
Expr::Alias(Alias { expr, .. }) => {
1070+
Expr::Alias(Alias { expr, name, .. }) if name != INFERRED_PREDICATE_ALIAS => {
10671071
split_binary_owned_impl(*expr, operator, exprs)
10681072
}
10691073
other => {

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use datafusion_expr::expr_rewriter::replace_col;
3535
use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan, TableScan, Union};
3636
use datafusion_expr::utils::{
3737
conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
38+
INFERRED_PREDICATE_ALIAS,
3839
};
3940
use datafusion_expr::{
4041
and, or, BinaryExpr, Expr, Filter, Operator, Projection, TableProviderFilterPushDown,
@@ -463,7 +464,11 @@ fn push_down_all_join(
463464
} else if right_preserved && checker.is_right_only(&predicate) {
464465
right_push.push(predicate);
465466
} else {
466-
join_conditions.push(predicate);
467+
// Mark inferred predicates so subsequent optimization passes do not
468+
// treat them as additional equijoin keys. They should remain as
469+
// residual join filters to enable dynamic filtering without
470+
// widening the join key set.
471+
join_conditions.push(predicate.alias(INFERRED_PREDICATE_ALIAS));
467472
}
468473
}
469474

@@ -1487,6 +1492,53 @@ mod tests {
14871492
}};
14881493
}
14891494

1495+
#[test]
1496+
fn inferred_predicate_stays_in_filter() -> Result<()> {
1497+
use datafusion_common::NullEquality;
1498+
use datafusion_expr::logical_plan::JoinConstraint;
1499+
1500+
let left = test_table_scan_with_name("l")?;
1501+
let right = test_table_scan_with_name("r")?;
1502+
1503+
let join = Join::try_new(
1504+
Arc::new(left),
1505+
Arc::new(right),
1506+
vec![],
1507+
None,
1508+
JoinType::Left,
1509+
JoinConstraint::On,
1510+
NullEquality::NullEqualsNull,
1511+
)?;
1512+
1513+
let inferred = col("l.a").eq(col("r.a"));
1514+
let Transformed { data: plan, .. } =
1515+
push_down_all_join(vec![], vec![inferred], join, vec![])?;
1516+
1517+
let join = match plan {
1518+
LogicalPlan::Join(j) => j,
1519+
_ => panic!("expected join"),
1520+
};
1521+
1522+
assert!(join.on.is_empty());
1523+
let filter = join.filter.clone().expect("expected filter");
1524+
match filter {
1525+
Expr::Alias(alias) => assert_eq!(alias.name, INFERRED_PREDICATE_ALIAS),
1526+
_ => panic!("expected aliased filter"),
1527+
}
1528+
1529+
let optimizer = Optimizer::with_rules(vec![
1530+
Arc::new(SimplifyExpressions::new()),
1531+
Arc::new(crate::extract_equijoin_predicate::ExtractEquijoinPredicate::new()),
1532+
]);
1533+
let optimized = optimizer.optimize(LogicalPlan::Join(join), &OptimizerContext::new(), observe)?;
1534+
if let LogicalPlan::Join(j) = optimized {
1535+
assert!(j.on.is_empty());
1536+
} else {
1537+
panic!("expected join");
1538+
}
1539+
Ok(())
1540+
}
1541+
14901542
#[test]
14911543
fn filter_before_projection() -> Result<()> {
14921544
let table_scan = test_table_scan()?;

0 commit comments

Comments
 (0)