diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py index 181f7e8966..26f999a053 100644 --- a/redis/commands/timeseries/commands.py +++ b/redis/commands/timeseries/commands.py @@ -741,7 +741,7 @@ def __range_params( from_time: int | str, to_time: int | str, count: int | None, - aggregation_type: str | None, + aggregation_type: str | list[str] | None, bucket_size_msec: int | None, filter_by_ts: List[int] | None, filter_by_min_value: int | None, @@ -771,7 +771,7 @@ def range( from_time: int | str, to_time: int | str, count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, filter_by_ts: List[int] | None = None, filter_by_min_value: int | None = None, @@ -789,7 +789,7 @@ def range( from_time: int | str, to_time: int | str, count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, filter_by_ts: List[int] | None = None, filter_by_min_value: int | None = None, @@ -806,7 +806,7 @@ def range( from_time: int | str, to_time: int | str, count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, filter_by_ts: List[int] | None = None, filter_by_min_value: int | None = None, @@ -833,9 +833,12 @@ def range( count: Limits the number of returned samples. aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, - `twa`, 'countNaN', 'countAll'] + Optional aggregation type. Can be a single string or a list of strings + for multiple aggregators (requires Redis 8.8+). Valid values: + [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, + `std.p`, `std.s`, `var.p`, `var.s`, `twa`, `countNaN`, `countAll`]. + When a list is passed, each sample in the response contains values + in the same order as the specified aggregators. bucket_size_msec: Time bucket for aggregation in milliseconds. filter_by_ts: @@ -881,7 +884,7 @@ def revrange( from_time: int | str, to_time: int | str, count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, filter_by_ts: List[int] | None = None, filter_by_min_value: int | None = None, @@ -899,7 +902,7 @@ def revrange( from_time: int | str, to_time: int | str, count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, filter_by_ts: List[int] | None = None, filter_by_min_value: int | None = None, @@ -916,7 +919,7 @@ def revrange( from_time: int | str, to_time: int | str, count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, filter_by_ts: List[int] | None = None, filter_by_min_value: int | None = None, @@ -945,9 +948,12 @@ def revrange( count: Limits the number of returned samples. aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, - `twa`, 'countNaN', 'countAll'] + Optional aggregation type. Can be a single string or a list of strings + for multiple aggregators (requires Redis 8.8+). Valid values: + [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, + `std.p`, `std.s`, `var.p`, `var.s`, `twa`, `countNaN`, `countAll`]. + When a list is passed, each sample in the response contains values + in the same order as the specified aggregators. bucket_size_msec: Time bucket for aggregation in milliseconds. filter_by_ts: @@ -988,7 +994,7 @@ def revrange( def __mrange_params( self, - aggregation_type: str | None, + aggregation_type: str | list[str] | None, bucket_size_msec: int | None, count: int | None, filters: List[str], @@ -1007,6 +1013,14 @@ def __mrange_params( empty: bool | None, ): """Create TS.MRANGE and TS.MREVRANGE arguments.""" + if ( + groupby is not None + and isinstance(aggregation_type, list) + and len(aggregation_type) > 1 + ): + raise DataError( + "GROUPBY is not allowed when multiple aggregators are specified" + ) params: list[EncodableT] = [from_time, to_time] self._append_latest(params, latest) self._append_filer_by_ts(params, filter_by_ts) @@ -1029,7 +1043,7 @@ def mrange( to_time: int | str, filters: List[str], count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, with_labels: bool | None = False, filter_by_ts: List[int] | None = None, @@ -1051,7 +1065,7 @@ def mrange( to_time: int | str, filters: List[str], count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, with_labels: bool | None = False, filter_by_ts: List[int] | None = None, @@ -1072,7 +1086,7 @@ def mrange( to_time: int | str, filters: List[str], count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, with_labels: bool | None = False, filter_by_ts: List[int] | None = None, @@ -1105,9 +1119,13 @@ def mrange( count: Limits the number of returned samples. aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, - `twa`, 'countNaN', 'countAll'] + Optional aggregation type. Can be a single string or a list of strings + for multiple aggregators (requires Redis 8.8+). Valid values: + [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, + `std.p`, `std.s`, `var.p`, `var.s`, `twa`, `countNaN`, `countAll`]. + When a list is passed, each sample in the response contains values + in the same order as the specified aggregators. + Note: GROUPBY is not allowed when multiple aggregators are specified. bucket_size_msec: Time bucket for aggregation in milliseconds. with_labels: @@ -1169,7 +1187,7 @@ def mrevrange( to_time: int | str, filters: List[str], count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, with_labels: bool | None = False, filter_by_ts: List[int] | None = None, @@ -1191,7 +1209,7 @@ def mrevrange( to_time: int | str, filters: List[str], count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, with_labels: bool | None = False, filter_by_ts: List[int] | None = None, @@ -1212,7 +1230,7 @@ def mrevrange( to_time: int | str, filters: List[str], count: int | None = None, - aggregation_type: str | None = None, + aggregation_type: str | list[str] | None = None, bucket_size_msec: int | None = 0, with_labels: bool | None = False, filter_by_ts: List[int] | None = None, @@ -1245,9 +1263,13 @@ def mrevrange( count: Limits the number of returned samples. aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, - `twa`, 'countNaN', 'countAll']. + Optional aggregation type. Can be a single string or a list of strings + for multiple aggregators (requires Redis 8.8+). Valid values: + [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, + `std.p`, `std.s`, `var.p`, `var.s`, `twa`, `countNaN`, `countAll`]. + When a list is passed, each sample in the response contains values + in the same order as the specified aggregators. + Note: GROUPBY is not allowed when multiple aggregators are specified. bucket_size_msec: Time bucket for aggregation in milliseconds. with_labels: @@ -1485,12 +1507,17 @@ def _append_align(params: list[EncodableT], align: int | str | None): @staticmethod def _append_aggregation( params: list[EncodableT], - aggregation_type: str | None, + aggregation_type: str | list[str] | None, bucket_size_msec: int | None, ): """Append AGGREGATION property to params.""" if aggregation_type is not None: - params.extend(["AGGREGATION", aggregation_type, bucket_size_msec]) + if isinstance(aggregation_type, list): + params.extend( + ["AGGREGATION", ",".join(aggregation_type), bucket_size_msec] + ) + else: + params.extend(["AGGREGATION", aggregation_type, bucket_size_msec]) @staticmethod def _append_chunk_size(params: list[EncodableT], chunk_size: int | None): diff --git a/redis/commands/timeseries/utils.py b/redis/commands/timeseries/utils.py index d582e6208c..5f72ddfdf3 100644 --- a/redis/commands/timeseries/utils.py +++ b/redis/commands/timeseries/utils.py @@ -5,6 +5,11 @@ def _pairs_to_dict(pairs): def parse_range(response, **kwargs): """Parse range response. Used by TS.RANGE and TS.REVRANGE.""" + if not response: + return [] + # Multi-aggregator: samples have >2 elements [timestamp, val1, val2, ...] + if len(response[0]) > 2: + return [[r[0]] + [float(v) for v in r[1:]] for r in response] return [[r[0], float(r[1])] for r in response] diff --git a/tests/test_asyncio/test_timeseries.py b/tests/test_asyncio/test_timeseries.py index 6e95ee4f1f..182b34aee5 100644 --- a/tests/test_asyncio/test_timeseries.py +++ b/tests/test_asyncio/test_timeseries.py @@ -865,3 +865,129 @@ async def test_mrevrange_with_count_nan_count_all_aggregators(decoded_r: redis.R assert 2 == len(data_points) assert [[1000, 2.0]] == data_points["temperature:A"][2] assert [[1000, 2.0]] == data_points["temperature:B"][2] + + +@pytest.mark.redismod +@skip_if_server_version_lt("8.7.0") +async def test_range_multiple_aggregators(decoded_r: redis.Redis): + """Test TS.RANGE with multiple aggregators (Redis 8.8+).""" + await decoded_r.ts().create("ts:multi_agg") + await decoded_r.ts().add("ts:multi_agg", 1000, 10) + await decoded_r.ts().add("ts:multi_agg", 1001, 20) + await decoded_r.ts().add("ts:multi_agg", 1002, 30) + + result = await decoded_r.ts().range( + "ts:multi_agg", + 1000, + 1002, + aggregation_type=["min", "max", "avg"], + bucket_size_msec=10, + ) + assert len(result) == 1 + assert result[0][0] == 1000 # timestamp + assert result[0][1] == 10.0 # min + assert result[0][2] == 30.0 # max + assert result[0][3] == 20.0 # avg + + +@pytest.mark.redismod +@skip_if_server_version_lt("8.7.0") +async def test_revrange_multiple_aggregators(decoded_r: redis.Redis): + """Test TS.REVRANGE with multiple aggregators (Redis 8.8+).""" + await decoded_r.ts().create("ts:multi_agg") + await decoded_r.ts().add("ts:multi_agg", 1000, 10) + await decoded_r.ts().add("ts:multi_agg", 1001, 20) + await decoded_r.ts().add("ts:multi_agg", 1002, 30) + + result = await decoded_r.ts().revrange( + "ts:multi_agg", + 1000, + 1002, + aggregation_type=["min", "max", "avg"], + bucket_size_msec=10, + ) + assert len(result) == 1 + assert result[0][0] == 1000 # timestamp + assert result[0][1] == 10.0 # min + assert result[0][2] == 30.0 # max + assert result[0][3] == 20.0 # avg + + +@pytest.mark.redismod +@skip_if_server_version_lt("8.7.0") +async def test_mrange_multiple_aggregators(decoded_r: redis.Redis): + """Test TS.MRANGE with multiple aggregators (Redis 8.8+).""" + await decoded_r.ts().create("ts:multi_agg_a", labels={"type": "test_multi_agg"}) + await decoded_r.ts().add("ts:multi_agg_a", 1000, 10) + await decoded_r.ts().add("ts:multi_agg_a", 1001, 20) + + result = await decoded_r.ts().mrange( + 1000, + 1001, + filters=["type=test_multi_agg"], + aggregation_type=["min", "max"], + bucket_size_msec=10, + ) + assert "ts:multi_agg_a" in result + samples = result["ts:multi_agg_a"][2] + assert len(samples) == 1 + assert samples[0][0] == 1000 # timestamp + assert samples[0][1] == 10.0 # min + assert samples[0][2] == 20.0 # max + + +@pytest.mark.redismod +@skip_if_server_version_lt("8.7.0") +async def test_mrevrange_multiple_aggregators(decoded_r: redis.Redis): + """Test TS.MREVRANGE with multiple aggregators (Redis 8.8+).""" + await decoded_r.ts().create("ts:multi_agg_b", labels={"type": "test_multi_agg_rev"}) + await decoded_r.ts().add("ts:multi_agg_b", 1000, 10) + await decoded_r.ts().add("ts:multi_agg_b", 1001, 20) + + result = await decoded_r.ts().mrevrange( + 1000, + 1001, + filters=["type=test_multi_agg_rev"], + aggregation_type=["min", "max"], + bucket_size_msec=10, + ) + assert "ts:multi_agg_b" in result + samples = result["ts:multi_agg_b"][2] + assert len(samples) == 1 + assert samples[0][0] == 1000 # timestamp + assert samples[0][1] == 10.0 # min + assert samples[0][2] == 20.0 # max + + +@pytest.mark.redismod +async def test_mrange_groupby_multiple_aggregators_raises(decoded_r: redis.Redis): + """Test that GROUPBY with multiple aggregators raises DataError.""" + await decoded_r.ts().create("ts:gb_test", labels={"type": "test_gb"}) + + with pytest.raises(redis.DataError, match="GROUPBY is not allowed"): + await decoded_r.ts().mrange( + 0, + 100, + filters=["type=test_gb"], + aggregation_type=["min", "max"], + bucket_size_msec=10, + groupby="type", + reduce="max", + ) + + +@pytest.mark.redismod +async def test_mrevrange_groupby_multiple_aggregators_raises(decoded_r: redis.Redis): + """Test that GROUPBY with multiple aggregators raises DataError.""" + await decoded_r.ts().create("ts:gb_test2", labels={"type": "test_gb2"}) + + with pytest.raises(redis.DataError, match="GROUPBY is not allowed"): + await decoded_r.ts().mrevrange( + 0, + 100, + filters=["type=test_gb2"], + aggregation_type=["min", "max"], + bucket_size_msec=10, + groupby="type", + reduce="max", + ) diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index 48248d0698..d469ac7cd8 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -1120,3 +1120,129 @@ def test_mrevrange_with_count_nan_count_all_aggregators(client): assert 2 == len(data_points) assert [[1000, 2.0]] == data_points["temperature:A"][2] assert [[1000, 2.0]] == data_points["temperature:B"][2] + + +@pytest.mark.redismod +@skip_if_server_version_lt("8.7.0") +def test_range_multiple_aggregators(client): + """Test TS.RANGE with multiple aggregators (Redis 8.8+).""" + client.ts().create("ts:multi_agg") + client.ts().add("ts:multi_agg", 1000, 10) + client.ts().add("ts:multi_agg", 1001, 20) + client.ts().add("ts:multi_agg", 1002, 30) + + result = client.ts().range( + "ts:multi_agg", + 1000, + 1002, + aggregation_type=["min", "max", "avg"], + bucket_size_msec=10, + ) + assert len(result) == 1 + assert result[0][0] == 1000 # timestamp + assert result[0][1] == 10.0 # min + assert result[0][2] == 30.0 # max + assert result[0][3] == 20.0 # avg + + +@pytest.mark.redismod +@skip_if_server_version_lt("8.7.0") +def test_revrange_multiple_aggregators(client): + """Test TS.REVRANGE with multiple aggregators (Redis 8.8+).""" + client.ts().create("ts:multi_agg") + client.ts().add("ts:multi_agg", 1000, 10) + client.ts().add("ts:multi_agg", 1001, 20) + client.ts().add("ts:multi_agg", 1002, 30) + + result = client.ts().revrange( + "ts:multi_agg", + 1000, + 1002, + aggregation_type=["min", "max", "avg"], + bucket_size_msec=10, + ) + assert len(result) == 1 + assert result[0][0] == 1000 # timestamp + assert result[0][1] == 10.0 # min + assert result[0][2] == 30.0 # max + assert result[0][3] == 20.0 # avg + + +@pytest.mark.redismod +@skip_if_server_version_lt("8.7.0") +def test_mrange_multiple_aggregators(client): + """Test TS.MRANGE with multiple aggregators (Redis 8.8+).""" + client.ts().create("ts:multi_agg_a", labels={"type": "test_multi_agg"}) + client.ts().add("ts:multi_agg_a", 1000, 10) + client.ts().add("ts:multi_agg_a", 1001, 20) + + result = client.ts().mrange( + 1000, + 1001, + filters=["type=test_multi_agg"], + aggregation_type=["min", "max"], + bucket_size_msec=10, + ) + assert "ts:multi_agg_a" in result + samples = result["ts:multi_agg_a"][2] + assert len(samples) == 1 + assert samples[0][0] == 1000 # timestamp + assert samples[0][1] == 10.0 # min + assert samples[0][2] == 20.0 # max + + +@pytest.mark.redismod +@skip_if_server_version_lt("8.7.0") +def test_mrevrange_multiple_aggregators(client): + """Test TS.MREVRANGE with multiple aggregators (Redis 8.8+).""" + client.ts().create("ts:multi_agg_b", labels={"type": "test_multi_agg_rev"}) + client.ts().add("ts:multi_agg_b", 1000, 10) + client.ts().add("ts:multi_agg_b", 1001, 20) + + result = client.ts().mrevrange( + 1000, + 1001, + filters=["type=test_multi_agg_rev"], + aggregation_type=["min", "max"], + bucket_size_msec=10, + ) + assert "ts:multi_agg_b" in result + samples = result["ts:multi_agg_b"][2] + assert len(samples) == 1 + assert samples[0][0] == 1000 # timestamp + assert samples[0][1] == 10.0 # min + assert samples[0][2] == 20.0 # max + + +@pytest.mark.redismod +def test_mrange_groupby_multiple_aggregators_raises(client): + """Test that GROUPBY with multiple aggregators raises DataError.""" + client.ts().create("ts:gb_test", labels={"type": "test_gb"}) + + with pytest.raises(redis.exceptions.DataError, match="GROUPBY is not allowed"): + client.ts().mrange( + 0, + 100, + filters=["type=test_gb"], + aggregation_type=["min", "max"], + bucket_size_msec=10, + groupby="type", + reduce="max", + ) + + +@pytest.mark.redismod +def test_mrevrange_groupby_multiple_aggregators_raises(client): + """Test that GROUPBY with multiple aggregators raises DataError.""" + client.ts().create("ts:gb_test2", labels={"type": "test_gb2"}) + + with pytest.raises(redis.exceptions.DataError, match="GROUPBY is not allowed"): + client.ts().mrevrange( + 0, + 100, + filters=["type=test_gb2"], + aggregation_type=["min", "max"], + bucket_size_msec=10, + groupby="type", + reduce="max", + )