Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,7 @@ public RelNode visitFlatten(Flatten node, CalcitePlanContext context) {
}

/** Helper method to get the function name for proper column naming */
private String getValueFunctionName(UnresolvedExpression aggregateFunction) {
private String getAggFieldAlias(UnresolvedExpression aggregateFunction) {
if (aggregateFunction instanceof Alias) {
return ((Alias) aggregateFunction).getName();
}
Expand Down Expand Up @@ -1976,15 +1976,15 @@ public RelNode visitTimechart(

// Handle no by field case
if (node.getByField() == null) {
String valueFunctionName = getValueFunctionName(node.getAggregateFunction());
String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction());

// Create group expression list with just the timestamp span but use a different alias
// to avoid @timestamp naming conflict
List<UnresolvedExpression> simpleGroupExprList = new ArrayList<>();
simpleGroupExprList.add(new Alias("timestamp", spanExpr));
// Create agg expression list with the aggregate function
List<UnresolvedExpression> simpleAggExprList =
List.of(new Alias(valueFunctionName, node.getAggregateFunction()));
List.of(new Alias(aggFieldAlias, node.getAggregateFunction()));
// Create an Aggregation object
Aggregation aggregation =
new Aggregation(
Expand All @@ -1999,9 +1999,9 @@ public RelNode visitTimechart(
context.relBuilder.push(result);
// Reorder fields: timestamp first, then count
context.relBuilder.project(
context.relBuilder.field("timestamp"), context.relBuilder.field(valueFunctionName));
context.relBuilder.field("timestamp"), context.relBuilder.field(aggFieldAlias));
// Rename timestamp to @timestamp
context.relBuilder.rename(List.of("@timestamp", valueFunctionName));
context.relBuilder.rename(List.of("@timestamp", aggFieldAlias));

context.relBuilder.sort(context.relBuilder.field(0));
return context.relBuilder.peek();
Expand All @@ -2010,7 +2010,7 @@ public RelNode visitTimechart(
// Extract parameters for byField case
UnresolvedExpression byField = node.getByField();
String byFieldName = ((Field) byField).getField().toString();
String valueFunctionName = getValueFunctionName(node.getAggregateFunction());
String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction());

int limit = Optional.ofNullable(node.getLimit()).orElse(10);
boolean useOther = Optional.ofNullable(node.getUseOther()).orElse(true);
Expand All @@ -2037,11 +2037,11 @@ public RelNode visitTimechart(

// Handle no limit case - just sort and return with proper field aliases
if (limit == 0) {
// Add final projection with proper aliases: [@timestamp, byField, valueFunctionName]
// Add final projection with proper aliases: [@timestamp, byField, aggFieldAlias]
context.relBuilder.project(
context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"),
context.relBuilder.alias(context.relBuilder.field(1), byFieldName),
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));
context.relBuilder.sort(context.relBuilder.field(0), context.relBuilder.field(1));
return context.relBuilder.peek();
}
Expand All @@ -2051,32 +2051,64 @@ public RelNode visitTimechart(

// Step 2: Find top N categories using window function approach (more efficient than separate
// aggregation)
RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, context);
String aggFunctionName = getAggFunctionName(node.getAggregateFunction());
Optional<BuiltinFunctionName> aggFuncNameOptional = BuiltinFunctionName.of(aggFunctionName);
if (aggFuncNameOptional.isEmpty()) {
throw new IllegalArgumentException(
StringUtils.format("Unrecognized aggregation function: %s", aggFunctionName));
}
BuiltinFunctionName aggFunction = aggFuncNameOptional.get();
RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, aggFunction, context);

// Step 3: Apply OTHER logic with single pass
return buildFinalResultWithOther(
completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context);
completeResults,
topCategories,
byFieldName,
aggFunction,
aggFieldAlias,
useOther,
limit,
context);

} catch (Exception e) {
throw new RuntimeException("Error in visitTimechart: " + e.getMessage(), e);
}
}

