32 commits
0acdd51
WIP: implementing case range analyzer
yuancu Sep 26, 2025
6afdcb6
Correct case analyzer
yuancu Sep 26, 2025
f416dee
Create bucket aggregation parsers that support parsing nested sub ag…
yuancu Sep 26, 2025
cbcb25a
Fix unit tests
yuancu Sep 26, 2025
7ec0684
Merge remote-tracking branch 'origin/main' into issues/4201
yuancu Sep 28, 2025
51813f0
Fix parsers to multi-range cases
yuancu Sep 28, 2025
373e825
Update leaf bucket parser
yuancu Sep 28, 2025
f3830a5
Unit test case range analyzer
yuancu Sep 28, 2025
da5a9c5
Add explain ITs for pushing down case in aggregations
yuancu Sep 28, 2025
b46dd95
Update CaseRangeAnalyzerTest
yuancu Sep 28, 2025
0660994
Merge remote-tracking branch 'origin/main' into issues/4201
yuancu Sep 28, 2025
0493629
Add a yaml test that replicates issue 4201
yuancu Sep 28, 2025
f690add
Add integration tests for case in aggregation
yuancu Sep 28, 2025
b204b2e
Fix unit tests
yuancu Sep 28, 2025
4dc86db
Add a patch to CalcitePPLCaseFunctionIT
yuancu Sep 28, 2025
d38a916
Migrate all composite aggregation parser usage to bucket aggregate pa…
yuancu Sep 29, 2025
6beed21
Create a parent abstract class for BucketAggregationParsers
yuancu Sep 29, 2025
d40c244
Remove an unnecessary bucket agg in AggregationQueryBuilder
yuancu Sep 29, 2025
606e346
Test pushing down case where null values exist
yuancu Sep 29, 2025
a5fdd66
Return empty in CaseRangeAnalyzer to unblock the rest pushdown
yuancu Sep 29, 2025
fdb9886
Document limitations of pushing down case as range queries
yuancu Sep 29, 2025
e4c5266
Merge remote-tracking branch 'origin/main' into issues/4201
yuancu Oct 9, 2025
7a8db58
Merge remote-tracking branch 'origin/main' into issues/4201
yuancu Oct 11, 2025
0ca81aa
Make case pushdown a private method
yuancu Oct 11, 2025
e701e57
Chores: remove unused helper method
yuancu Oct 11, 2025
4968d1c
Merge remote-tracking branch 'origin/main' into issues/4201
yuancu Oct 13, 2025
a22ae79
Unify logic for creating nested aggregations
yuancu Oct 14, 2025
010fd06
Remove a note in condition.rst
yuancu Oct 14, 2025
7d82cd8
Merge remote-tracking branch 'origin/main' into issues/4201
yuancu Oct 15, 2025
3b65b2d
Optimize range aggregation
yuancu Oct 15, 2025
91aaee8
Ignore testNestedAggregationsExplain when pushdown is disabled
yuancu Oct 16, 2025
b8bb898
Merge remote-tracking branch 'origin/main' into issues/4201
yuancu Oct 17, 2025
@@ -967,19 +967,15 @@ void populate() {
XOR,
SqlStdOperatorTable.NOT_EQUALS,
PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.BOOLEAN));
// SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a type checker
// for it. The second and third operands are required to be of the same type. If not, it will
// throw an IllegalArgumentException with information Can't find leastRestrictive type
registerOperator(
IF,
SqlStdOperatorTable.CASE,
PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.ANY, SqlTypeFamily.ANY));
// Re-define the type checker for is not null, is present, and is null since
// their original type checker ANY isn't compatible with struct types.
registerOperator(
IS_NOT_NULL,
SqlStdOperatorTable.IS_NOT_NULL,
8 changes: 8 additions & 0 deletions docs/user/ppl/functions/condition.rst
@@ -227,6 +227,14 @@ Argument type: all the supported data type, (NOTE : there is no comma before "else")

Return type: any

Limitations
>>>>>>>>>>>

When each condition is a field comparison with a numeric literal and each result expression is a string literal, the query will be optimized as `range aggregations <https://docs.opensearch.org/latest/aggregations/bucket/range>`_ if pushdown optimization is enabled. However, this optimization has the following limitations:
Comment on lines +230 to +233

Member: IMO, it's not a limitation of the case function; it is just a restricted optimization. We can simply call out in which cases the case function would be optimized into range DSL. Can we add some optimizable case usages to the user doc?

Collaborator (author): I think the stated conditions are in the scope of restricted optimizations, but the limitations are not, because we will still do the optimization regardless of whether the column contains null values or whether there is a default NULL range.

The problem is that there is no way to know in advance whether a column contains null values. Therefore, if we do this optimization, we always risk a discrepancy between the results with and without push-down.

- Null values will not be grouped into any bucket of a range aggregation and will be ignored
- The default ELSE clause will use the string literal ``"null"`` instead of actual NULL values

Example::

os> source=accounts | eval result = case(age > 35, firstname, age < 30, lastname else employer) | fields result, firstname, lastname, age, employer
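For context on the optimization documented above, here is a minimal sketch, not part of this change, of the kind of range aggregation a case expression such as case(age < 30, 'u30', age < 40, 'u40' else 'u100') with avg(age) could be pushed down to, written against the OpenSearch Java AggregationBuilders API. The class and variable names (CasePushdownRangeSketch, avgAge) are illustrative assumptions, and the exact DSL emitted by the planner may differ.

import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder;

public class CasePushdownRangeSketch {
  /** Builds an age_range range aggregation mirroring the case expression above. */
  public static RangeAggregationBuilder ageRangeAggregation() {
    // avg(age) as avg_age, attached as a sub-aggregation of every range bucket
    AvgAggregationBuilder avgAge = AggregationBuilders.avg("avg_age").field("age");
    // Each case branch becomes a closed-open [from, to) range keyed by its string literal.
    // Documents with a null 'age' fall into no bucket, which is the limitation noted above.
    return AggregationBuilders.range("age_range")
        .field("age")
        .addUnboundedTo("u30", 30) // age < 30
        .addRange("u40", 30, 40) // 30 <= age < 40
        .addUnboundedFrom("u100", 40) // else branch: age >= 40
        .subAggregation(avgAge);
  }
}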
@@ -10,6 +10,7 @@
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STRINGS;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_TIME_DATA;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WEBLOGS;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORKER;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORK_INFORMATION;
@@ -18,6 +19,7 @@

import java.io.IOException;
import java.util.Locale;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import org.opensearch.sql.ppl.ExplainIT;
@@ -515,22 +517,6 @@ public void testExplainStatsWithSubAggregation() throws IOException {
+ " @timestamp, region"));
}

@Test
public void bucketNullableNotSupportSubAggregation() throws IOException {
// TODO: Don't throw exception after addressing
// https://github.com/opensearch-project/sql/issues/4317
// When bucketNullable is true, sub aggregation is not supported. Hence we cannot pushdown the
// aggregation in this query. Caused by issue
// https://github.com/opensearch-project/sql/issues/4317,
// bin aggregation on timestamp field won't work if not been push down.
enabledOnlyWhenPushdownIsEnabled();
assertThrows(
Exception.class,
() ->
explainQueryToString(
"source=events | bin @timestamp bins=3 | stats count() by @timestamp, region"));
}

@Test
public void testExplainBinWithSpan() throws IOException {
String expected = loadExpectedPlan("explain_bin_span.yaml");
@@ -1160,4 +1146,127 @@ public void testPushDownMinOrMaxAggOnDerivedField() throws IOException {
+ "| stats MIN(balance2), MAX(balance2)",
TEST_INDEX_ACCOUNT)));
}

@Test
public void testCasePushdownAsRangeQueryExplain() throws IOException {
// CASE 1: Range - Metric
// 1.1 Range - Metric
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_range_metric_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100') |"
+ " stats avg(age) as avg_age by age_range",
TEST_INDEX_BANK)));

// 1.2 Range - Metric (COUNT)
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_range_count_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age < 40, 'u40'"
+ " else 'u100') | stats avg(age) by age_range",
TEST_INDEX_BANK)));

// 1.3 Range - Range - Metric
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_range_range_metric_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100'),"
+ " balance_range = case(balance < 20000, 'medium' else 'high') | stats"
+ " avg(balance) as avg_balance by age_range, balance_range",
TEST_INDEX_BANK)));

// 1.4 Range - Metric (With null & discontinuous ranges)
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_range_metric_complex_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 30, 'u30', (age >= 35 and age < 40) or age"
+ " >= 80, '30-40 or >=80') | stats avg(balance) by age_range",
TEST_INDEX_BANK)));

// 1.5 Should not be pushed because the range is not closed-open
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_case_cannot_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age <= 40, 'u40'"
+ " else 'u100') | stats avg(age) as avg_age by age_range",
TEST_INDEX_BANK)));

// 1.6 Should not be pushed as range query because the result expression is not a string
// literal.
// Range aggregation keys must be strings
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_case_num_res_cannot_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 30, 30 else 100) | stats count() by"
+ " age_range",
TEST_INDEX_BANK)));

// CASE 2: Composite - Range - Metric
// 2.1 Composite (term) - Range - Metric
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_composite_range_metric_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats avg(balance)"
+ " by state, age_range",
TEST_INDEX_BANK)));

// 2.2 Composite (date histogram) - Range - Metric
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_composite_date_range_push.yaml"),
explainQueryToString(
"source=opensearch-sql_test_index_time_data | eval value_range = case(value < 7000,"
+ " 'small' else 'large') | stats avg(value) by value_range, span(@timestamp,"
+ " 1h)"));

// 2.3 Composite(2 fields) - Range - Metric (with count)
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_composite2_range_count_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats"
+ " avg(balance), count() by age_range, state, gender",
TEST_INDEX_BANK)));

// 2.4 Composite (2 fields) - Range - Range - Metric (with count)
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_composite2_range_range_count_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 35, 'u35' else 'a35'), balance_range ="
+ " case(balance < 20000, 'medium' else 'high') | stats avg(balance) as"
+ " avg_balance by age_range, balance_range, state",
TEST_INDEX_BANK)));

// 2.5 Should not be pushed down as range query because case result expression is not constant
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_case_composite_cannot_push.yaml"),
explainQueryToString(
String.format(
"source=%s | eval age_range = case(age < 35, 'u35' else email) | stats avg(balance)"
+ " as avg_balance by age_range, state",
TEST_INDEX_BANK)));
}

@Test
public void testNestedAggregationsExplain() throws IOException {
// TODO: Remove after resolving: https://github.com/opensearch-project/sql/issues/4578
Assume.assumeFalse(
"The query runs into error when pushdown is disabled due to bin's implementation",
isPushdownDisabled());
assertYamlEqualsJsonIgnoreId(
loadExpectedPlan("agg_composite_autodate_range_metric_push.yaml"),
explainQueryToString(
String.format(
"source=%s | bin timestamp bins=3 | eval value_range = case(value < 7000, 'small'"
+ " else 'great') | stats bucket_nullable=false avg(value), count() by"
+ " timestamp, value_range, category",
TEST_INDEX_TIME_DATA)));
}
}