Enable dynamic filter pushdown for LEFT/RIGHT/SEMI/ANTI/Mark joins; surface probe metadata in plans; add join-preservation docs #17090
base: main
feat: enhance predicate handling for join optimization
- Retain inferred predicates that cannot be pushed through joins as join filters for dynamic filter pushdown.
- Update join filter assertions in tests to reflect new logic.
- Add tests for dynamic filter pushdown scenarios, including:
  - Left join with a filter on the preserved side.
  - Right join with a filter on the preserved side.
  - Handling filters that do not restrict nulls.
@@ -2730,7 +2787,7 @@ mod tests {
assert_optimized_plan_equal!(
plan,
@r"
Right Join: Using test.a = test2.a
Right Join: Using test.a = test2.a Filter: test.a <= Int64(1)
These filters seem redundant (always true)?
@Dandandan, @adriangb, this PR is not ready for review yet.
@adriangb
This is a monumental piece of work, I’m astounded! Thank you so much for working on this. I’ll try to review it, but my immediate ask is whether we can split it up into multiple logical PRs to make it easier to review and to isolate future issues that may pop up. I’m also wondering if you’ve seen #17196 and #17188. I imagine this PR may fix them, but it would be nice to fix those bugs first with a more targeted, smaller PR before adding more complexity.
I also think it's important that we get something like #17177 in place before adding more support, to avoid regressions.
Amazing work overall! A lot of the diff is updating debug outputs / slt tests. I think it will help a lot to split this up into multiple PRs so that e.g. that can be reviewed separately from a smaller diff with complex logic changes.
@@ -111,6 +111,63 @@ impl JoinType {
| JoinType::RightAnti
)
}
/// Returns true if the left side of this join preserves its input rows
/// for filters applied *after* the join.
#[inline]
I see a lot of use of `#[inline]`. My understanding is that without specific evidence that it helps performance it may actually sometimes hurt it, and it's best to not throw it around unless it's very obvious or can be proven to help performance.
/// Returns true if the left side of this join preserves its input rows
/// for filters applied *after* the join.
#[inline]
pub const fn preserves_left_for_output_filters(self) -> bool {
I arrived at this same refactor in #17153 - I think it's a good one. Can we pull this out into its own PR?
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
| | TableScan: t2 projection=[] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1], probe_side=Left, probe_keys=0 |
Can we pull out the addition of these fields to the debug output into its own PR?
/// Number of keys currently contained in this dynamic filter.
/// Uses relaxed atomics as this counter is for diagnostics only.
key_count: Arc<AtomicUsize>,
If I'm just reading this module I'd have no idea what `key_count` is. Is it a join specific thing? What is a "key" in this context? If we actually do need this I propose we add it in an isolated PR.
@@ -217,8 +234,25 @@ impl DynamicFilterPhysicalExpr {
current.expr = new_expr;
// Increment the generation to indicate that the expression has changed.
current.generation += 1;
// Relaxed ordering is sufficient as `key_count` is only used for
// observability and does not synchronize with other data.
observability is super vague - what is it actually used for?
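For readers following this thread, here is a minimal standalone sketch of the pattern the hunk above describes: a lock-protected expression with a generation counter, plus a diagnostics-only key counter written with relaxed ordering. Everything in it is an assumption made for illustration (the `DynamicFilterSketch` type, the string-valued expression, the starting generation of 1); it is not the PR's actual `DynamicFilterPhysicalExpr`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

/// Hypothetical inner state; the real expression type is replaced by a String.
struct Inner {
    expr: String,
    generation: u64,
}

/// Hypothetical stand-in for a shared dynamic filter.
#[derive(Clone)]
pub struct DynamicFilterSketch {
    inner: Arc<RwLock<Inner>>,
    /// Diagnostics-only counter; no reader uses it to decide what else to read.
    key_count: Arc<AtomicUsize>,
}

impl DynamicFilterSketch {
    pub fn new(expr: &str) -> Self {
        Self {
            inner: Arc::new(RwLock::new(Inner {
                expr: expr.to_string(),
                generation: 1, // assumed starting value
            })),
            key_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Swap in a new predicate and record how many keys produced it.
    pub fn update(&self, new_expr: &str, key_count: usize) {
        let mut current = self.inner.write().unwrap();
        current.expr = new_expr.to_string();
        // Bump the generation so readers can detect that the expression changed.
        current.generation += 1;
        // Relaxed is enough here: the lock orders `expr` and `generation`,
        // and nothing else is published through this counter.
        self.key_count.store(key_count, Ordering::Relaxed);
    }

    /// Current expression text and generation.
    pub fn snapshot(&self) -> (String, u64) {
        let current = self.inner.read().unwrap();
        (current.expr.clone(), current.generation)
    }

    /// Last recorded key count, for display or metrics only.
    pub fn key_count(&self) -> usize {
        self.key_count.load(Ordering::Relaxed)
    }
}
```

In this sketch the counter would only feed something like plan display text, which is one concrete reading of "observability"; whether that is all the PR uses it for is exactly what the question above asks.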
@@ -708,6 +711,7 @@ impl<T: Clone> FilteredVec<T> {
}
}

#[inline]
Same comment about putting inlines without any obvious justification
# Join preservation

Dynamic filter pushdown and other optimizations rely on whether a join preserves
rows from its inputs. The tables below summarise which sides are preserved for
post-join output filtering and for evaluation of `ON`-clause predicates.

## Output filtering
Thanks so much. This is an instant commit / approve (with verification that it's correct) if made as its own PR.
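As a rough illustration of what such a preservation table encodes, the sketch below covers only the four basic join kinds, using standard SQL outer-join semantics. The `JoinKind` enum and both method names are hypothetical and deliberately simplified; this is not DataFusion's `JoinType`, and semi, anti, and mark joins are left out.

```rust
/// Simplified join kinds for illustration only (not DataFusion's `JoinType`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
}

impl JoinKind {
    /// (left, right): may a filter applied *after* the join, referencing only
    /// that side's columns, be pushed below the join to that input?
    pub const fn output_filter_pushable(self) -> (bool, bool) {
        match self {
            JoinKind::Inner => (true, true),
            // Right-side columns may be NULL-padded after a left join,
            // so a post-join filter on them cannot simply move down.
            JoinKind::Left => (true, false),
            JoinKind::Right => (false, true),
            JoinKind::Full => (false, false),
        }
    }

    /// (left, right): may an `ON`-clause predicate referencing only that
    /// side's columns be pushed below the join to that input?
    pub const fn on_filter_pushable(self) -> (bool, bool) {
        match self {
            JoinKind::Inner => (true, true),
            // Filtering the left input would drop rows a left join must keep;
            // filtering the right input only changes which rows can match.
            JoinKind::Left => (false, true),
            JoinKind::Right => (true, false),
            JoinKind::Full => (false, false),
        }
    }
}
```

For a left join the two tables are mirror images: `JoinKind::Left.output_filter_pushable()` is `(true, false)` while `JoinKind::Left.on_filter_pushable()` is `(false, true)`, which is why keeping the two cases in separate tables is useful.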
[tests](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/hash_join.rs#L2033-L2049)). Formats without predicate pushdown (CSV/JSON) will not benefit.
Full joins and non‑equi (range or composite) predicates are not yet supported;
see [#7955](https://github.com/apache/datafusion/issues/7955). Dynamic filters
add planning overhead for high-cardinality keys; disable via:
Could you help me understand where the overhead comes from at the moment? Is it measurable at all?
- Enable dynamic filter pushdown for left, right, semi, anti, and mark joins
[#16445](https://github.com/apache/datafusion/pull/16445) (adriangb). Mark joins
push filters to the side opposite the preserved input (`dynamic_filter_side`; see
[tests](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/hash_join.rs#L2033-L2049)). Formats without predicate pushdown (CSV/JSON) will not benefit.
Actually if they have statistics they will still benefit (those two generally don't)
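To make the statistics point concrete, here is a small sketch of file-level pruning driven by bounds taken from the build side's join keys. It assumes the dynamic filter boils down to a `[min, max]` range on a single integer key; the `FileStats` struct, the function names, and the file names used with it are hypothetical and not part of any real scan API.

```rust
/// Hypothetical per-file statistics for the join key column.
#[derive(Debug, Clone, PartialEq)]
pub struct FileStats {
    pub name: &'static str,
    pub min: i64,
    pub max: i64,
}

/// Bounds of the join keys seen on the build side, or `None` if it was empty.
pub fn key_bounds(build_keys: &[i64]) -> Option<(i64, i64)> {
    let min = *build_keys.iter().min()?;
    let max = *build_keys.iter().max()?;
    Some((min, max))
}

/// Keep only files whose [min, max] range overlaps the build-side bounds.
/// Files that cannot contain a matching key are skipped without being read.
pub fn prune_files(files: &[FileStats], bounds: (i64, i64)) -> Vec<&'static str> {
    let (lo, hi) = bounds;
    files
        .iter()
        .filter(|f| f.max >= lo && f.min <= hi)
        .map(|f| f.name)
        .collect()
}
```

With three files covering 0..=9, 10..=19, and 20..=29 and build keys `[12, 15, 11]`, the bounds are `(11, 15)` and only the second file survives. No row-level predicate evaluation is involved, which is why a format with statistics but no predicate pushdown could still benefit.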
}
filter.update(predicate, self.heap.len())?;
} else {
// Even when the dynamic predicate is a tautology we still update
tautology is a very fancy word - would it be possible to give an explanation of what the term means?
I agree with @adriangb that splitting this into multiple PRs would make this much easier to review thoroughly. From my past experience, handling pushdown for outer joins in general is quite subtle (especially in the presence of nulls, etc.), and I have chased down many very subtle bugs in my past lives. Testing (with nulls) is especially important too.
Yes, a LOT of specific tests + fuzz tests are going to be needed to be certain we don't introduce bugs.
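A tiny worked example of the kind of null-related subtlety mentioned above, written as a self-contained sketch over in-memory rows (the `Row` alias and all function names are made up for illustration): pushing a post-join filter to the non-preserved side of a left join changes the result, because unmatched left rows come back NULL-padded instead of being filtered out.

```rust
/// (key, left payload, right value); `None` models a NULL-padded right side.
pub type Row = (i32, &'static str, Option<i32>);

/// Nested-loop left outer join on the key column.
pub fn left_join(left: &[(i32, &'static str)], right: &[(i32, i32)]) -> Vec<Row> {
    let mut out = Vec::new();
    for &(lk, name) in left {
        let mut matched = false;
        for &(rk, val) in right {
            if lk == rk {
                out.push((lk, name, Some(val)));
                matched = true;
            }
        }
        if !matched {
            // Unmatched left rows are kept, with NULL on the right.
            out.push((lk, name, None));
        }
    }
    out
}

/// Reference semantics: apply `val > 10` to the join output.
/// `NULL > 10` is not true, so NULL-padded rows are dropped.
pub fn filter_after_join(left: &[(i32, &'static str)], right: &[(i32, i32)]) -> Vec<Row> {
    left_join(left, right)
        .into_iter()
        .filter(|r| matches!(r.2, Some(v) if v > 10))
        .collect()
}

/// Naive (incorrect) rewrite: filter the right input first, then left join.
pub fn filter_pushed_to_right(left: &[(i32, &'static str)], right: &[(i32, i32)]) -> Vec<Row> {
    let filtered: Vec<(i32, i32)> = right.iter().copied().filter(|&(_, v)| v > 10).collect();
    left_join(left, &filtered)
}
```

With `left = [(1, "a"), (2, "b")]` and `right = [(1, 5), (2, 20)]`, filtering after the join yields only `(2, "b", Some(20))`, while the naive pushdown also returns `(1, "a", None)`. A test suite for this PR would want cases of this shape for every supported join type.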
Which issue does this PR close?
Rationale for this change
Dynamic filter pushdown can significantly reduce I/O and compute by pruning non‑matching partitions/files on the fly. Previously this optimization was limited in scope. This PR expands it to additional join types and clarifies which input is pruned for each join. It also makes execution plans more transparent by explicitly annotating the probe side/keys for `HashJoinExec`, aiding debugging and user understanding.
What changes are included in this PR?
Optimizer / Execution
- Enable dynamic filter pushdown for `LEFT`, `RIGHT`, `SEMI`, `ANTI`, and `LeftMark` joins (full joins remain unsupported; only equi-join keys contribute).
- `HashJoinExec` now records and exposes `probe_side` and `probe_keys` in displayed physical plans to reflect runtime pruning behavior.
- `DynamicFilterPhysicalExpr::update` now requires an additional `key_count` argument to handle composite/equi key tracking.
Tests
- Updated `sqllogictest` expected plans to include `probe_side=...` and `probe_keys=...` across tpch and other suites (e.g., `predicates.slt`, `subquery.slt`, `union.slt`, and multiple `tpch/plans/q*.slt.part`).
This PR updates 30 `slt` and `slt.part` files.
Documentation
- Added `docs/source/library-user-guide/join-preservation.md` summarizing which join sides are preserved for output filtering and ON‑clause evaluation.
- Updated the configuration docs (`configs.md`) to clarify the semantics of `datafusion.optimizer.enable_dynamic_filter_pushdown`, including the dynamic filter target per join type and prerequisites for pushdown (e.g., Parquet pushdown enabled).
Are these changes tested?
Yes. This PR updates the `sqllogictest` goldens to reflect the new plan annotations and the broader dynamic filter pushdown coverage:
- `.slt` and `tpch` plan snapshots now show `probe_side`/`probe_keys` on `HashJoinExec` where applicable.
Are there any user-facing changes?
Yes (docs/observability only):
- Displayed physical plans now include `probe_side` and `probe_keys` for `HashJoinExec`.
Breaking API change:
- `DynamicFilterPhysicalExpr::update` requires a new `key_count` argument. Downstream users implementing custom dynamic filters must update their code.
- `api change` label.
Behavioral notes (non‑breaking):
- Dynamic filter pushdown can be disabled with `datafusion.optimizer.enable_dynamic_filter_pushdown=false`.
- Pushdown into Parquet scans requires Parquet filter pushdown to be enabled (`execution.parquet.pushdown_filters=true`).