-
Notifications
You must be signed in to change notification settings - Fork 5.5k
fix: enhance sort merge join rule to do merge join when one sided is sorted #26361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Reviewer's GuideExtends the MergeJoinForSortedInputOptimizer to run under native execution and Presto on Spark, support all join types, and enable merge join when only one side is sorted by introducing a prestoOnSpark flag, injecting sorts dynamically, propagating the flag throughout the planner and execution components, updating grouped execution logic, and broadening test coverage. Sequence diagram for enhanced sort merge join optimization in Presto on SparksequenceDiagram
participant "Query Planner"
participant "MergeJoinForSortedInputOptimizer"
participant "PlanNode"
participant "SortNode"
participant "MergeJoinNode"
participant "Execution Engine"
"Query Planner"->>"MergeJoinForSortedInputOptimizer": Optimize JoinNode
"MergeJoinForSortedInputOptimizer"->>"PlanNode": Check if left/right input is sorted
alt Only one side is sorted and prestoOnSpark is true
"MergeJoinForSortedInputOptimizer"->>"SortNode": Inject sort on unsorted side
end
"MergeJoinForSortedInputOptimizer"->>"MergeJoinNode": Replace JoinNode with MergeJoinNode
"MergeJoinNode"->>"Execution Engine": Execute merge join
ER diagram for new and updated planner flags and their propagationerDiagram
PlanFragmenter {
bool isPrestoOnSpark
}
IterativePlanFragmenter {
bool isPrestoOnSpark
}
PlanFragmenterUtils {
bool isPrestoOnSpark
}
GroupedExecutionTagger {
bool isPrestoOnSpark
}
PlanFragmenter ||--o| PlanFragmenterUtils : uses
IterativePlanFragmenter ||--o| PlanFragmenterUtils : uses
PlanFragmenterUtils ||--o| GroupedExecutionTagger : propagates
PlanFragmenterUtils ||--o| MergeJoinForSortedInputOptimizer : propagates
MergeJoinForSortedInputOptimizer {
bool prestoOnSpark
}
Class diagram for updated MergeJoinForSortedInputOptimizer and related planner classesclassDiagram
class MergeJoinForSortedInputOptimizer {
- Metadata metadata
- boolean nativeExecution
- boolean prestoOnSpark
- boolean isEnabledForTesting
+ MergeJoinForSortedInputOptimizer(metadata, nativeExecution, prestoOnSpark)
+ boolean isEnabled(Session session)
+ PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator)
}
class Rewriter {
- PlanNodeIdAllocator idAllocator
- Metadata metadata
- Session session
- boolean prestoOnSpark
- boolean planChanged
+ Rewriter(idAllocator, metadata, session, prestoOnSpark)
+ boolean isPlanChanged()
+ PlanNode visitJoin(JoinNode node, RewriteContext<Void> context)
+ boolean isPlanOutputSortedByColumns(PlanNode plan, List<VariableReferenceExpression> columns)
}
MergeJoinForSortedInputOptimizer o-- Rewriter
class GroupedExecutionTagger {
- Metadata metadata
- NodePartitioningManager nodePartitioningManager
- boolean groupedExecutionEnabled
- boolean isPrestoOnSpark
+ GroupedExecutionTagger(Session session, Metadata metadata, NodePartitioningManager nodePartitioningManager, boolean groupedExecutionEnabled, boolean isPrestoOnSpark)
}
class PlanFragmenter {
- boolean isPrestoOnSpark
}
class IterativePlanFragmenter {
- boolean isPrestoOnSpark
}
class PlanFragmenterUtils {
+ static SubPlan finalizeSubPlan(..., boolean isPrestoOnSpark)
}
MergeJoinForSortedInputOptimizer <.. PlanOptimizers
PlanFragmenter <.. PlanFragmenterUtils
IterativePlanFragmenter <.. PlanFragmenterUtils
GroupedExecutionTagger <.. PlanFragmenterUtils
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Rename the typo in
ConstatnCheck
toConstantCheck
to improve readability and consistency. - The new constant‐check visitor only handles Project, Filter, and TableScan nodes—either document this limitation or extend it to explicitly handle other PlanNode types.
- Threading the
isPrestoOnSpark
flag through many methods makes signatures verbose—consider bundling environment flags into a single context object to simplify method parameters.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Rename the typo in `ConstatnCheck` to `ConstantCheck` to improve readability and consistency.
- The new constant‐check visitor only handles Project, Filter, and TableScan nodes—either document this limitation or extend it to explicitly handle other PlanNode types.
- Threading the `isPrestoOnSpark` flag through many methods makes signatures verbose—consider bundling environment flags into a single context object to simplify method parameters.
## Individual Comments
### Comment 1
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/SortMergeJoinOptimizer.java:106` </location>
<code_context>
+ && !joinNode.getCriteria().isEmpty();
+ }
+
+ private static class ConstatnCheck
+ extends InternalPlanVisitor<Boolean, Set<VariableReferenceExpression>>
+ {
</code_context>
<issue_to_address>
**nitpick (typo):** Typo in class name 'ConstatnCheck' should be 'ConstantCheck'.
Please rename 'ConstatnCheck' to 'ConstantCheck' for consistency.
Suggested implementation:
```java
private static class ConstantCheck
extends InternalPlanVisitor<Boolean, Set<VariableReferenceExpression>>
{
```
If there are any usages of `ConstatnCheck` elsewhere in this file (e.g., instantiations or references), they should also be renamed to `ConstantCheck`.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
...base/src/main/java/com/facebook/presto/sql/planner/optimizations/SortMergeJoinOptimizer.java
Outdated
Show resolved
Hide resolved
ac27911
to
a225204
Compare
775c049
to
a08cb35
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New security issues found
// merge join can't be enabled | ||
assertPlan( | ||
mergeJoinEnabled(), | ||
"select * from test_join_customer2 join test_join_order2 on test_join_customer2.custkey = test_join_order2.custkey", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
security (generic-api-key): Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
Source: gitleaks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this custkey
somehow trigger this gitleaks check
presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java
Show resolved
Hide resolved
presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java
Show resolved
Hide resolved
presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java
Show resolved
Hide resolved
presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java
Show resolved
Hide resolved
presto-hive/src/test/java/com/facebook/presto/hive/TestSortMergeJoinOptimizer.java
Outdated
Show resolved
Hide resolved
presto-hive/src/test/java/com/facebook/presto/hive/TestSortMergeJoinOptimizer.java
Outdated
Show resolved
Hide resolved
presto-hive/src/test/java/com/facebook/presto/hive/TestSortMergeJoinOptimizer.java
Outdated
Show resolved
Hide resolved
presto-hive/src/test/java/com/facebook/presto/hive/TestSortMergeJoinOptimizer.java
Outdated
Show resolved
Hide resolved
presto-hive/src/test/java/com/facebook/presto/hive/TestSortMergeJoinOptimizer.java
Outdated
Show resolved
Hide resolved
ac32798
to
4ce55e7
Compare
The key in the text is just a test text
@Override | ||
protected FeaturesConfig createFeaturesConfig() | ||
{ | ||
return new FeaturesConfig().setNativeExecutionEnabled(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set the test environment to be native execution
mergeJoinEnabled(), | ||
"select * from test_join_customer_join_type left join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey", | ||
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, false)); | ||
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now not only inner joins are supported
@Override | ||
protected FeaturesConfig createFeaturesConfig() | ||
{ | ||
return new FeaturesConfig().setNativeExecutionEnabled(true).setPrestoSparkExecutionEnvironment(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test for presto on spark environment
// merge join can't be enabled | ||
assertPlan( | ||
mergeJoinEnabled(), | ||
"select * from test_join_customer2 join test_join_order2 on test_join_customer2.custkey = test_join_order2.custkey", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this custkey
somehow trigger this gitleaks check
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable(); | ||
} | ||
|
||
if (isPrestoOnSpark) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When presto on spark is the environment, if one side is grouped execution capable, even if the other side is not, the data will still be partitioned by buckets, and have a bucket by bucket execution in presto on spark
public boolean isEnabled(Session session) | ||
{ | ||
return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session) && !isSingleNodeExecutionEnabled(session); | ||
return isEnabledForTesting || nativeExecution && (isGroupedExecutionEnabled(session) || prestoOnSpark) && preferMergeJoinForSortedInputs(session) && !isSingleNodeExecutionEnabled(session); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- only run for nativeExecution
- if presto on spark, do not need group execution enabled
boolean leftInputSorted = isPlanOutputSortedByColumns(rewrittenLeft, node.getCriteria().stream().map(EquiJoinClause::getLeft).collect(toImmutableList())); | ||
boolean rightInputSorted = isPlanOutputSortedByColumns(rewrittenRight, node.getCriteria().stream().map(EquiJoinClause::getRight).collect(toImmutableList())); | ||
|
||
if ((!leftInputSorted && !rightInputSorted) || (!prestoOnSpark && (!leftInputSorted || !rightInputSorted))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Do not do merge join is no input is sorted
- If only one side is sorted, do not do merge join if not presto on spark
4ce55e7
to
54dcfd7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
Blocking issues:
- Detected a Generic API Key, potentially exposing access to various services and sensitive operations. (link)
- Detected a Generic API Key, potentially exposing access to various services and sensitive operations. (link)
- Detected a Generic API Key, potentially exposing access to various services and sensitive operations. (link)
- Detected a Generic API Key, potentially exposing access to various services and sensitive operations. (link)
- Detected a Generic API Key, potentially exposing access to various services and sensitive operations. (link)
General comments:
- The
prestoOnSpark
flag is propagated through many constructors and static methods; consider centralizing that flag in a shared execution context or reading it directly from session/FeaturesConfig to reduce boilerplate and improve maintainability. - The inversion logic in
isPlanOutputSortedByColumns
aroundLocalProperties.match(...)
can be confusing—extract it into a clearly named helper (e.g.isSortedBy(…)
) or invert the matcher itself to make the intent more readable. - When adding on‐demand
SortNode
s for one‐sided sorting, ensure that redundant or no‐op sorts are pruned downstream or guarded against here, so you don’t introduce unnecessary sort operators into the plan.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `prestoOnSpark` flag is propagated through many constructors and static methods; consider centralizing that flag in a shared execution context or reading it directly from session/FeaturesConfig to reduce boilerplate and improve maintainability.
- The inversion logic in `isPlanOutputSortedByColumns` around `LocalProperties.match(...)` can be confusing—extract it into a clearly named helper (e.g. `isSortedBy(…)`) or invert the matcher itself to make the intent more readable.
- When adding on‐demand `SortNode`s for one‐sided sorting, ensure that redundant or no‐op sorts are pruned downstream or guarded against here, so you don’t introduce unnecessary sort operators into the plan.
## Individual Comments
### Comment 1
<location> `presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java:90-93` </location>
<code_context>
assertPlan(
mergeJoinEnabled(),
"select * from test_join_customer_join_type left join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey",
- joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, false));
+ joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, true));
// Right join
</code_context>
<issue_to_address>
**suggestion (testing):** Updated assertions to check for merge join enablement for all join types.
Also, please add tests to verify that merge join is not enabled when inputs are unsorted.
</issue_to_address>
### Comment 2
<location> `presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java:178` </location>
<code_context>
test_join_order2.custkey
</code_context>
<issue_to_address>
**security (generic-api-key):** Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
*Source: gitleaks*
</issue_to_address>
### Comment 3
<location> `presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java:210` </location>
<code_context>
test_join_order3.custkey
</code_context>
<issue_to_address>
**security (generic-api-key):** Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
*Source: gitleaks*
</issue_to_address>
### Comment 4
<location> `presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java:242` </location>
<code_context>
test_join_order4.custkey
</code_context>
<issue_to_address>
**security (generic-api-key):** Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
*Source: gitleaks*
</issue_to_address>
### Comment 5
<location> `presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java:277` </location>
<code_context>
test_join_order5.custkey
</code_context>
<issue_to_address>
**security (generic-api-key):** Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
*Source: gitleaks*
</issue_to_address>
### Comment 6
<location> `presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java:277` </location>
<code_context>
test_join_order5.orderkey
</code_context>
<issue_to_address>
**security (generic-api-key):** Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
*Source: gitleaks*
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Description
The current MergeJoinForSortedInputOptimizer rule does sort merge join only when both inputs are sorted. In this PR, I made the following changes:
GroupedExecutionTagger
, do not fail for merge join node when grouped execution is not available for presto on spark. This is because presto on spark spawn as many tasks as the number of partitions of data. When one side is bucketed, the other side even not bucketed, the join will get as many tasks as the number of buckets, hence still equivalent to a bucket by bucket executionMotivation and Context
Sort merge join for more cases
Impact
Improve performance
Test Plan
Unit tests, and local end to end tests
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.