Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,14 @@ private Timechart timechart(UnresolvedExpression newAggregateFunction) {
return this.toBuilder().aggregateFunction(newAggregateFunction).build();
}

/** TODO: extend to support additional per_* functions */
@RequiredArgsConstructor
static class PerFunction {
private static final Map<String, Integer> UNIT_SECONDS = Map.of("per_second", 1);
private static final Map<String, Integer> UNIT_SECONDS =
Map.of(
"per_second", 1,
"per_minute", 60,
"per_hour", 3600,
"per_day", 86400);
private final String aggName;
private final UnresolvedExpression aggArg;
private final int seconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral;
import static org.opensearch.sql.ast.dsl.AstDSL.relation;

import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.opensearch.sql.ast.dsl.AstDSL;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Let;
Expand All @@ -26,8 +28,23 @@

class TimechartTest {

/**
* @return test sources for per_* function test.
*/
private static Stream<Arguments> perFuncTestSources() {
return Stream.of(
Arguments.of(30, "s", "SECOND"),
Arguments.of(5, "m", "MINUTE"),
Arguments.of(2, "h", "HOUR"),
Arguments.of(1, "d", "DAY"),
Arguments.of(1, "w", "WEEK"),
Arguments.of(1, "M", "MONTH"),
Arguments.of(1, "q", "QUARTER"),
Arguments.of(1, "y", "YEAR"));
}

@ParameterizedTest
@CsvSource({"1, m, MINUTE", "30, s, SECOND", "5, m, MINUTE", "2, h, HOUR", "1, d, DAY"})
@MethodSource("perFuncTestSources")
void should_transform_per_second_for_different_spans(
int spanValue, String spanUnit, String expectedIntervalUnit) {
withTimechart(span(spanValue, spanUnit), perSecond("bytes"))
Expand All @@ -45,6 +62,63 @@ void should_transform_per_second_for_different_spans(
timechart(span(spanValue, spanUnit), alias("per_second(bytes)", sum("bytes")))));
}

@ParameterizedTest
@MethodSource("perFuncTestSources")
void should_transform_per_minute_for_different_spans(
int spanValue, String spanUnit, String expectedIntervalUnit) {
withTimechart(span(spanValue, spanUnit), perMinute("bytes"))
.whenTransformingPerFunction()
.thenExpect(
eval(
let(
"per_minute(bytes)",
divide(
multiply("per_minute(bytes)", 60.0),
timestampdiff(
"SECOND",
"@timestamp",
timestampadd(expectedIntervalUnit, spanValue, "@timestamp")))),
timechart(span(spanValue, spanUnit), alias("per_minute(bytes)", sum("bytes")))));
}

@ParameterizedTest
@MethodSource("perFuncTestSources")
void should_transform_per_hour_for_different_spans(
int spanValue, String spanUnit, String expectedIntervalUnit) {
withTimechart(span(spanValue, spanUnit), perHour("bytes"))
.whenTransformingPerFunction()
.thenExpect(
eval(
let(
"per_hour(bytes)",
divide(
multiply("per_hour(bytes)", 3600.0),
timestampdiff(
"SECOND",
"@timestamp",
timestampadd(expectedIntervalUnit, spanValue, "@timestamp")))),
timechart(span(spanValue, spanUnit), alias("per_hour(bytes)", sum("bytes")))));
}

@ParameterizedTest
@MethodSource("perFuncTestSources")
void should_transform_per_day_for_different_spans(
int spanValue, String spanUnit, String expectedIntervalUnit) {
withTimechart(span(spanValue, spanUnit), perDay("bytes"))
.whenTransformingPerFunction()
.thenExpect(
eval(
let(
"per_day(bytes)",
divide(
multiply("per_day(bytes)", 86400.0),
timestampdiff(
"SECOND",
"@timestamp",
timestampadd(expectedIntervalUnit, spanValue, "@timestamp")))),
timechart(span(spanValue, spanUnit), alias("per_day(bytes)", sum("bytes")))));
}

@Test
void should_not_transform_non_per_functions() {
withTimechart(span(1, "m"), sum("bytes"))
Expand Down Expand Up @@ -104,6 +178,18 @@ private static AggregateFunction perSecond(String fieldName) {
return (AggregateFunction) aggregate("per_second", field(fieldName));
}

private static AggregateFunction perMinute(String fieldName) {
return (AggregateFunction) aggregate("per_minute", field(fieldName));
}

private static AggregateFunction perHour(String fieldName) {
return (AggregateFunction) aggregate("per_hour", field(fieldName));
}

private static AggregateFunction perDay(String fieldName) {
return (AggregateFunction) aggregate("per_day", field(fieldName));
}

private static AggregateFunction sum(String fieldName) {
return (AggregateFunction) aggregate("sum", field(fieldName));
}
Expand Down
30 changes: 26 additions & 4 deletions docs/user/ppl/cmd/timechart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,36 @@ Syntax
PER_SECOND
----------

Description
>>>>>>>>>>>

Usage: per_second(field) calculates the per-second rate for a numeric field within each time bucket.

The calculation formula is: `per_second(field) = sum(field) / span_in_seconds`, where `span_in_seconds` is the span interval in seconds.

Note: This function is available since 3.4.0.
Return type: DOUBLE

PER_MINUTE
----------

Usage: per_minute(field) calculates the per-minute rate for a numeric field within each time bucket.

The calculation formula is: `per_minute(field) = sum(field) * 60 / span_in_seconds`, where `span_in_seconds` is the span interval in seconds.

Return type: DOUBLE

PER_HOUR
--------

Usage: per_hour(field) calculates the per-hour rate for a numeric field within each time bucket.

The calculation formula is: `per_hour(field) = sum(field) * 3600 / span_in_seconds`, where `span_in_seconds` is the span interval in seconds.

Return type: DOUBLE

PER_DAY
-------

Usage: per_day(field) calculates the per-day rate for a numeric field within each time bucket.

The calculation formula is: `per_day(field) = sum(field) * 86400 / span_in_seconds`, where `span_in_seconds` is the span interval in seconds.

Return type: DOUBLE

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,36 @@ public void testExplainTimechartPerSecond() throws IOException {
assertTrue(result.contains("per_second(cpu_usage)=[SUM($0)]"));
}

@Test
public void testExplainTimechartPerMinute() throws IOException {
var result = explainQueryToString("source=events | timechart span=2m per_minute(cpu_usage)");
assertTrue(
result.contains(
"per_minute(cpu_usage)=[DIVIDE(*($1, 60.0E0), "
+ "TIMESTAMPDIFF('SECOND':VARCHAR, $0, TIMESTAMPADD('MINUTE':VARCHAR, 2, $0)))]"));
assertTrue(result.contains("per_minute(cpu_usage)=[SUM($0)]"));
}

@Test
public void testExplainTimechartPerHour() throws IOException {
var result = explainQueryToString("source=events | timechart span=2m per_hour(cpu_usage)");
assertTrue(
result.contains(
"per_hour(cpu_usage)=[DIVIDE(*($1, 3600.0E0), "
+ "TIMESTAMPDIFF('SECOND':VARCHAR, $0, TIMESTAMPADD('MINUTE':VARCHAR, 2, $0)))]"));
assertTrue(result.contains("per_hour(cpu_usage)=[SUM($0)]"));
}

@Test
public void testExplainTimechartPerDay() throws IOException {
var result = explainQueryToString("source=events | timechart span=2m per_day(cpu_usage)");
assertTrue(
result.contains(
"per_day(cpu_usage)=[DIVIDE(*($1, 86400.0E0), "
+ "TIMESTAMPDIFF('SECOND':VARCHAR, $0, TIMESTAMPADD('MINUTE':VARCHAR, 2, $0)))]"));
assertTrue(result.contains("per_day(cpu_usage)=[SUM($0)]"));
}

@Test
public void noPushDownForAggOnWindow() throws IOException {
enabledOnlyWhenPushdownIsEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,104 @@ public void testTimechartPerSecondWithVariableMonthLengths() throws IOException
rows("2025-02-01 00:00:00", 7.75), // 18748800 / 28 days' seconds
rows("2025-10-01 00:00:00", 7.0)); // 18748800 / 31 days' seconds
}

@Test
public void testTimechartPerMinuteWithSpecifiedSpan() throws IOException {
JSONObject result =
executeQuery(
"source=events_traffic | where month(@timestamp) = 9 | timechart span=2m"
+ " per_minute(packets)");

verifySchema(
result, schema("@timestamp", "timestamp"), schema("per_minute(packets)", "double"));
verifyDataRows(
result,
rows("2025-09-08 10:00:00", 90.0), // (60+120) / 2m
rows("2025-09-08 10:02:00", 120.0)); // (60+180) / 2m
}

@Test
public void testTimechartPerMinuteWithByClause() throws IOException {
JSONObject result =
executeQuery(
"source=events_traffic | where month(@timestamp) = 9 | timechart span=2m"
+ " per_minute(packets) by host");

verifySchema(
result,
schema("@timestamp", "timestamp"),
schema("host", "string"),
schema("per_minute(packets)", "double"));
verifyDataRows(
result,
rows("2025-09-08 10:00:00", "server1", 90.0), // (60+120) / 2m
rows("2025-09-08 10:02:00", "server1", 30.0), // 60 / 2m
rows("2025-09-08 10:02:00", "server2", 90.0)); // 180 / 2m
}

@Test
public void testTimechartPerHourWithSpecifiedSpan() throws IOException {
JSONObject result =
executeQuery(
"source=events_traffic | where month(@timestamp) = 9 | timechart span=2m"
+ " per_hour(packets)");

verifySchema(result, schema("@timestamp", "timestamp"), schema("per_hour(packets)", "double"));
verifyDataRows(
result,
rows("2025-09-08 10:00:00", 5400.0), // (60+120) * 30
rows("2025-09-08 10:02:00", 7200.0)); // (60+180) * 30
}

@Test
public void testTimechartPerHourWithByClause() throws IOException {
JSONObject result =
executeQuery(
"source=events_traffic | where month(@timestamp) = 9 | timechart span=2m"
+ " per_hour(packets) by host");

verifySchema(
result,
schema("@timestamp", "timestamp"),
schema("host", "string"),
schema("per_hour(packets)", "double"));
verifyDataRows(
result,
rows("2025-09-08 10:00:00", "server1", 5400.0), // (60+120) * 30
rows("2025-09-08 10:02:00", "server1", 1800.0), // 60 * 30
rows("2025-09-08 10:02:00", "server2", 5400.0)); // 180 * 30
}

@Test
public void testTimechartPerDayWithSpecifiedSpan() throws IOException {
JSONObject result =
executeQuery(
"source=events_traffic | where month(@timestamp) = 9 | timechart span=2m"
+ " per_day(packets)");

verifySchema(result, schema("@timestamp", "timestamp"), schema("per_day(packets)", "double"));
verifyDataRows(
result,
rows("2025-09-08 10:00:00", 129600.0), // (60+120) * 720
rows("2025-09-08 10:02:00", 172800.0)); // (60+180) * 720
}

@Test
public void testTimechartPerDayWithByClause() throws IOException {
JSONObject result =
executeQuery(
"source=events_traffic | where month(@timestamp) = 9 | timechart span=2m"
+ " per_day(packets) by host");

verifySchema(
result,
schema("@timestamp", "timestamp"),
schema("host", "string"),
schema("per_day(packets)", "double"));
verifyDataRows(
result,
rows("2025-09-08 10:00:00", "server1", 129600.0), // (60+120) * 720
rows("2025-09-08 10:02:00", "server1", 43200.0), // 60 * 720
rows("2025-09-08 10:02:00", "server2", 129600.0)); // 180 * 720
}
}
2 changes: 1 addition & 1 deletion ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ percentileApproxFunction
;

perFunction
: funcName=PER_SECOND LT_PRTHS functionArg RT_PRTHS
: funcName=(PER_SECOND | PER_MINUTE | PER_HOUR | PER_DAY) LT_PRTHS functionArg RT_PRTHS
;

numericLiteral
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ public void testPerSecondFunctionInTimechartShouldPass() {
assertNotEquals(null, tree);
}

@Test
public void testPerMinuteFunctionInTimechartShouldPass() {
ParseTree tree = new PPLSyntaxParser().parse("source=t | timechart per_minute(a)");
assertNotEquals(null, tree);
}

@Test
public void testPerHourFunctionInTimechartShouldPass() {
ParseTree tree = new PPLSyntaxParser().parse("source=t | timechart per_hour(a)");
assertNotEquals(null, tree);
}

@Test
public void testPerDayFunctionInTimechartShouldPass() {
ParseTree tree = new PPLSyntaxParser().parse("source=t | timechart per_day(a)");
assertNotEquals(null, tree);
}

@Test
public void testDynamicSourceClauseParseTreeStructure() {
String query = "source=[myindex, logs, fieldIndex=\"test\", count=100]";
Expand Down
Loading
Loading