diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 06fc02c304d..38ad7106b73 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1926,7 +1926,7 @@ public RelNode visitFlatten(Flatten node, CalcitePlanContext context) { } /** Helper method to get the function name for proper column naming */ - private String getValueFunctionName(UnresolvedExpression aggregateFunction) { + private String getAggFieldAlias(UnresolvedExpression aggregateFunction) { if (aggregateFunction instanceof Alias) { return ((Alias) aggregateFunction).getName(); } @@ -1976,7 +1976,7 @@ public RelNode visitTimechart( // Handle no by field case if (node.getByField() == null) { - String valueFunctionName = getValueFunctionName(node.getAggregateFunction()); + String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction()); // Create group expression list with just the timestamp span but use a different alias // to avoid @timestamp naming conflict @@ -1984,7 +1984,7 @@ public RelNode visitTimechart( simpleGroupExprList.add(new Alias("timestamp", spanExpr)); // Create agg expression list with the aggregate function List simpleAggExprList = - List.of(new Alias(valueFunctionName, node.getAggregateFunction())); + List.of(new Alias(aggFieldAlias, node.getAggregateFunction())); // Create an Aggregation object Aggregation aggregation = new Aggregation( @@ -1999,9 +1999,9 @@ public RelNode visitTimechart( context.relBuilder.push(result); // Reorder fields: timestamp first, then count context.relBuilder.project( - context.relBuilder.field("timestamp"), context.relBuilder.field(valueFunctionName)); + context.relBuilder.field("timestamp"), context.relBuilder.field(aggFieldAlias)); // Rename timestamp to @timestamp - context.relBuilder.rename(List.of("@timestamp", 
valueFunctionName)); + context.relBuilder.rename(List.of("@timestamp", aggFieldAlias)); context.relBuilder.sort(context.relBuilder.field(0)); return context.relBuilder.peek(); @@ -2010,7 +2010,7 @@ public RelNode visitTimechart( // Extract parameters for byField case UnresolvedExpression byField = node.getByField(); String byFieldName = ((Field) byField).getField().toString(); - String valueFunctionName = getValueFunctionName(node.getAggregateFunction()); + String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction()); int limit = Optional.ofNullable(node.getLimit()).orElse(10); boolean useOther = Optional.ofNullable(node.getUseOther()).orElse(true); @@ -2037,11 +2037,11 @@ public RelNode visitTimechart( // Handle no limit case - just sort and return with proper field aliases if (limit == 0) { - // Add final projection with proper aliases: [@timestamp, byField, valueFunctionName] + // Add final projection with proper aliases: [@timestamp, byField, aggFieldAlias] context.relBuilder.project( context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"), context.relBuilder.alias(context.relBuilder.field(1), byFieldName), - context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName)); + context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias)); context.relBuilder.sort(context.relBuilder.field(0), context.relBuilder.field(1)); return context.relBuilder.peek(); } @@ -2051,32 +2051,64 @@ public RelNode visitTimechart( // Step 2: Find top N categories using window function approach (more efficient than separate // aggregation) - RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, context); + String aggFunctionName = getAggFunctionName(node.getAggregateFunction()); + Optional aggFuncNameOptional = BuiltinFunctionName.of(aggFunctionName); + if (aggFuncNameOptional.isEmpty()) { + throw new IllegalArgumentException( + StringUtils.format("Unrecognized aggregation function: %s", aggFunctionName)); + } + 
BuiltinFunctionName aggFunction = aggFuncNameOptional.get(); + RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, aggFunction, context); // Step 3: Apply OTHER logic with single pass return buildFinalResultWithOther( - completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context); + completeResults, + topCategories, + byFieldName, + aggFunction, + aggFieldAlias, + useOther, + limit, + context); } catch (Exception e) { throw new RuntimeException("Error in visitTimechart: " + e.getMessage(), e); } } + private String getAggFunctionName(UnresolvedExpression aggregateFunction) { + if (aggregateFunction instanceof Alias alias) { + return getAggFunctionName(alias.getDelegated()); + } + return ((AggregateFunction) aggregateFunction).getFuncName(); + } + /** Build top categories query - simpler approach that works better with OTHER handling */ private RelNode buildTopCategoriesQuery( - RelNode completeResults, int limit, CalcitePlanContext context) { + RelNode completeResults, + int limit, + BuiltinFunctionName aggFunction, + CalcitePlanContext context) { context.relBuilder.push(completeResults); // Filter out null values when determining top categories - null should not count towards limit context.relBuilder.filter(context.relBuilder.isNotNull(context.relBuilder.field(1))); // Get totals for non-null categories - field positions: 0=@timestamp, 1=byField, 2=value + RexInputRef valueField = context.relBuilder.field(2); + AggCall call = buildAggCall(context.relBuilder, aggFunction, valueField); + context.relBuilder.aggregate( - context.relBuilder.groupKey(context.relBuilder.field(1)), - context.relBuilder.sum(context.relBuilder.field(2)).as("grand_total")); + context.relBuilder.groupKey(context.relBuilder.field(1)), call.as("grand_total")); // Apply sorting and limit to non-null categories only - context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field("grand_total"))); + RexNode sortField = 
context.relBuilder.field("grand_total"); + // For MIN and EARLIEST, top results should be the minimum ones + sortField = + aggFunction == BuiltinFunctionName.MIN || aggFunction == BuiltinFunctionName.EARLIEST + ? sortField + : context.relBuilder.desc(sortField); + context.relBuilder.sort(sortField); if (limit > 0) { context.relBuilder.limit(0, limit); } @@ -2089,18 +2121,25 @@ private RelNode buildFinalResultWithOther( RelNode completeResults, RelNode topCategories, String byFieldName, - String valueFunctionName, + BuiltinFunctionName aggFunction, + String aggFieldAlias, boolean useOther, int limit, CalcitePlanContext context) { // Use zero-filling for count aggregations, standard result for others - if (valueFunctionName.equals("count")) { + if (aggFieldAlias.equals("count")) { return buildZeroFilledResult( - completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context); + completeResults, topCategories, byFieldName, aggFieldAlias, useOther, limit, context); } else { return buildStandardResult( - completeResults, topCategories, byFieldName, valueFunctionName, useOther, context); + completeResults, + topCategories, + byFieldName, + aggFunction, + aggFieldAlias, + useOther, + context); } } @@ -2109,7 +2148,8 @@ private RelNode buildStandardResult( RelNode completeResults, RelNode topCategories, String byFieldName, - String valueFunctionName, + BuiltinFunctionName aggFunctionName, + String aggFieldAlias, boolean useOther, CalcitePlanContext context) { @@ -2132,11 +2172,13 @@ private RelNode buildStandardResult( context.relBuilder.project( context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"), context.relBuilder.alias(categoryExpr, byFieldName), - context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName)); + context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias)); + RexInputRef valueField = context.relBuilder.field(2); + AggCall aggCall = buildAggCall(context.relBuilder, aggFunctionName, 
valueField); context.relBuilder.aggregate( context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)), - context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName)); + aggCall.as(aggFieldAlias)); applyFiltersAndSort(useOther, context); return context.relBuilder.peek(); @@ -2171,7 +2213,7 @@ private RelNode buildZeroFilledResult( RelNode completeResults, RelNode topCategories, String byFieldName, - String valueFunctionName, + String aggFieldAlias, boolean useOther, int limit, CalcitePlanContext context) { @@ -2210,7 +2252,7 @@ private RelNode buildZeroFilledResult( context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP), "@timestamp"), context.relBuilder.alias(context.relBuilder.field(1), byFieldName), - context.relBuilder.alias(context.relBuilder.literal(0), valueFunctionName)); + context.relBuilder.alias(context.relBuilder.literal(0), aggFieldAlias)); RelNode zeroFilledCombinations = context.relBuilder.build(); // Get actual results with OTHER logic applied @@ -2232,7 +2274,7 @@ private RelNode buildZeroFilledResult( context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP), "@timestamp"), context.relBuilder.alias(actualCategoryExpr, byFieldName), - context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName)); + context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias)); context.relBuilder.aggregate( context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)), @@ -2247,12 +2289,30 @@ private RelNode buildZeroFilledResult( // Aggregate to combine actual and zero-filled data context.relBuilder.aggregate( context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)), - context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName)); + context.relBuilder.sum(context.relBuilder.field(2)).as(aggFieldAlias)); applyFiltersAndSort(useOther, context); return context.relBuilder.peek(); } + /** + * Aggregate a 
field based on a given built-in aggregation function name. + * + * <p>
It is intended for secondary aggregations in timechart and chart commands. Using it + * elsewhere may lead to unintended results. It handles explicitly only MIN, MAX, AVG, COUNT, + * DISTINCT_COUNT, EARLIEST, and LATEST. It sums the results for the rest aggregation types, + * assuming them to be accumulative. + */ + private AggCall buildAggCall( + RelBuilder relBuilder, BuiltinFunctionName aggFunction, RexNode node) { + return switch (aggFunction) { + case MIN, EARLIEST -> relBuilder.min(node); + case MAX, LATEST -> relBuilder.max(node); + case AVG -> relBuilder.avg(node); + default -> relBuilder.sum(node); + }; + } + @Override public RelNode visitTrendline(Trendline node, CalcitePlanContext context) { visitChildren(node, context); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index ad2b0aeda85..4fc9eb80982 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -422,10 +422,7 @@ public void testExplainWithReverse() throws IOException { @Test public void testExplainWithTimechartAvg() throws IOException { var result = explainQueryYaml("source=events | timechart span=1m avg(cpu_usage) by host"); - String expected = - !isPushdownDisabled() - ? 
loadFromFile("expectedOutput/calcite/explain_timechart.yaml") - : loadFromFile("expectedOutput/calcite/explain_timechart_no_pushdown.yaml"); + String expected = loadExpectedPlan("explain_timechart.yaml"); assertYamlEqualsIgnoreId(expected, result); } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimechartCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimechartCommandIT.java index 3b4ca27dab5..4d9352e9e87 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimechartCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimechartCommandIT.java @@ -183,27 +183,13 @@ public void testTimechartWithLimit() throws IOException { schema("host", "string"), schema("avg(cpu_usage)", "double")); - // Verify we have rows for web-01, web-02, and OTHER - boolean foundWeb01 = false; - boolean foundWeb02 = false; - boolean foundOther = false; - - for (int i = 0; i < result.getJSONArray("datarows").length(); i++) { - Object[] row = result.getJSONArray("datarows").getJSONArray(i).toList().toArray(); - String label = (String) row[1]; - - if ("web-01".equals(label)) { - foundWeb01 = true; - } else if ("web-02".equals(label)) { - foundWeb02 = true; - } else if ("OTHER".equals(label)) { - foundOther = true; - } - } - - assertTrue("web-01 not found in results", foundWeb01); - assertTrue("web-02 not found in results", foundWeb02); - assertTrue("OTHER category not found in results", foundOther); + verifyDataRows( + result, + rows("2024-07-01 00:00:00", "web-01", 45.2), + rows("2024-07-01 00:01:00", "OTHER", 38.7), + rows("2024-07-01 00:02:00", "web-01", 55.3), + rows("2024-07-01 00:03:00", "db-01", 42.1), + rows("2024-07-01 00:04:00", "OTHER", 41.8)); } @Test @@ -383,7 +369,7 @@ public void testTimechartWithLimitAndUseOther() throws IOException { if ("OTHER".equals(host)) { foundOther = true; - assertEquals(330.4, cpuUsage, 0.1); + assertEquals(41.3, cpuUsage, 0.1); 
} else if ("web-03".equals(host)) { foundWeb03 = true; assertEquals(55.3, cpuUsage, 0.1); diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_timechart.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_timechart.yaml index f212b4c8bfd..a315860aac9 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_timechart.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_timechart.yaml @@ -2,7 +2,7 @@ calcite: logical: | LogicalSystemLimit(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC]) - LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[SUM($2)]) + LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[AVG($2)]) LogicalProject(@timestamp=[$0], host=[CASE(IS NOT NULL($3), $1, CASE(IS NULL($1), null:NULL, 'OTHER'))], avg(cpu_usage)=[$2]) LogicalJoin(condition=[=($1, $3)], joinType=[left]) LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2]) @@ -10,7 +10,7 @@ calcite: LogicalProject(host=[$4], cpu_usage=[$7], $f3=[SPAN($1, 1, 'm')]) CalciteLogicalIndexScan(table=[[OpenSearch, events]]) LogicalSort(sort0=[$1], dir0=[DESC], fetch=[10]) - LogicalAggregate(group=[{1}], grand_total=[SUM($2)]) + LogicalAggregate(group=[{1}], grand_total=[AVG($2)]) LogicalFilter(condition=[IS NOT NULL($1)]) LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2]) LogicalAggregate(group=[{0, 2}], agg#0=[AVG($1)]) @@ -19,19 +19,21 @@ calcite: physical: | EnumerableLimit(fetch=[10000]) EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC]) - EnumerableAggregate(group=[{0, 1}], avg(cpu_usage)=[SUM($2)]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT NULL($t3)], expr#6=[IS NULL($t1)], expr#7=[null:NULL], expr#8=['OTHER'], expr#9=[CASE($t6, $t7, $t8)], expr#10=[CASE($t5, $t1, $t9)], @timestamp=[$t0], host=[$t10], avg(cpu_usage)=[$t2]) - EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left]) - EnumerableSort(sort0=[$1], 
dir0=[ASC]) - EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], @timestamp=[$t1], host=[$t0], $f2=[$t8]) - EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)]) - EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5]) - CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableSort(sort0=[$0], dir0=[ASC]) - EnumerableLimit(fetch=[10]) - EnumerableSort(sort0=[$1], dir0=[DESC]) - EnumerableAggregate(group=[{0}], grand_total=[SUM($1)]) - EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], host=[$t0], $f2=[$t8]) - EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)]) - EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5]) - CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp], FILTER->IS NOT NULL($0)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"exists":{"field":"host","boost":1.0}},"_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], proj#0..1=[{exprs}], avg(cpu_usage)=[$t8]) + EnumerableAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], agg#1=[COUNT($2)]) + EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT NULL($t3)], expr#6=[IS 
NULL($t1)], expr#7=[null:NULL], expr#8=['OTHER'], expr#9=[CASE($t6, $t7, $t8)], expr#10=[CASE($t5, $t1, $t9)], @timestamp=[$t0], host=[$t10], avg(cpu_usage)=[$t2]) + EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left]) + EnumerableSort(sort0=[$1], dir0=[ASC]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], @timestamp=[$t1], host=[$t0], $f2=[$t8]) + EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5]) + CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + EnumerableLimit(fetch=[10]) + EnumerableSort(sort0=[$1], dir0=[DESC]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[=($t2, $t3)], expr#5=[null:DOUBLE], expr#6=[CASE($t4, $t5, $t1)], expr#7=[/($t6, $t2)], host=[$t0], grand_total=[$t7]) + EnumerableAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], host=[$t0], $f2=[$t8]) + EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5]) + CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp], FILTER->IS NOT NULL($0)], 
OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"exists":{"field":"host","boost":1.0}},"_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_timechart_no_pushdown.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_timechart_no_pushdown.yaml deleted file mode 100644 index ae966d7eea7..00000000000 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_timechart_no_pushdown.yaml +++ /dev/null @@ -1,37 +0,0 @@ -calcite: - logical: | - LogicalSystemLimit(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) - LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC]) - LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[SUM($2)]) - LogicalProject(@timestamp=[$0], host=[CASE(IS NOT NULL($3), $1, CASE(IS NULL($1), null:NULL, 'OTHER'))], avg(cpu_usage)=[$2]) - LogicalJoin(condition=[=($1, $3)], joinType=[left]) - LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2]) - LogicalAggregate(group=[{0, 2}], agg#0=[AVG($1)]) - LogicalProject(host=[$4], cpu_usage=[$7], $f3=[SPAN($1, 1, 'm')]) - CalciteLogicalIndexScan(table=[[OpenSearch, events]]) - LogicalSort(sort0=[$1], dir0=[DESC], fetch=[10]) - LogicalAggregate(group=[{1}], grand_total=[SUM($2)]) - LogicalFilter(condition=[IS NOT NULL($1)]) - LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2]) - LogicalAggregate(group=[{0, 2}], agg#0=[AVG($1)]) - LogicalProject(host=[$4], cpu_usage=[$7], $f3=[SPAN($1, 1, 'm')]) - CalciteLogicalIndexScan(table=[[OpenSearch, events]]) - physical: | - EnumerableLimit(fetch=[10000]) - EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC]) - EnumerableAggregate(group=[{0, 1}], avg(cpu_usage)=[SUM($2)]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT NULL($t3)], expr#6=[IS NULL($t1)], expr#7=[null:NULL], expr#8=['OTHER'], expr#9=[CASE($t6, $t7, 
$t8)], expr#10=[CASE($t5, $t1, $t9)], @timestamp=[$t0], host=[$t10], avg(cpu_usage)=[$t2]) - EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left]) - EnumerableSort(sort0=[$1], dir0=[ASC]) - EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], @timestamp=[$t1], host=[$t0], $f2=[$t8]) - EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)]) - EnumerableCalc(expr#0..15=[{inputs}], expr#16=[1], expr#17=['m'], expr#18=[SPAN($t1, $t16, $t17)], host=[$t4], cpu_usage=[$t7], $f3=[$t18]) - CalciteEnumerableIndexScan(table=[[OpenSearch, events]]) - EnumerableSort(sort0=[$0], dir0=[ASC]) - EnumerableLimit(fetch=[10]) - EnumerableSort(sort0=[$1], dir0=[DESC]) - EnumerableAggregate(group=[{0}], grand_total=[SUM($2)]) - EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], expr#9=[IS NOT NULL($t0)], proj#0..1=[{exprs}], $f2=[$t8], $condition=[$t9]) - EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)]) - EnumerableCalc(expr#0..15=[{inputs}], expr#16=[1], expr#17=['m'], expr#18=[SPAN($t1, $t16, $t17)], host=[$t4], cpu_usage=[$t7], $f3=[$t18]) - CalciteEnumerableIndexScan(table=[[OpenSearch, events]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_timechart.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_timechart.yaml new file mode 100644 index 00000000000..5aa55ca656b --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_timechart.yaml @@ -0,0 +1,39 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC]) + LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[AVG($2)]) + LogicalProject(@timestamp=[$0], 
host=[CASE(IS NOT NULL($3), $1, CASE(IS NULL($1), null:NULL, 'OTHER'))], avg(cpu_usage)=[$2]) + LogicalJoin(condition=[=($1, $3)], joinType=[left]) + LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2]) + LogicalAggregate(group=[{0, 2}], agg#0=[AVG($1)]) + LogicalProject(host=[$4], cpu_usage=[$7], $f3=[SPAN($1, 1, 'm')]) + CalciteLogicalIndexScan(table=[[OpenSearch, events]]) + LogicalSort(sort0=[$1], dir0=[DESC], fetch=[10]) + LogicalAggregate(group=[{1}], grand_total=[AVG($2)]) + LogicalFilter(condition=[IS NOT NULL($1)]) + LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2]) + LogicalAggregate(group=[{0, 2}], agg#0=[AVG($1)]) + LogicalProject(host=[$4], cpu_usage=[$7], $f3=[SPAN($1, 1, 'm')]) + CalciteLogicalIndexScan(table=[[OpenSearch, events]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], proj#0..1=[{exprs}], avg(cpu_usage)=[$t8]) + EnumerableAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], agg#1=[COUNT($2)]) + EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT NULL($t3)], expr#6=[IS NULL($t1)], expr#7=[null:NULL], expr#8=['OTHER'], expr#9=[CASE($t6, $t7, $t8)], expr#10=[CASE($t5, $t1, $t9)], @timestamp=[$t0], host=[$t10], avg(cpu_usage)=[$t2]) + EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left]) + EnumerableSort(sort0=[$1], dir0=[ASC]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], @timestamp=[$t1], host=[$t0], $f2=[$t8]) + EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)]) + EnumerableCalc(expr#0..15=[{inputs}], expr#16=[1], expr#17=['m'], expr#18=[SPAN($t1, $t16, $t17)], host=[$t4], cpu_usage=[$t7], $f3=[$t18]) + CalciteEnumerableIndexScan(table=[[OpenSearch, events]]) + EnumerableSort(sort0=[$0], dir0=[ASC]) + 
EnumerableLimit(fetch=[10]) + EnumerableSort(sort0=[$1], dir0=[DESC]) + EnumerableCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[=($t2, $t3)], expr#5=[null:DOUBLE], expr#6=[CASE($t4, $t5, $t1)], expr#7=[/($t6, $t2)], host=[$t0], grand_total=[$t7]) + EnumerableAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[COUNT($2)]) + EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], expr#9=[IS NOT NULL($t0)], proj#0..1=[{exprs}], $f2=[$t8], $condition=[$t9]) + EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)]) + EnumerableCalc(expr#0..15=[{inputs}], expr#16=[1], expr#17=['m'], expr#18=[SPAN($t1, $t16, $t17)], host=[$t4], cpu_usage=[$t7], $f3=[$t18]) + CalciteEnumerableIndexScan(table=[[OpenSearch, events]]) diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4582.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4582.yml new file mode 100644 index 00000000000..27973484d6c --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4582.yml @@ -0,0 +1,120 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : true + - do: + indices.create: + index: test_timechart_4582 + body: + mappings: + properties: + "@timestamp": + type: date_nanos + severityNumber: + type: long + severityText: + type: keyword + body: + type: text + - do: + bulk: + index: test_timechart_4582 + refresh: true + body: + - '{"index": {}}' + - '{"@timestamp": "2024-01-15T10:30:04.567890123Z", "severityNumber": 9, "severityText": "INFO", "body": "Info message"}' + - '{"index": {}}' + - '{"@timestamp": "2024-01-15T10:30:05.567890123Z", "severityNumber": 13, "severityText": "WARN", "body": "Warning message"}' + - '{"index": {}}' + - '{"@timestamp": "2024-01-15T10:30:06.567890123Z", "severityNumber": 17, "severityText": "ERROR", "body": "Error message"}' + - '{"index": {}}' + - '{"@timestamp": 
"2024-01-15T10:30:07.567890123Z", "severityNumber": 21, "severityText": "FATAL", "body": "Fatal message"}' + - '{"index": {}}' + - '{"@timestamp": "2024-01-15T10:30:08.567890123Z", "severityNumber": 24, "severityText": "FATAL4", "body": "Fatal4 message"}' + - '{"index": {}}' + - '{"@timestamp": "2024-01-15T10:30:09.567890123Z", "severityNumber": 23, "severityText": "DEBUG", "body": "Debug message"}' + - '{"index": {}}' + - '{"@timestamp": "2024-01-15T10:30:10.567890123Z", "severityNumber": 20, "severityText": "TRACE", "body": "Trace message"}' + - '{"index": {}}' + - '{"@timestamp": "2024-01-15T10:30:11.567890123Z", "severityNumber": 22, "severityText": "CUSTOM", "body": "Custom message"}' + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + +--- +"timechart max aggregation with limit should not sum OTHER values": + - skip: + features: + - headers + - allowed_warnings + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test_timechart_4582 | timechart limit=1 span=10seconds max(severityNumber) by severityText + + - match: { total: 3 } + - match: { "schema": [{"name": "@timestamp", "type": "timestamp"}, {"name": "severityText", "type": "string"}, {"name": "max(severityNumber)", "type": "bigint"}] } + - match: { "datarows": [["2024-01-15 10:30:00", "FATAL4", 24], ["2024-01-15 10:30:00", "OTHER", 23], ["2024-01-15 10:30:10", "OTHER",22]] } + +--- +"timechart min aggregation with limit should not sum OTHER values": + - skip: + features: + - headers + - allowed_warnings + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test_timechart_4582 | timechart limit=2 span=1d min(severityNumber) by severityText + + - match: { total: 3 } + - match: { "schema": [{"name": "@timestamp", "type": "timestamp"}, {"name": "severityText", "type": "string"}, {"name": "min(severityNumber)", "type": "bigint"}] } + - match: { "datarows": [["2024-01-15 00:00:00", "INFO", 9], 
["2024-01-15 00:00:00", "OTHER", 17], ["2024-01-15 00:00:00", "WARN", 13]] } + +--- +"timechart earliest aggregation with limit should not sum OTHER values": + - skip: + features: + - headers + - allowed_warnings + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test_timechart_4582 | timechart limit=2 span=30seconds earliest(@timestamp) by severityText + + - match: { total: 3 } + - match: { "schema": [{"name": "@timestamp", "type": "timestamp"}, {"name": "severityText", "type": "string"}, {"name": "earliest(@timestamp)", "type": "timestamp"}] } + - match: { "datarows": [ + ["2024-01-15 10:30:00", "INFO", "2024-01-15 10:30:04.567890123"], + ["2024-01-15 10:30:00", "OTHER", "2024-01-15 10:30:06.567890123"], + ["2024-01-15 10:30:00", "WARN", "2024-01-15 10:30:05.567890123"]] } + +--- +"timechart count aggregation with limit should sum OTHER values": + - skip: + features: + - headers + - allowed_warnings + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test_timechart_4582 | timechart limit=3 span=1min count() by severityText + + - match: { total: 4 } + - match: { "schema": [{"name": "@timestamp", "type": "timestamp"}, {"name": "severityText", "type": "string"}, {"name": "count", "type": "bigint"}] } + - match: { "datarows": [["2024-01-15 10:30:00", "CUSTOM", 1], ["2024-01-15 10:30:00", "DEBUG", 1], ["2024-01-15 10:30:00", "ERROR", 1], ["2024-01-15 10:30:00", "OTHER", 5]] } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimechartTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimechartTest.java index 6e03447e243..ca8ddd3e39c 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimechartTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimechartTest.java @@ -218,13 +218,13 @@ public void testTimechartWithSpan1m() { RelNode root = getRelNode(ppl); String expectedSparkSql = "SELECT `t1`.`@timestamp`, CASE WHEN 
`t7`.`region` IS NOT NULL THEN `t1`.`region` ELSE CASE" - + " WHEN `t1`.`region` IS NULL THEN NULL ELSE 'OTHER' END END `region`, SUM(`t1`.`$f2`)" + + " WHEN `t1`.`region` IS NULL THEN NULL ELSE 'OTHER' END END `region`, AVG(`t1`.`$f2`)" + " `avg(cpu_usage)`\n" + "FROM (SELECT `SPAN`(`@timestamp`, 1, 'm') `@timestamp`, `region`, AVG(`cpu_usage`)" + " `$f2`\n" + "FROM `scott`.`events`\n" + "GROUP BY `region`, `SPAN`(`@timestamp`, 1, 'm')) `t1`\n" - + "LEFT JOIN (SELECT `region`, SUM(`$f2`) `grand_total`\n" + + "LEFT JOIN (SELECT `region`, AVG(`$f2`) `grand_total`\n" + "FROM (SELECT `SPAN`(`@timestamp`, 1, 'm') `@timestamp`, `region`, AVG(`cpu_usage`)" + " `$f2`\n" + "FROM `scott`.`events`\n" @@ -255,13 +255,13 @@ public void testTimechartWithLimitAndUseOtherFalse() { RelNode root = getRelNode(ppl); String expectedSparkSql = "SELECT `t1`.`@timestamp`, CASE WHEN `t7`.`host` IS NOT NULL THEN `t1`.`host` ELSE CASE" - + " WHEN `t1`.`host` IS NULL THEN NULL ELSE 'OTHER' END END `host`, SUM(`t1`.`$f2`)" + + " WHEN `t1`.`host` IS NULL THEN NULL ELSE 'OTHER' END END `host`, AVG(`t1`.`$f2`)" + " `avg(cpu_usage)`\n" + "FROM (SELECT `SPAN`(`@timestamp`, 1, 'h') `@timestamp`, `host`, AVG(`cpu_usage`)" + " `$f2`\n" + "FROM `scott`.`events`\n" + "GROUP BY `host`, `SPAN`(`@timestamp`, 1, 'h')) `t1`\n" - + "LEFT JOIN (SELECT `host`, SUM(`$f2`) `grand_total`\n" + + "LEFT JOIN (SELECT `host`, AVG(`$f2`) `grand_total`\n" + "FROM (SELECT `SPAN`(`@timestamp`, 1, 'h') `@timestamp`, `host`, AVG(`cpu_usage`)" + " `$f2`\n" + "FROM `scott`.`events`\n"