Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 55 additions & 28 deletions redis/commands/timeseries/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ def __range_params(
from_time: int | str,
to_time: int | str,
count: int | None,
aggregation_type: str | None,
aggregation_type: str | list[str] | None,
bucket_size_msec: int | None,
filter_by_ts: List[int] | None,
filter_by_min_value: int | None,
Expand Down Expand Up @@ -771,7 +771,7 @@ def range(
from_time: int | str,
to_time: int | str,
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
filter_by_ts: List[int] | None = None,
filter_by_min_value: int | None = None,
Expand All @@ -789,7 +789,7 @@ def range(
from_time: int | str,
to_time: int | str,
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
filter_by_ts: List[int] | None = None,
filter_by_min_value: int | None = None,
Expand All @@ -806,7 +806,7 @@ def range(
from_time: int | str,
to_time: int | str,
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
filter_by_ts: List[int] | None = None,
filter_by_min_value: int | None = None,
Expand All @@ -833,9 +833,12 @@ def range(
count:
Limits the number of returned samples.
aggregation_type:
Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
`range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`,
`twa`, 'countNaN', 'countAll']
Optional aggregation type. Can be a single string or a list of strings
for multiple aggregators (requires Redis 8.8+). Valid values:
[`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
`std.p`, `std.s`, `var.p`, `var.s`, `twa`, `countNaN`, `countAll`].
When a list is passed, each sample in the response contains values
in the same order as the specified aggregators.
bucket_size_msec:
Time bucket for aggregation in milliseconds.
filter_by_ts:
Expand Down Expand Up @@ -881,7 +884,7 @@ def revrange(
from_time: int | str,
to_time: int | str,
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
filter_by_ts: List[int] | None = None,
filter_by_min_value: int | None = None,
Expand All @@ -899,7 +902,7 @@ def revrange(
from_time: int | str,
to_time: int | str,
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
filter_by_ts: List[int] | None = None,
filter_by_min_value: int | None = None,
Expand All @@ -916,7 +919,7 @@ def revrange(
from_time: int | str,
to_time: int | str,
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
filter_by_ts: List[int] | None = None,
filter_by_min_value: int | None = None,
Expand Down Expand Up @@ -945,9 +948,12 @@ def revrange(
count:
Limits the number of returned samples.
aggregation_type:
Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
`range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`,
`twa`, 'countNaN', 'countAll']
Optional aggregation type. Can be a single string or a list of strings
for multiple aggregators (requires Redis 8.8+). Valid values:
[`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
`std.p`, `std.s`, `var.p`, `var.s`, `twa`, `countNaN`, `countAll`].
When a list is passed, each sample in the response contains values
in the same order as the specified aggregators.
bucket_size_msec:
Time bucket for aggregation in milliseconds.
filter_by_ts:
Expand Down Expand Up @@ -988,7 +994,7 @@ def revrange(

def __mrange_params(
self,
aggregation_type: str | None,
aggregation_type: str | list[str] | None,
bucket_size_msec: int | None,
count: int | None,
filters: List[str],
Expand All @@ -1007,6 +1013,14 @@ def __mrange_params(
empty: bool | None,
):
"""Create TS.MRANGE and TS.MREVRANGE arguments."""
if (
groupby is not None
and isinstance(aggregation_type, list)
and len(aggregation_type) > 1
):
raise DataError(
"GROUPBY is not allowed when multiple aggregators are specified"
)
params: list[EncodableT] = [from_time, to_time]
self._append_latest(params, latest)
self._append_filer_by_ts(params, filter_by_ts)
Expand All @@ -1029,7 +1043,7 @@ def mrange(
to_time: int | str,
filters: List[str],
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
with_labels: bool | None = False,
filter_by_ts: List[int] | None = None,
Expand All @@ -1051,7 +1065,7 @@ def mrange(
to_time: int | str,
filters: List[str],
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
with_labels: bool | None = False,
filter_by_ts: List[int] | None = None,
Expand All @@ -1072,7 +1086,7 @@ def mrange(
to_time: int | str,
filters: List[str],
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
with_labels: bool | None = False,
filter_by_ts: List[int] | None = None,
Expand Down Expand Up @@ -1105,9 +1119,13 @@ def mrange(
count:
Limits the number of returned samples.
aggregation_type:
Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
`range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`,
`twa`, 'countNaN', 'countAll']
Optional aggregation type. Can be a single string or a list of strings
for multiple aggregators (requires Redis 8.8+). Valid values:
[`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
`std.p`, `std.s`, `var.p`, `var.s`, `twa`, `countNaN`, `countAll`].
When a list is passed, each sample in the response contains values
in the same order as the specified aggregators.
Note: GROUPBY is not allowed when multiple aggregators are specified.
bucket_size_msec:
Time bucket for aggregation in milliseconds.
with_labels:
Expand Down Expand Up @@ -1169,7 +1187,7 @@ def mrevrange(
to_time: int | str,
filters: List[str],
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
with_labels: bool | None = False,
filter_by_ts: List[int] | None = None,
Expand All @@ -1191,7 +1209,7 @@ def mrevrange(
to_time: int | str,
filters: List[str],
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
with_labels: bool | None = False,
filter_by_ts: List[int] | None = None,
Expand All @@ -1212,7 +1230,7 @@ def mrevrange(
to_time: int | str,
filters: List[str],
count: int | None = None,
aggregation_type: str | None = None,
aggregation_type: str | list[str] | None = None,
bucket_size_msec: int | None = 0,
with_labels: bool | None = False,
filter_by_ts: List[int] | None = None,
Expand Down Expand Up @@ -1245,9 +1263,13 @@ def mrevrange(
count:
Limits the number of returned samples.
aggregation_type:
Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
`range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`,
`twa`, 'countNaN', 'countAll'].
Optional aggregation type. Can be a single string or a list of strings
for multiple aggregators (requires Redis 8.8+). Valid values:
[`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
`std.p`, `std.s`, `var.p`, `var.s`, `twa`, `countNaN`, `countAll`].
When a list is passed, each sample in the response contains values
in the same order as the specified aggregators.
Note: GROUPBY is not allowed when multiple aggregators are specified.
bucket_size_msec:
Time bucket for aggregation in milliseconds.
with_labels:
Expand Down Expand Up @@ -1485,12 +1507,17 @@ def _append_align(params: list[EncodableT], align: int | str | None):
@staticmethod
def _append_aggregation(
params: list[EncodableT],
aggregation_type: str | None,
aggregation_type: str | list[str] | None,
bucket_size_msec: int | None,
):
"""Append AGGREGATION property to params."""
if aggregation_type is not None:
params.extend(["AGGREGATION", aggregation_type, bucket_size_msec])
if isinstance(aggregation_type, list):
params.extend(
["AGGREGATION", ",".join(aggregation_type), bucket_size_msec]
)
else:
params.extend(["AGGREGATION", aggregation_type, bucket_size_msec])

@staticmethod
def _append_chunk_size(params: list[EncodableT], chunk_size: int | None):
Expand Down
5 changes: 5 additions & 0 deletions redis/commands/timeseries/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ def _pairs_to_dict(pairs):

def parse_range(response, **kwargs):
"""Parse range response. Used by TS.RANGE and TS.REVRANGE."""
if not response:
return []
# Multi-aggregator: samples have >2 elements [timestamp, val1, val2, ...]
if len(response[0]) > 2:
return [[r[0]] + [float(v) for v in r[1:]] for r in response]
return [[r[0], float(r[1])] for r in response]


Expand Down
126 changes: 126 additions & 0 deletions tests/test_asyncio/test_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,3 +865,129 @@ async def test_mrevrange_with_count_nan_count_all_aggregators(decoded_r: redis.R
assert 2 == len(data_points)
assert [[1000, 2.0]] == data_points["temperature:A"][2]
assert [[1000, 2.0]] == data_points["temperature:B"][2]


@pytest.mark.redismod
@skip_if_server_version_lt("8.7.0")
async def test_range_multiple_aggregators(decoded_r: redis.Redis):
"""Test TS.RANGE with multiple aggregators (Redis 8.8+)."""
await decoded_r.ts().create("ts:multi_agg")
await decoded_r.ts().add("ts:multi_agg", 1000, 10)
await decoded_r.ts().add("ts:multi_agg", 1001, 20)
await decoded_r.ts().add("ts:multi_agg", 1002, 30)

result = await decoded_r.ts().range(
"ts:multi_agg",
1000,
1002,
aggregation_type=["min", "max", "avg"],
bucket_size_msec=10,
)
assert len(result) == 1
assert result[0][0] == 1000 # timestamp
assert result[0][1] == 10.0 # min
assert result[0][2] == 30.0 # max
assert result[0][3] == 20.0 # avg


@pytest.mark.redismod
@skip_if_server_version_lt("8.7.0")
async def test_revrange_multiple_aggregators(decoded_r: redis.Redis):
"""Test TS.REVRANGE with multiple aggregators (Redis 8.8+)."""
await decoded_r.ts().create("ts:multi_agg")
await decoded_r.ts().add("ts:multi_agg", 1000, 10)
await decoded_r.ts().add("ts:multi_agg", 1001, 20)
await decoded_r.ts().add("ts:multi_agg", 1002, 30)

result = await decoded_r.ts().revrange(
"ts:multi_agg",
1000,
1002,
aggregation_type=["min", "max", "avg"],
bucket_size_msec=10,
)
assert len(result) == 1
assert result[0][0] == 1000 # timestamp
assert result[0][1] == 10.0 # min
assert result[0][2] == 30.0 # max
assert result[0][3] == 20.0 # avg


@pytest.mark.redismod
@skip_if_server_version_lt("8.7.0")
async def test_mrange_multiple_aggregators(decoded_r: redis.Redis):
"""Test TS.MRANGE with multiple aggregators (Redis 8.8+)."""
await decoded_r.ts().create("ts:multi_agg_a", labels={"type": "test_multi_agg"})
await decoded_r.ts().add("ts:multi_agg_a", 1000, 10)
await decoded_r.ts().add("ts:multi_agg_a", 1001, 20)

result = await decoded_r.ts().mrange(
1000,
1001,
filters=["type=test_multi_agg"],
aggregation_type=["min", "max"],
bucket_size_msec=10,
)
assert "ts:multi_agg_a" in result
samples = result["ts:multi_agg_a"][2]
assert len(samples) == 1
assert samples[0][0] == 1000 # timestamp
assert samples[0][1] == 10.0 # min
assert samples[0][2] == 20.0 # max


@pytest.mark.redismod
@skip_if_server_version_lt("8.7.0")
async def test_mrevrange_multiple_aggregators(decoded_r: redis.Redis):
"""Test TS.MREVRANGE with multiple aggregators (Redis 8.8+)."""
await decoded_r.ts().create("ts:multi_agg_b", labels={"type": "test_multi_agg_rev"})
await decoded_r.ts().add("ts:multi_agg_b", 1000, 10)
await decoded_r.ts().add("ts:multi_agg_b", 1001, 20)

result = await decoded_r.ts().mrevrange(
1000,
1001,
filters=["type=test_multi_agg_rev"],
aggregation_type=["min", "max"],
bucket_size_msec=10,
)
assert "ts:multi_agg_b" in result
samples = result["ts:multi_agg_b"][2]
assert len(samples) == 1
assert samples[0][0] == 1000 # timestamp
assert samples[0][1] == 10.0 # min
assert samples[0][2] == 20.0 # max


@pytest.mark.redismod
async def test_mrange_groupby_multiple_aggregators_raises(decoded_r: redis.Redis):
"""Test that GROUPBY with multiple aggregators raises DataError."""
await decoded_r.ts().create("ts:gb_test", labels={"type": "test_gb"})

with pytest.raises(redis.DataError, match="GROUPBY is not allowed"):
await decoded_r.ts().mrange(
0,
100,
filters=["type=test_gb"],
aggregation_type=["min", "max"],
bucket_size_msec=10,
groupby="type",
reduce="max",
)


@pytest.mark.redismod
async def test_mrevrange_groupby_multiple_aggregators_raises(decoded_r: redis.Redis):
"""Test that GROUPBY with multiple aggregators raises DataError."""
await decoded_r.ts().create("ts:gb_test2", labels={"type": "test_gb2"})

with pytest.raises(redis.DataError, match="GROUPBY is not allowed"):
await decoded_r.ts().mrevrange(
0,
100,
filters=["type=test_gb2"],
aggregation_type=["min", "max"],
bucket_size_msec=10,
groupby="type",
reduce="max",
)
Loading
Loading