Skip to content

Commit d342799

Browse files
committed
Run single phase aggregation when possible
1 parent d365412 commit d342799

File tree

6 files changed

+90
-20
lines changed

6 files changed

+90
-20
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static org.hamcrest.Matchers.containsString;
3030
import static org.hamcrest.Matchers.equalTo;
3131
import static org.hamcrest.Matchers.hasSize;
32+
import static org.hamcrest.Matchers.startsWith;
3233

3334
public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
3435

@@ -444,6 +445,19 @@ public void testProfile() {
444445
}
445446
}
446447
assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 3));
448+
{
449+
List<DriverProfile> finalProfiles = profile.drivers().stream().filter(d -> d.description().equals("final")).toList();
450+
assertThat(finalProfiles, hasSize(1));
451+
DriverProfile finalProfile = finalProfiles.getFirst();
452+
assertThat(finalProfile.operators(), hasSize(7));
453+
assertThat(finalProfile.operators().get(0).operator(), startsWith("ExchangeSourceOperator"));
454+
assertThat(finalProfile.operators().get(1).operator(), startsWith("TimeSeriesAggregationOperator"));
455+
assertThat(finalProfile.operators().get(2).operator(), startsWith("ProjectOperator"));
456+
assertThat(finalProfile.operators().get(3).operator(), startsWith("HashAggregationOperator"));
457+
assertThat(finalProfile.operators().get(4).operator(), startsWith("ProjectOperator"));
458+
assertThat(finalProfile.operators().get(5).operator(), startsWith("TopNOperator"));
459+
assertThat(finalProfile.operators().get(6).operator(), startsWith("OutputOperator"));
460+
}
447461
}
448462
}
449463
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.xpack.esql.VerificationException;
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
13+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.SinglePhaseAggregate;
1314
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
1415
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1516
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
@@ -24,7 +25,8 @@
2425
public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPlan, PhysicalOptimizerContext> {
2526

2627
private static final List<RuleExecutor.Batch<PhysicalPlan>> RULES = List.of(
27-
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns())
28+
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()),
29+
new Batch<>("Single phase aggregate", Limiter.ONCE, new SinglePhaseAggregate())
2830
);
2931

3032
private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
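
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change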
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.physical;
9+
10+
import org.elasticsearch.compute.aggregation.AggregatorMode;
11+
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
12+
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
13+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
14+
15+
/**
16+
* Collapses two-phase aggregation into a single phase when possible.
17+
* For example, in FROM .. | STATS first | STATS second, the STATS second aggregation
18+
* can be executed in a single phase on the coordinator instead of two phases.
19+
*/
20+
public class SinglePhaseAggregate extends PhysicalOptimizerRules.OptimizerRule<AggregateExec> {
21+
@Override
22+
protected PhysicalPlan rule(AggregateExec plan) {
23+
if (plan instanceof AggregateExec parent
24+
&& parent.getMode() == AggregatorMode.FINAL
25+
&& parent.child() instanceof AggregateExec child
26+
&& child.getMode() == AggregatorMode.INITIAL) {
27+
return child.withMode(AggregatorMode.SINGLE);
28+
}
29+
return plan;
30+
}
31+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ public final PhysicalOperation groupingPhysicalOperation(
7575
List<Aggregator.Factory> aggregatorFactories = new ArrayList<>();
7676

7777
// append channels to the layout
78-
if (aggregatorMode == AggregatorMode.FINAL) {
79-
layout.append(aggregates);
80-
} else {
78+
if (aggregatorMode.isOutputPartial()) {
8179
layout.append(aggregateMapper.mapNonGrouping(aggregates));
80+
} else {
81+
layout.append(aggregates);
8282
}
8383

8484
// create the agg factories
@@ -146,14 +146,14 @@ else if (aggregatorMode.isOutputPartial()) {
146146
groupSpecs.add(new GroupSpec(groupInput == null ? null : groupInput.channel(), sourceGroupAttribute, group));
147147
}
148148

149-
if (aggregatorMode == AggregatorMode.FINAL) {
149+
if (aggregatorMode.isOutputPartial()) {
150+
layout.append(aggregateMapper.mapGrouping(aggregates));
151+
} else {
150152
for (var agg : aggregates) {
151153
if (Alias.unwrap(agg) instanceof AggregateFunction) {
152154
layout.append(agg);
153155
}
154156
}
155-
} else {
156-
layout.append(aggregateMapper.mapGrouping(aggregates));
157157
}
158158

159159
// create the agg factories
@@ -264,7 +264,13 @@ private void aggregatesToFactory(
264264
if (child instanceof AggregateFunction aggregateFunction) {
265265
List<NamedExpression> sourceAttr = new ArrayList<>();
266266

267-
if (mode == AggregatorMode.INITIAL) {
267+
if (mode.isInputPartial()) {
268+
if (grouping) {
269+
sourceAttr = aggregateMapper.mapGrouping(ne);
270+
} else {
271+
sourceAttr = aggregateMapper.mapNonGrouping(ne);
272+
}
273+
} else {
268274
// TODO: this needs to be made more reliable - use casting to blow up when dealing with expressions (e+1)
269275
Expression field = aggregateFunction.field();
270276
// Only count can now support literals - all the other aggs should be optimized away
@@ -292,16 +298,6 @@ private void aggregatesToFactory(
292298
}
293299
}
294300
}
295-
// coordinator/exchange phase
296-
else if (mode == AggregatorMode.FINAL || mode == AggregatorMode.INTERMEDIATE) {
297-
if (grouping) {
298-
sourceAttr = aggregateMapper.mapGrouping(ne);
299-
} else {
300-
sourceAttr = aggregateMapper.mapNonGrouping(ne);
301-
}
302-
} else {
303-
throw new EsqlIllegalArgumentException("illegal aggregation mode");
304-
}
305301

306302
AggregatorFunctionSupplier aggSupplier = supplier(aggregateFunction);
307303

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.common.util.BigArrays;
1515
import org.elasticsearch.common.util.Maps;
1616
import org.elasticsearch.compute.Describable;
17-
import org.elasticsearch.compute.aggregation.AggregatorMode;
1817
import org.elasticsearch.compute.data.Block;
1918
import org.elasticsearch.compute.data.BlockFactory;
2019
import org.elasticsearch.compute.data.ElementType;
@@ -215,7 +214,7 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
215214
// workaround for https://github.com/elastic/elasticsearch/issues/99782
216215
localPhysicalPlan = localPhysicalPlan.transformUp(
217216
AggregateExec.class,
218-
a -> a.getMode() == AggregatorMode.FINAL ? new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) : a
217+
a -> a.getMode().isOutputPartial() ? a : new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates()))
219218
);
220219
PhysicalOperation physicalOperation = plan(localPhysicalPlan, context);
221220

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@
162162
import static java.util.Arrays.asList;
163163
import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
164164
import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL;
165+
import static org.elasticsearch.compute.aggregation.AggregatorMode.SINGLE;
165166
import static org.elasticsearch.core.Tuple.tuple;
166167
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
167168
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
@@ -7814,6 +7815,33 @@ public void testLookupJoinFieldLoadingDropAllFields() throws Exception {
78147815
assertLookupJoinFieldNames(query, data, List.of(Set.of(), Set.of("foo", "bar", "baz")));
78157816
}
78167817

7818+
/**
7819+
* LimitExec[1000[INTEGER],null]
7820+
* \_AggregateExec[[last_name{r}#8],[COUNT(first_name{r}#5,true[BOOLEAN]) AS count(first_name)#11, last_name{r}#8],SINGLE,[last_name
7821+
* {r}#8, $$count(first_name)$count{r}#25, $$count(first_name)$seen{r}#26],null]
7822+
* \_AggregateExec[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A
7823+
* S last_name#8],FINAL,[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],null]
7824+
* \_ExchangeExec[[emp_no{f}#12, $$first_name$values{r}#23, $$last_name$values{r}#24],true]
7825+
* \_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[
7826+
* Aggregate[[emp_no{f}#12],[VALUES(first_name{f}#13,true[BOOLEAN]) AS first_name#5, VALUES(last_name{f}#16,true[BOOLEAN]) A
7827+
* S last_name#8]]
7828+
* \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..]]]
7829+
*/
7830+
public void testSingleModeAggregate() {
7831+
String q = """
7832+
FROM test
7833+
| STATS first_name = VALUES(first_name), last_name = VALUES(last_name) BY emp_no
7834+
| STATS count(first_name) BY last_name""";
7835+
PhysicalPlan plan = physicalPlan(q);
7836+
PhysicalPlan optimized = physicalPlanOptimizer.optimize(plan);
7837+
LimitExec limit = as(optimized, LimitExec.class);
7838+
AggregateExec second = as(limit.child(), AggregateExec.class);
7839+
assertThat(second.getMode(), equalTo(SINGLE));
7840+
AggregateExec first = as(second.child(), AggregateExec.class);
7841+
assertThat(first.getMode(), equalTo(FINAL));
7842+
as(first.child(), ExchangeExec.class);
7843+
}
7844+
78177845
private void assertLookupJoinFieldNames(String query, TestDataSource data, List<Set<String>> expectedFieldNames) {
78187846
assertLookupJoinFieldNames(query, data, expectedFieldNames, false);
78197847
}

0 commit comments

Comments
 (0)