Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.Session;
import com.facebook.presto.spi.plan.JoinType;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.assertions.PlanMatchPattern;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
Expand Down Expand Up @@ -56,6 +57,12 @@ protected QueryRunner createQueryRunner()
Optional.empty());
}

@Override
protected FeaturesConfig createFeaturesConfig()
{
return new FeaturesConfig().setNativeExecutionEnabled(true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configure the test environment to run with native execution enabled.

}

@Test
public void testJoinType()
{
Expand Down Expand Up @@ -83,19 +90,19 @@ public void testJoinType()
assertPlan(
mergeJoinEnabled(),
"select * from test_join_customer_join_type left join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey",
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, false));
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, true));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Join types other than inner join (left, right, and full outer) are now supported as well.


// Right join
assertPlan(
mergeJoinEnabled(),
"select * from test_join_customer_join_type right join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey",
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), RIGHT, false));
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), RIGHT, true));

// Outer join
assertPlan(
mergeJoinEnabled(),
"select * from test_join_customer_join_type full join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey",
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), FULL, false));
joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), FULL, true));
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_join_customer_join_type");
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.OptionalInt;

import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.preferSortMergeJoin;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_PLAN_ERROR;
import static com.facebook.presto.spi.connector.ConnectorCapabilities.SUPPORTS_PAGE_SINK_COMMIT;
Expand All @@ -58,13 +57,15 @@ class GroupedExecutionTagger
private final Metadata metadata;
private final NodePartitioningManager nodePartitioningManager;
private final boolean groupedExecutionEnabled;
private final boolean isPrestoOnSpark;

public GroupedExecutionTagger(Session session, Metadata metadata, NodePartitioningManager nodePartitioningManager)
public GroupedExecutionTagger(Session session, Metadata metadata, NodePartitioningManager nodePartitioningManager, boolean groupedExecutionEnabled, boolean isPrestoOnSpark)
{
this.session = requireNonNull(session, "session is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
this.groupedExecutionEnabled = isGroupedExecutionEnabled(session);
this.groupedExecutionEnabled = groupedExecutionEnabled;
this.isPrestoOnSpark = isPrestoOnSpark;
}

@Override
Expand Down Expand Up @@ -166,6 +167,15 @@ public GroupedExecutionTagger.GroupedExecutionProperties visitMergeJoin(MergeJoi
// TODO: This will break the other use case for merge join operating on sorted tables, which requires grouped execution for correctness.
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}

if (isPrestoOnSpark) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the execution environment is Presto on Spark, if one side of the merge join is capable of grouped execution, the data will still be partitioned by bucket and executed bucket by bucket, even if the other side is not capable.

GroupedExecutionTagger.GroupedExecutionProperties mergeJoinLeft = node.getLeft().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager, true, true), null);
GroupedExecutionTagger.GroupedExecutionProperties mergeJoinRight = node.getRight().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager, true, true), null);
if (mergeJoinLeft.currentNodeCapable || mergeJoinRight.currentNodeCapable) {
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
}

throw new PrestoException(
INVALID_PLAN_ERROR,
format("When grouped execution can't be enabled, merge join plan is not valid." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class PlanFragmenter
private final QueryManagerConfig config;
private final PlanChecker distributedPlanChecker;
private final PlanChecker singleNodePlanChecker;
private final boolean isPrestoOnSpark;

@Inject
public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager)
Expand All @@ -61,6 +62,7 @@ public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitionin
this.config = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false, planCheckerProviderManager);
this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true, planCheckerProviderManager);
this.isPrestoOnSpark = featuresConfig.isPrestoSparkExecutionEnvironment();
}

public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
Expand Down Expand Up @@ -90,7 +92,7 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, Pl
PlanNode root = SimplePlanRewriter.rewriteWith(fragmenter, plan.getRoot(), properties);

SubPlan subPlan = fragmenter.buildRootFragment(root, properties);
return finalizeSubPlan(subPlan, config, metadata, nodePartitioningManager, session, noExchange, warningCollector, subPlan.getFragment().getPartitioning());
return finalizeSubPlan(subPlan, config, metadata, nodePartitioningManager, session, noExchange, warningCollector, subPlan.getFragment().getPartitioning(), isPrestoOnSpark);
}

private static class Fragmenter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
Expand Down Expand Up @@ -92,12 +93,13 @@ public static SubPlan finalizeSubPlan(
Session session,
boolean noExchange,
WarningCollector warningCollector,
PartitioningHandle partitioningHandle)
PartitioningHandle partitioningHandle,
boolean isPrestoOnSpark)
{
subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle);
if (!noExchange && !isSingleNodeExecutionEnabled(session)) {
// grouped execution is not supported for SINGLE_DISTRIBUTION or SINGLE_NODE_EXECUTION_ENABLED
subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager);
subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager, isPrestoOnSpark);
}

checkState(subPlan.getFragment().getId().getId() != ROOT_FRAGMENT_ID || !isForceSingleNodeOutput(session) || subPlan.getFragment().getPartitioning().isSingleNode(), "Root of PlanFragment is not single node");
Expand Down Expand Up @@ -148,10 +150,10 @@ private static void sanityCheckFragmentedPlan(

* TODO: We should introduce "query section" and make recoverability analysis done at query section level.
*/
private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan, boolean parentContainsTableFinish, Metadata metadata, NodePartitioningManager nodePartitioningManager)
private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan, boolean parentContainsTableFinish, Metadata metadata, NodePartitioningManager nodePartitioningManager, boolean isPrestoOnSpark)
{
PlanFragment fragment = subPlan.getFragment();
GroupedExecutionTagger.GroupedExecutionProperties properties = fragment.getRoot().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager), null);
GroupedExecutionTagger.GroupedExecutionProperties properties = fragment.getRoot().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager, isGroupedExecutionEnabled(session), isPrestoOnSpark), null);
if (properties.isSubTreeUseful()) {
boolean preferDynamic = fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)
&& new HashSet<>(properties.getCapableTableScanNodes()).containsAll(fragment.getTableScanSchedulingOrder());
Expand Down Expand Up @@ -185,7 +187,7 @@ private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan,
ImmutableList.Builder<SubPlan> result = ImmutableList.builder();
boolean containsTableFinishNode = containsTableFinishNode(fragment);
for (SubPlan child : subPlan.getChildren()) {
result.add(analyzeGroupedExecution(session, child, containsTableFinishNode, metadata, nodePartitioningManager));
result.add(analyzeGroupedExecution(session, child, containsTableFinishNode, metadata, nodePartitioningManager, isPrestoOnSpark));
}
return new SubPlan(fragment, result.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ public PlanOptimizers(
// MergeJoinForSortedInputOptimizer can avoid the local exchange for a join operation
// Should be placed after AddExchanges, but before AddLocalExchange
// To replace the JoinNode to MergeJoin ahead of AddLocalExchange to avoid adding extra local exchange
builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()),
builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled(), featuresConfig.isPrestoSparkExecutionEnvironment()),
new SortMergeJoinOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()));

// Optimizers above this don't understand local exchanges, so be careful moving this.
Expand Down
Loading
Loading