From c2028146462e74ffa7cd50b75826615abf820c34 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 12 Aug 2025 11:08:16 -0500 Subject: [PATCH 1/5] support filter pushdown with left-semi joins in HashJoinExec --- .../physical_optimizer/filter_pushdown/mod.rs | 65 ++++++++++++++++ .../physical-plan/src/joins/hash_join.rs | 77 +++++++++++++++++-- 2 files changed, 136 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 1639960fdeac..0aa7218dedf8 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -48,10 +48,12 @@ use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, filter::FilterExec, + joins::{HashJoinExec, PartitionMode}, repartition::RepartitionExec, sorts::sort::SortExec, ExecutionPlan, }; +use datafusion_common::{JoinType}; use futures::StreamExt; use object_store::{memory::InMemory, ObjectStore}; @@ -424,6 +426,69 @@ async fn test_static_filter_pushdown_through_hash_join() { ); } +#[test] +fn test_filter_pushdown_left_semi_join() { + // Create schemas for left and right sides + let left_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let right_side_schema = Arc::new(Schema::new(vec![ + Field::new("d", DataType::Utf8, false), + Field::new("e", DataType::Utf8, false), + Field::new("f", DataType::Float64, false), + ])); + + let left_scan = TestScanBuilder::new(Arc::clone(&left_side_schema)) + .with_support(true) + .build(); + let right_scan = TestScanBuilder::new(Arc::clone(&right_side_schema)) + .with_support(true) + .build(); + + let on = vec![( + col("a", &left_side_schema).unwrap(), + col("d", &right_side_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + left_scan, + right_scan, + on, + None, + &JoinType::LeftSemi, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + let join_schema = join.schema(); + let filter = col_lit_predicate("a", "aa", &join_schema); + let plan = + Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc; + + // Test that filters ARE pushed down for left semi join when they reference only left side + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + " + ); +} + #[test] fn test_filter_collapse() { // filter should be pushed down into the parquet scan with two filters diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index d3999c6cd824..5eccc504b9c5 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -86,6 +86,7 @@ use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr}; +use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use datafusion_physical_expr_common::datum::compare_op_for_nested; @@ -369,6 +370,34 @@ pub struct HashJoinExec { dynamic_filter: Arc, } +/// Helper function copied from logical optimizer: For a given JOIN type, determine whether each input of the join is preserved +/// for WHERE clause filters (predicates above the join). +/// +/// It is only correct to push filters below a join for preserved inputs. +/// +/// For left semi joins: (true, false) - meaning left side is preserved, right side is not +fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { + match join_type { + JoinType::Inner => (true, true), + JoinType::Left => (true, false), + JoinType::Right => (false, true), + JoinType::Full => (false, false), + // No columns from the right side of the join can be referenced in output + // predicates for semi/anti joins, so whether we specify t/f doesn't matter. + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), + // No columns from the left side of the join can be referenced in output + // predicates for semi/anti joins, so whether we specify t/f doesn't matter. + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true), + } +} + +/// Check if a physical expression only references columns from the left child +fn is_left_only_predicate(predicate: &Arc, left_schema_len: usize) -> bool { + let columns = collect_columns(predicate); + // All column indices must be less than left_schema_len + columns.iter().all(|col| col.index() < left_schema_len) +} + impl HashJoinExec { /// Tries to create a new [HashJoinExec]. /// @@ -1011,25 +1040,29 @@ impl ExecutionPlan for HashJoinExec { } } + fn gather_filters_for_pushdown( &self, phase: FilterPushdownPhase, parent_filters: Vec>, config: &ConfigOptions, ) -> Result { + // Check if this join type supports filter pushdown + let (_left_preserved, _right_preserved) = lr_is_preserved(self.join_type); + + // For now, only support Inner and LeftSemi joins // Other types of joins can support *some* filters, but restrictions are complex and error prone. - // For now we don't support them. // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs // See https://github.com/apache/datafusion/issues/16973 for tracking. - if self.join_type != JoinType::Inner { + if !matches!(self.join_type, JoinType::Inner | JoinType::LeftSemi) { return Ok(FilterDescription::all_unsupported( &parent_filters, &self.children(), )); } - // Get basic filter descriptions for both children - let left_child = crate::filter_pushdown::ChildFilterDescription::from_child( + // Get basic filter descriptions for both children (for Inner joins, this works as before) + let mut left_child = crate::filter_pushdown::ChildFilterDescription::from_child( &parent_filters, self.left(), )?; @@ -1038,6 +1071,37 @@ impl ExecutionPlan for HashJoinExec { self.right(), )?; + // For left semi joins, override the analysis to only allow left-side filters to go to left child + if self.join_type == JoinType::LeftSemi { + let left_schema_len = self.left().schema().fields().len(); + + // Create new parent filters list with modified support based on column analysis + let mut new_left_parent_filters = Vec::new(); + let mut new_right_parent_filters = Vec::new(); + + for (i, filter) in parent_filters.iter().enumerate() { + // For left child: can only push filters that reference only left-side columns + if is_left_only_predicate(filter, left_schema_len) { + // Check if original analysis said it was supported + if let Some(orig_filter) = left_child.parent_filters.get(i) { + new_left_parent_filters.push(orig_filter.clone()); + } else { + new_left_parent_filters.push(crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone())); + } + } else { + // Cannot push this filter to left child for left semi join + new_left_parent_filters.push(crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone())); + } + + // For right child: left semi joins cannot push any parent filters to right child + new_right_parent_filters.push(crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone())); + } + + // Update the child descriptions + left_child.parent_filters = new_left_parent_filters; + right_child.parent_filters = new_right_parent_filters; + } + // Add dynamic filters in Post phase if enabled if matches!(phase, FilterPushdownPhase::Post) && config.optimizer.enable_dynamic_filter_pushdown @@ -1059,11 +1123,12 @@ impl ExecutionPlan for HashJoinExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { + // For now, only support Inner and LeftSemi joins // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for - // non-inner joins in `gather_filters_for_pushdown`. + // unsupported join types in `gather_filters_for_pushdown`. // However it's a cheap check and serves to inform future devs touching this function that they need to be really // careful pushing down filters through non-inner joins. - if self.join_type != JoinType::Inner { + if !matches!(self.join_type, JoinType::Inner | JoinType::LeftSemi) { // Other types of joins can support *some* filters, but restrictions are complex and error prone. // For now we don't support them. // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs From 5e7ffc1446bad2a4bda9dfc85085446dd3c619ea Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 12 Aug 2025 11:11:12 -0500 Subject: [PATCH 2/5] fmt --- .../physical_optimizer/filter_pushdown/mod.rs | 2 +- .../physical-plan/src/joins/hash_join.rs | 38 +++++++++++++------ 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 0aa7218dedf8..02fc821fe9a3 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -33,6 +33,7 @@ use datafusion::{ scalar::ScalarValue, }; use datafusion_common::config::ConfigOptions; +use datafusion_common::JoinType; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::ScalarUDF; use datafusion_functions::math::random::RandomFunc; @@ -53,7 +54,6 @@ use datafusion_physical_plan::{ sorts::sort::SortExec, ExecutionPlan, }; -use datafusion_common::{JoinType}; use futures::StreamExt; use object_store::{memory::InMemory, ObjectStore}; diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 5eccc504b9c5..2a6cdf72fa9d 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -372,9 +372,9 @@ pub struct HashJoinExec { /// Helper function copied from logical optimizer: For a given JOIN type, determine whether each input of the join is preserved /// for WHERE clause filters (predicates above the join). -/// +/// /// It is only correct to push filters below a join for preserved inputs. -/// +/// /// For left semi joins: (true, false) - meaning left side is preserved, right side is not fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { match join_type { @@ -392,7 +392,10 @@ fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { } /// Check if a physical expression only references columns from the left child -fn is_left_only_predicate(predicate: &Arc, left_schema_len: usize) -> bool { +fn is_left_only_predicate( + predicate: &Arc, + left_schema_len: usize, +) -> bool { let columns = collect_columns(predicate); // All column indices must be less than left_schema_len columns.iter().all(|col| col.index() < left_schema_len) @@ -1040,7 +1043,6 @@ impl ExecutionPlan for HashJoinExec { } } - fn gather_filters_for_pushdown( &self, phase: FilterPushdownPhase, @@ -1049,7 +1051,7 @@ impl ExecutionPlan for HashJoinExec { ) -> Result { // Check if this join type supports filter pushdown let (_left_preserved, _right_preserved) = lr_is_preserved(self.join_type); - + // For now, only support Inner and LeftSemi joins // Other types of joins can support *some* filters, but restrictions are complex and error prone. // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs @@ -1074,11 +1076,11 @@ impl ExecutionPlan for HashJoinExec { // For left semi joins, override the analysis to only allow left-side filters to go to left child if self.join_type == JoinType::LeftSemi { let left_schema_len = self.left().schema().fields().len(); - + // Create new parent filters list with modified support based on column analysis let mut new_left_parent_filters = Vec::new(); let mut new_right_parent_filters = Vec::new(); - + for (i, filter) in parent_filters.iter().enumerate() { // For left child: can only push filters that reference only left-side columns if is_left_only_predicate(filter, left_schema_len) { @@ -1086,17 +1088,29 @@ impl ExecutionPlan for HashJoinExec { if let Some(orig_filter) = left_child.parent_filters.get(i) { new_left_parent_filters.push(orig_filter.clone()); } else { - new_left_parent_filters.push(crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone())); + new_left_parent_filters.push( + crate::filter_pushdown::PushedDownPredicate::unsupported( + filter.clone(), + ), + ); } } else { // Cannot push this filter to left child for left semi join - new_left_parent_filters.push(crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone())); + new_left_parent_filters.push( + crate::filter_pushdown::PushedDownPredicate::unsupported( + filter.clone(), + ), + ); } - + // For right child: left semi joins cannot push any parent filters to right child - new_right_parent_filters.push(crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone())); + new_right_parent_filters.push( + crate::filter_pushdown::PushedDownPredicate::unsupported( + filter.clone(), + ), + ); } - + // Update the child descriptions left_child.parent_filters = new_left_parent_filters; right_child.parent_filters = new_right_parent_filters; From aada713c73e1c79f1a55186b08f2845cac11640e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 12 Aug 2025 11:39:34 -0500 Subject: [PATCH 3/5] fmt --- datafusion/common/src/join_type.rs | 45 +++++ .../physical_optimizer/filter_pushdown/mod.rs | 109 ++++++++++++ datafusion/optimizer/src/push_down_filter.rs | 41 +---- .../physical-plan/src/joins/hash_join.rs | 165 ++++++++++-------- 4 files changed, 246 insertions(+), 114 deletions(-) diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index d9a1478f0238..938384fd17ae 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -111,6 +111,51 @@ impl JoinType { | JoinType::RightAnti ) } + + /// Determines whether each input of the join is preserved for WHERE clause filters. + /// + /// A join input is "preserved" if every row from that input appears in at least one + /// output row. This property determines whether filters referencing only columns + /// from that input can be safely pushed below the join. + /// + /// For example: + /// - In an inner join, both sides are preserved, because each row of the output + /// maps directly to a row from each side. + /// - In a left join, the left side is preserved (we can push predicates) but + /// the right is not, because there may be rows in the output that don't + /// directly map to a row in the right input (due to nulls filling where there + /// is no match on the right). + /// - In semi joins, only the preserved side's columns appear in the output, + /// so filters can only reference and be pushed to that side. + /// + /// # Returns + /// A tuple of `(left_preserved, right_preserved)` booleans. + /// + /// # Examples + /// + /// ``` + /// use datafusion_common::JoinType; + /// + /// assert_eq!(JoinType::Inner.lr_is_preserved(), (true, true)); + /// assert_eq!(JoinType::Left.lr_is_preserved(), (true, false)); + /// assert_eq!(JoinType::LeftSemi.lr_is_preserved(), (true, false)); + /// ``` + pub fn lr_is_preserved(self) -> (bool, bool) { + match self { + JoinType::Inner => (true, true), + JoinType::Left => (true, false), + JoinType::Right => (false, true), + JoinType::Full => (false, false), + // No columns from the right side of the join can be referenced in output + // predicates for semi/anti joins, so whether we specify t/f doesn't matter. + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), + // No columns from the left side of the join can be referenced in output + // predicates for semi/anti joins, so whether we specify t/f doesn't matter. + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + (false, true) + } + } + } } impl Display for JoinType { diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 02fc821fe9a3..4be980632370 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -1247,3 +1247,112 @@ fn col_lit_predicate( Arc::new(Literal::new(scalar_value)), )) } + +#[tokio::test] +async fn test_left_semi_join_dynamic_filter_pushdown() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side (left side) with limited values + let build_batches = vec![record_batch!( + ("id", Int32, [1, 2]), + ("name", Utf8, ["Alice", "Bob"]), + ("score", Float64, [95.0, 87.0]) + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("score", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side (right side) with more values + let probe_batches = vec![record_batch!( + ("id", Int32, [1, 2, 3, 4]), + ( + "department", + Utf8, + ["Engineering", "Sales", "Marketing", "HR"] + ), + ("budget", Float64, [100000.0, 80000.0, 60000.0, 50000.0]) + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("department", DataType::Utf8, false), + Field::new("budget", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec with LeftSemi join type + let on = vec![( + col("id", &build_side_schema).unwrap(), + col("id", &probe_side_schema).unwrap(), + )]; + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::LeftSemi, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ) as Arc; + + // Verify that dynamic filter pushdown creates the expected plan structure + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, name, score], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, department, budget], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, name, score], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, department, budget], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + ", + ); + + // Apply the optimization and execute to see the actual filter bounds + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + let config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Execute one batch to populate the dynamic filter + stream.next().await.unwrap().unwrap(); + + // Verify that the dynamic filter shows the expected bounds for left semi join + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(id@0, id@0)], filter=[id@0 >= 1 AND id@0 <= 2] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, name, score], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, department, budget], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ id@0 >= 1 AND id@0 <= 2 ] + " + ); +} diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 35ec7d074d5f..377dd93570e1 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -135,45 +135,6 @@ use crate::{OptimizerConfig, OptimizerRule}; #[derive(Default, Debug)] pub struct PushDownFilter {} -/// For a given JOIN type, determine whether each input of the join is preserved -/// for post-join (`WHERE` clause) filters. -/// -/// It is only correct to push filters below a join for preserved inputs. -/// -/// # Return Value -/// A tuple of booleans - (left_preserved, right_preserved). -/// -/// # "Preserved" input definition -/// -/// We say a join side is preserved if the join returns all or a subset of the rows from -/// the relevant side, such that each row of the output table directly maps to a row of -/// the preserved input table. If a table is not preserved, it can provide extra null rows. -/// That is, there may be rows in the output table that don't directly map to a row in the -/// input table. -/// -/// For example: -/// - In an inner join, both sides are preserved, because each row of the output -/// maps directly to a row from each side. -/// -/// - In a left join, the left side is preserved (we can push predicates) but -/// the right is not, because there may be rows in the output that don't -/// directly map to a row in the right input (due to nulls filling where there -/// is no match on the right). -pub(crate) fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { - match join_type { - JoinType::Inner => (true, true), - JoinType::Left => (true, false), - JoinType::Right => (false, true), - JoinType::Full => (false, false), - // No columns from the right side of the join can be referenced in output - // predicates for semi/anti joins, so whether we specify t/f doesn't matter. - JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), - // No columns from the left side of the join can be referenced in output - // predicates for semi/anti joins, so whether we specify t/f doesn't matter. - JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true), - } -} - /// For a given JOIN type, determine whether each input of the join is preserved /// for the join condition (`ON` clause filters). /// @@ -426,7 +387,7 @@ fn push_down_all_join( ) -> Result> { let is_inner_join = join.join_type == JoinType::Inner; // Get pushable predicates from current optimizer state - let (left_preserved, right_preserved) = lr_is_preserved(join.join_type); + let (left_preserved, right_preserved) = join.join_type.lr_is_preserved(); // The predicates can be divided to three categories: // 1) can push through join to its children(left or right) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 2a6cdf72fa9d..03c400d81f9f 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -370,27 +370,6 @@ pub struct HashJoinExec { dynamic_filter: Arc, } -/// Helper function copied from logical optimizer: For a given JOIN type, determine whether each input of the join is preserved -/// for WHERE clause filters (predicates above the join). -/// -/// It is only correct to push filters below a join for preserved inputs. -/// -/// For left semi joins: (true, false) - meaning left side is preserved, right side is not -fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { - match join_type { - JoinType::Inner => (true, true), - JoinType::Left => (true, false), - JoinType::Right => (false, true), - JoinType::Full => (false, false), - // No columns from the right side of the join can be referenced in output - // predicates for semi/anti joins, so whether we specify t/f doesn't matter. - JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), - // No columns from the left side of the join can be referenced in output - // predicates for semi/anti joins, so whether we specify t/f doesn't matter. - JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true), - } -} - /// Check if a physical expression only references columns from the left child fn is_left_only_predicate( predicate: &Arc, @@ -401,6 +380,92 @@ fn is_left_only_predicate( columns.iter().all(|col| col.index() < left_schema_len) } +/// Create child filter descriptions that respect join semantics +/// +/// This function analyzes parent filters and determines which ones can be pushed +/// to each child based on the join type's preservation properties and column analysis. +fn create_join_aware_child_descriptions( + parent_filters: &[Arc], + left_child: &Arc, + right_child: &Arc, + join_type: JoinType, +) -> Result<( + crate::filter_pushdown::ChildFilterDescription, + crate::filter_pushdown::ChildFilterDescription, +)> { + let (left_preserved, right_preserved) = join_type.lr_is_preserved(); + let left_schema_len = left_child.schema().fields().len(); + + // For left child: apply schema analysis + join-specific rules + let mut left_parent_filters = Vec::new(); + // For right child: apply schema analysis + join-specific rules + let mut right_parent_filters = Vec::new(); + + for filter in parent_filters { + // Left child analysis + if left_preserved && is_left_only_predicate(filter, left_schema_len) { + // Check if the filter can actually be pushed based on schema + let temp_left_desc = + crate::filter_pushdown::ChildFilterDescription::from_child( + &[filter.clone()], + left_child, + )?; + + if let Some(first_filter) = temp_left_desc.parent_filters.first() { + left_parent_filters.push(first_filter.clone()); + } else { + left_parent_filters.push( + crate::filter_pushdown::PushedDownPredicate::unsupported( + filter.clone(), + ), + ); + } + } else { + left_parent_filters.push( + crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone()), + ); + } + + // Right child analysis + if right_preserved && !is_left_only_predicate(filter, left_schema_len) { + // For right side, we need filters that DON'T reference only left columns + // Check if the filter can actually be pushed based on schema + let temp_right_desc = + crate::filter_pushdown::ChildFilterDescription::from_child( + &[filter.clone()], + right_child, + )?; + + if let Some(first_filter) = temp_right_desc.parent_filters.first() { + right_parent_filters.push(first_filter.clone()); + } else { + right_parent_filters.push( + crate::filter_pushdown::PushedDownPredicate::unsupported( + filter.clone(), + ), + ); + } + } else { + right_parent_filters.push( + crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone()), + ); + } + } + + // Create the child descriptions with the analyzed filters + let left_desc = crate::filter_pushdown::ChildFilterDescription { + parent_filters: left_parent_filters, + self_filters: vec![], + }; + + let right_desc = crate::filter_pushdown::ChildFilterDescription { + parent_filters: right_parent_filters, + self_filters: vec![], + }; + + Ok((left_desc, right_desc)) +} + impl HashJoinExec { /// Tries to create a new [HashJoinExec]. /// @@ -1049,12 +1114,9 @@ impl ExecutionPlan for HashJoinExec { parent_filters: Vec>, config: &ConfigOptions, ) -> Result { - // Check if this join type supports filter pushdown - let (_left_preserved, _right_preserved) = lr_is_preserved(self.join_type); - // For now, only support Inner and LeftSemi joins - // Other types of joins can support *some* filters, but restrictions are complex and error prone. - // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs + // Even though other join types like Left have one preserved side, the implementation + // is more complex and error prone. See the logical optimizer rules for more details. // See https://github.com/apache/datafusion/issues/16973 for tracking. if !matches!(self.join_type, JoinType::Inner | JoinType::LeftSemi) { return Ok(FilterDescription::all_unsupported( @@ -1063,59 +1125,14 @@ impl ExecutionPlan for HashJoinExec { )); } - // Get basic filter descriptions for both children (for Inner joins, this works as before) - let mut left_child = crate::filter_pushdown::ChildFilterDescription::from_child( + // Create join-aware child descriptions that respect join semantics from the start + let (left_child, mut right_child) = create_join_aware_child_descriptions( &parent_filters, self.left(), - )?; - let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child( - &parent_filters, self.right(), + self.join_type, )?; - // For left semi joins, override the analysis to only allow left-side filters to go to left child - if self.join_type == JoinType::LeftSemi { - let left_schema_len = self.left().schema().fields().len(); - - // Create new parent filters list with modified support based on column analysis - let mut new_left_parent_filters = Vec::new(); - let mut new_right_parent_filters = Vec::new(); - - for (i, filter) in parent_filters.iter().enumerate() { - // For left child: can only push filters that reference only left-side columns - if is_left_only_predicate(filter, left_schema_len) { - // Check if original analysis said it was supported - if let Some(orig_filter) = left_child.parent_filters.get(i) { - new_left_parent_filters.push(orig_filter.clone()); - } else { - new_left_parent_filters.push( - crate::filter_pushdown::PushedDownPredicate::unsupported( - filter.clone(), - ), - ); - } - } else { - // Cannot push this filter to left child for left semi join - new_left_parent_filters.push( - crate::filter_pushdown::PushedDownPredicate::unsupported( - filter.clone(), - ), - ); - } - - // For right child: left semi joins cannot push any parent filters to right child - new_right_parent_filters.push( - crate::filter_pushdown::PushedDownPredicate::unsupported( - filter.clone(), - ), - ); - } - - // Update the child descriptions - left_child.parent_filters = new_left_parent_filters; - right_child.parent_filters = new_right_parent_filters; - } - // Add dynamic filters in Post phase if enabled if matches!(phase, FilterPushdownPhase::Post) && config.optimizer.enable_dynamic_filter_pushdown From 3c001798851adba6af363d6612cf27f030e85809 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 12 Aug 2025 11:51:10 -0500 Subject: [PATCH 4/5] lint --- datafusion/optimizer/src/push_down_filter.rs | 2 +- datafusion/physical-plan/src/joins/hash_join.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 377dd93570e1..dbadedc011c7 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -143,7 +143,7 @@ pub struct PushDownFilter {} /// # Return Value /// A tuple of booleans - (left_preserved, right_preserved). /// -/// See [`lr_is_preserved`] for a definition of "preserved". +/// See [`JoinType::lr_is_preserved`] for a definition of "preserved". pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) { match join_type { JoinType::Inner => (true, true), diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 03c400d81f9f..89667977c777 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -407,7 +407,7 @@ fn create_join_aware_child_descriptions( // Check if the filter can actually be pushed based on schema let temp_left_desc = crate::filter_pushdown::ChildFilterDescription::from_child( - &[filter.clone()], + &[Arc::clone(filter)], left_child, )?; @@ -416,13 +416,13 @@ fn create_join_aware_child_descriptions( } else { left_parent_filters.push( crate::filter_pushdown::PushedDownPredicate::unsupported( - filter.clone(), + Arc::clone(filter), ), ); } } else { left_parent_filters.push( - crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone()), + crate::filter_pushdown::PushedDownPredicate::unsupported(Arc::clone(filter)), ); } @@ -432,7 +432,7 @@ fn create_join_aware_child_descriptions( // Check if the filter can actually be pushed based on schema let temp_right_desc = crate::filter_pushdown::ChildFilterDescription::from_child( - &[filter.clone()], + &[Arc::clone(filter)], right_child, )?; @@ -441,13 +441,13 @@ fn create_join_aware_child_descriptions( } else { right_parent_filters.push( crate::filter_pushdown::PushedDownPredicate::unsupported( - filter.clone(), + Arc::clone(filter), ), ); } } else { right_parent_filters.push( - crate::filter_pushdown::PushedDownPredicate::unsupported(filter.clone()), + crate::filter_pushdown::PushedDownPredicate::unsupported(Arc::clone(filter)), ); } } From 538aa4d3dbbf4351100a1b461f8082a37c8cced0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 12 Aug 2025 11:54:33 -0500 Subject: [PATCH 5/5] fmt --- .../physical-plan/src/joins/hash_join.rs | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 89667977c777..d6b76d7b65f0 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -415,14 +415,16 @@ fn create_join_aware_child_descriptions( left_parent_filters.push(first_filter.clone()); } else { left_parent_filters.push( - crate::filter_pushdown::PushedDownPredicate::unsupported( - Arc::clone(filter), - ), + crate::filter_pushdown::PushedDownPredicate::unsupported(Arc::clone( + filter, + )), ); } } else { left_parent_filters.push( - crate::filter_pushdown::PushedDownPredicate::unsupported(Arc::clone(filter)), + crate::filter_pushdown::PushedDownPredicate::unsupported(Arc::clone( + filter, + )), ); } @@ -440,14 +442,16 @@ fn create_join_aware_child_descriptions( right_parent_filters.push(first_filter.clone()); } else { right_parent_filters.push( - crate::filter_pushdown::PushedDownPredicate::unsupported( - Arc::clone(filter), - ), + crate::filter_pushdown::PushedDownPredicate::unsupported(Arc::clone( + filter, + )), ); } } else { right_parent_filters.push( - crate::filter_pushdown::PushedDownPredicate::unsupported(Arc::clone(filter)), + crate::filter_pushdown::PushedDownPredicate::unsupported(Arc::clone( + filter, + )), ); } }