private String getAggFunctionName(UnresolvedExpression aggregateFunction) {
if (aggregateFunction instanceof Alias alias) {
return getAggFunctionName(alias.getDelegated());
}
return ((AggregateFunction) aggregateFunction).getFuncName();
}

/** Build top categories query - simpler approach that works better with OTHER handling */
private RelNode buildTopCategoriesQuery(
RelNode completeResults, int limit, CalcitePlanContext context) {
RelNode completeResults,
int limit,
BuiltinFunctionName aggFunction,
CalcitePlanContext context) {
context.relBuilder.push(completeResults);

// Filter out null values when determining top categories - null should not count towards limit
context.relBuilder.filter(context.relBuilder.isNotNull(context.relBuilder.field(1)));

// Get totals for non-null categories - field positions: 0=@timestamp, 1=byField, 2=value
RexInputRef valueField = context.relBuilder.field(2);
AggCall call = buildAggCall(context.relBuilder, aggFunction, valueField);

context.relBuilder.aggregate(
context.relBuilder.groupKey(context.relBuilder.field(1)),
context.relBuilder.sum(context.relBuilder.field(2)).as("grand_total"));
context.relBuilder.groupKey(context.relBuilder.field(1)), call.as("grand_total"));

// Apply sorting and limit to non-null categories only
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field("grand_total")));
RexNode sortField = context.relBuilder.field("grand_total");
// For MIN and EARLIEST, top results should be the minimum ones
sortField =
aggFunction == BuiltinFunctionName.MIN || aggFunction == BuiltinFunctionName.EARLIEST
? sortField
: context.relBuilder.desc(sortField);
context.relBuilder.sort(sortField);
if (limit > 0) {
context.relBuilder.limit(0, limit);
}
Expand All @@ -2089,18 +2121,25 @@ private RelNode buildFinalResultWithOther(
RelNode completeResults,
RelNode topCategories,
String byFieldName,
String valueFunctionName,
BuiltinFunctionName aggFunction,
String aggFieldAlias,
boolean useOther,
int limit,
CalcitePlanContext context) {

// Use zero-filling for count aggregations, standard result for others
if (valueFunctionName.equals("count")) {
if (aggFieldAlias.equals("count")) {
return buildZeroFilledResult(
completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context);
completeResults, topCategories, byFieldName, aggFieldAlias, useOther, limit, context);
} else {
return buildStandardResult(
completeResults, topCategories, byFieldName, valueFunctionName, useOther, context);
completeResults,
topCategories,
byFieldName,
aggFunction,
aggFieldAlias,
useOther,
context);
}
}

Expand All @@ -2109,7 +2148,8 @@ private RelNode buildStandardResult(
RelNode completeResults,
RelNode topCategories,
String byFieldName,
String valueFunctionName,
BuiltinFunctionName aggFunctionName,
String aggFieldAlias,
boolean useOther,
CalcitePlanContext context) {

Expand All @@ -2132,11 +2172,13 @@ private RelNode buildStandardResult(
context.relBuilder.project(
context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"),
context.relBuilder.alias(categoryExpr, byFieldName),
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));

RexInputRef valueField = context.relBuilder.field(2);
AggCall aggCall = buildAggCall(context.relBuilder, aggFunctionName, valueField);
context.relBuilder.aggregate(
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName));
aggCall.as(aggFieldAlias));

applyFiltersAndSort(useOther, context);
return context.relBuilder.peek();
Expand Down Expand Up @@ -2171,7 +2213,7 @@ private RelNode buildZeroFilledResult(
RelNode completeResults,
RelNode topCategories,
String byFieldName,
String valueFunctionName,
String aggFieldAlias,
boolean useOther,
int limit,
CalcitePlanContext context) {
Expand Down Expand Up @@ -2210,7 +2252,7 @@ private RelNode buildZeroFilledResult(
context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP),
"@timestamp"),
context.relBuilder.alias(context.relBuilder.field(1), byFieldName),
context.relBuilder.alias(context.relBuilder.literal(0), valueFunctionName));
context.relBuilder.alias(context.relBuilder.literal(0), aggFieldAlias));
RelNode zeroFilledCombinations = context.relBuilder.build();

// Get actual results with OTHER logic applied
Expand All @@ -2232,7 +2274,7 @@ private RelNode buildZeroFilledResult(
context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP),
"@timestamp"),
context.relBuilder.alias(actualCategoryExpr, byFieldName),
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));

context.relBuilder.aggregate(
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
Expand All @@ -2247,12 +2289,30 @@ private RelNode buildZeroFilledResult(
// Aggregate to combine actual and zero-filled data
context.relBuilder.aggregate(
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName));
context.relBuilder.sum(context.relBuilder.field(2)).as(aggFieldAlias));

applyFiltersAndSort(useOther, context);
return context.relBuilder.peek();
}

/**
* Aggregate a field based on a given built-in aggregation function name.
*
* <p>It is intended for secondary aggregations in timechart and chart commands. Using it
* elsewhere may lead to unintended results. It handles explicitly only MIN, MAX, AVG, COUNT,
* DISTINCT_COUNT, EARLIEST, and LATEST. It sums the results for the rest aggregation types,
* assuming them to be accumulative.
*/
private AggCall buildAggCall(
RelBuilder relBuilder, BuiltinFunctionName aggFunction, RexNode node) {
return switch (aggFunction) {
case MIN, EARLIEST -> relBuilder.min(node);
case MAX, LATEST -> relBuilder.max(node);
case AVG -> relBuilder.avg(node);
default -> relBuilder.sum(node);
};
}

@Override
public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,7 @@ public void testExplainWithReverse() throws IOException {
@Test
public void testExplainWithTimechartAvg() throws IOException {
var result = explainQueryYaml("source=events | timechart span=1m avg(cpu_usage) by host");
String expected =
!isPushdownDisabled()
? loadFromFile("expectedOutput/calcite/explain_timechart.yaml")
: loadFromFile("expectedOutput/calcite/explain_timechart_no_pushdown.yaml");
String expected = loadExpectedPlan("explain_timechart.yaml");
assertYamlEqualsIgnoreId(expected, result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,27 +183,13 @@ public void testTimechartWithLimit() throws IOException {
schema("host", "string"),
schema("avg(cpu_usage)", "double"));

// Verify we have rows for web-01, web-02, and OTHER
boolean foundWeb01 = false;
boolean foundWeb02 = false;
boolean foundOther = false;

for (int i = 0; i < result.getJSONArray("datarows").length(); i++) {
Object[] row = result.getJSONArray("datarows").getJSONArray(i).toList().toArray();
String label = (String) row[1];

if ("web-01".equals(label)) {
foundWeb01 = true;
} else if ("web-02".equals(label)) {
foundWeb02 = true;
} else if ("OTHER".equals(label)) {
foundOther = true;
}
}

assertTrue("web-01 not found in results", foundWeb01);
assertTrue("web-02 not found in results", foundWeb02);
assertTrue("OTHER category not found in results", foundOther);
verifyDataRows(
result,
rows("2024-07-01 00:00:00", "web-01", 45.2),
rows("2024-07-01 00:01:00", "OTHER", 38.7),
rows("2024-07-01 00:02:00", "web-01", 55.3),
rows("2024-07-01 00:03:00", "db-01", 42.1),
rows("2024-07-01 00:04:00", "OTHER", 41.8));
}

@Test
Expand Down Expand Up @@ -383,7 +369,7 @@ public void testTimechartWithLimitAndUseOther() throws IOException {

if ("OTHER".equals(host)) {
foundOther = true;
assertEquals(330.4, cpuUsage, 0.1);
assertEquals(41.3, cpuUsage, 0.1);
} else if ("web-03".equals(host)) {
foundWeb03 = true;
assertEquals(55.3, cpuUsage, 0.1);
Expand Down
Loading
Loading