Skip to content

Commit 309e110

Browse files
Support TS.MRANGE/MREVRANGE GROUPBY <label> REDUCE <reducer> (#31)
* GroupBy support * Update build.yml Co-authored-by: dengliming <[email protected]>
1 parent 5f68f19 commit 309e110

File tree

6 files changed

+207
-4
lines changed

6 files changed

+207
-4
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ jobs:
3232
ports:
3333
- 6380:6379
3434
redistimeseries:
35-
image: redislabs/redistimeseries:latest
35+
image: redislabs/redistimeseries:1.6.0
3636
options: >-
3737
--health-cmd "redis-cli ping"
3838
--health-interval 10s
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2020 dengliming.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.github.dengliming.redismodule.redistimeseries;
18+
19+
import io.github.dengliming.redismodule.redistimeseries.protocol.Keywords;
20+
21+
import java.util.List;
22+
23+
/**
24+
* @author xdev.developer
25+
*/
26+
public class GroupByOptions {
27+
28+
private String groupByLabel;
29+
private Reducer reducer;
30+
31+
/**
32+
* Group by label using reducer aggregation
33+
* @param label grouping label
34+
* @param reducer reducer
35+
* @return RangeOptions
36+
*/
37+
public GroupByOptions groupBy(String label, Reducer reducer) {
38+
this.groupByLabel = label;
39+
this.reducer = reducer;
40+
return this;
41+
}
42+
43+
public void build(List<Object> args) {
44+
if (groupByLabel != null && reducer != null) {
45+
args.add(Keywords.GROUPBY);
46+
args.add(groupByLabel);
47+
args.add(Keywords.REDUCE);
48+
args.add(reducer.getKey());
49+
}
50+
}
51+
}

redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeries.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,41 @@ public RFuture<List<TimeSeries>> mrangeAsync(long from, long to, RangeOptions ra
292292
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TS_MRANGE, args.toArray());
293293
}
294294

295+
/**
296+
* Query a timestamp range across multiple time-series by filters.
297+
*
298+
* @param from fromTimestamp
299+
* @param to to timestamp
300+
* @param rangeOptions Optional args
301+
* @param groupBy Optional group by args
302+
* @param filters list of filters
303+
* @return List of TimeSeries
304+
*/
305+
public List<TimeSeries> mrange(long from, long to, RangeOptions rangeOptions, GroupByOptions groupBy, String... filters) {
306+
return commandExecutor.get(mrangeAsync(from, to, rangeOptions, groupBy, filters));
307+
}
308+
309+
public RFuture<List<TimeSeries>> mrangeAsync(long from, long to, RangeOptions rangeOptions, GroupByOptions groupBy, String... filters) {
310+
RAssert.notEmpty(filters, "filters must not be empty");
311+
312+
List<Object> args = new ArrayList<>();
313+
args.add(from);
314+
args.add(to);
315+
if (rangeOptions != null) {
316+
rangeOptions.build(args);
317+
}
318+
args.add(Keywords.FILTER);
319+
for (String filter : filters) {
320+
args.add(filter);
321+
}
322+
323+
if (groupBy != null) {
324+
groupBy.build(args);
325+
}
326+
327+
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TS_MRANGE, args.toArray());
328+
}
329+
295330
/**
296331
* Get the last sample.
297332
*
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2020 dengliming.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.github.dengliming.redismodule.redistimeseries;
18+
19+
/**
20+
* Group by reducer
21+
*
22+
* @author xdev.developer
23+
*/
24+
public enum Reducer {
25+
SUM("sum"), MIN("min"), MAX("max");
26+
27+
private String key;
28+
29+
Reducer(String key) {
30+
this.key = key;
31+
}
32+
33+
public String getKey() {
34+
return key;
35+
}
36+
}

redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@
2121
*/
2222
public enum Keywords {
2323

24-
RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE, ALIGN;
24+
RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE, ALIGN, GROUPBY, REDUCE;
2525

2626
}

redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.github.dengliming.redismodule.redistimeseries;
1818

1919
import org.junit.Assert;
20-
import org.junit.Ignore;
2120
import org.junit.jupiter.api.Test;
2221
import org.redisson.client.RedisException;
2322

@@ -28,6 +27,7 @@
2827

2928
import static io.github.dengliming.redismodule.redistimeseries.Sample.Value;
3029
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.assertj.core.api.Assertions.tuple;
3131

3232
/**
3333
* @author dengliming
@@ -179,7 +179,7 @@ public void testAggregations() {
179179
assertThat(sum.get(0).getValue()).isEqualTo(40.0d);
180180
}
181181

182-
@Ignore("Only for redis timeseries > 1.6.0")
182+
@Test
183183
public void testAggregationsAlign() {
184184
RedisTimeSeries redisTimeSeries = getRedisTimeSeries();
185185
long from = 1L;
@@ -225,6 +225,87 @@ public void testAggregationsAlign() {
225225
assertThat(end.get(1).getValue()).isEqualTo(10.0d);
226226
}
227227

228+
@Test
229+
public void testGroupBy() {
230+
RedisTimeSeries redisTimeSeries = getRedisTimeSeries();
231+
long from = 1L;
232+
long to = 10;
233+
234+
TimeSeriesOptions cpuSystem = new TimeSeriesOptions()
235+
.labels(new Label("metric", "cpu"), new Label("name", "system"))
236+
.unCompressed();
237+
238+
TimeSeriesOptions cpuUser = new TimeSeriesOptions()
239+
.labels(new Label("metric", "cpu"), new Label("name", "user"))
240+
.unCompressed();
241+
242+
assertThat(redisTimeSeries.add(new Sample("ts1", Value.of(1L, 90.0d)), cpuSystem).longValue()).isEqualTo(1L);
243+
assertThat(redisTimeSeries.add(new Sample("ts1", Value.of(2L, 45.0d)), cpuSystem).longValue()).isEqualTo(2L);
244+
assertThat(redisTimeSeries.add(new Sample("ts2", Value.of(2L, 99.0d)), cpuUser).longValue()).isEqualTo(2L);
245+
assertThat(redisTimeSeries.add(new Sample("ts3", Value.of(2L, 2.0d)), cpuSystem).longValue()).isEqualTo(2L);
246+
247+
List<TimeSeries> max = redisTimeSeries.mrange(from, to,
248+
new RangeOptions().withLabels(),
249+
new GroupByOptions().groupBy("name", Reducer.MAX), "metric=cpu");
250+
251+
assertThat(max).hasSize(2);
252+
253+
assertThat(max.get(0).getLabels())
254+
.extracting(Label::getKey, Label::getValue)
255+
.containsExactlyInAnyOrder(
256+
tuple("name", "system"),
257+
tuple("__reducer__", "max"),
258+
tuple("__source__", "ts1,ts3"));
259+
260+
assertThat(max.get(0).getValues())
261+
.extracting(Value::getTimestamp, Value::getValue)
262+
.containsExactly(
263+
tuple(1L, 90.0d),
264+
tuple(2L, 45.0d));
265+
266+
assertThat(max.get(1).getLabels())
267+
.extracting(Label::getKey, Label::getValue)
268+
.containsExactlyInAnyOrder(
269+
tuple("name", "user"),
270+
tuple("__reducer__", "max"),
271+
tuple("__source__", "ts2"));
272+
273+
assertThat(max.get(1).getValues())
274+
.extracting(Value::getTimestamp, Value::getValue)
275+
.containsExactly(tuple(2L, 99.0d));
276+
277+
List<TimeSeries> min = redisTimeSeries.mrange(from, to,
278+
new RangeOptions().withLabels(),
279+
new GroupByOptions().groupBy("name", Reducer.MIN), "metric=cpu");
280+
281+
assertThat(min).hasSize(2);
282+
283+
assertThat(min.get(0).getLabels())
284+
.extracting(Label::getKey, Label::getValue)
285+
.containsExactlyInAnyOrder(
286+
tuple("name", "system"),
287+
tuple("__reducer__", "min"),
288+
tuple("__source__", "ts1,ts3"));
289+
290+
assertThat(min.get(0).getValues())
291+
.extracting(Value::getTimestamp, Value::getValue)
292+
.containsExactly(
293+
tuple(1L, 90.0d),
294+
tuple(2L, 2.0d));
295+
296+
assertThat(min.get(1).getLabels())
297+
.extracting(Label::getKey, Label::getValue)
298+
.containsExactlyInAnyOrder(
299+
tuple("name", "user"),
300+
tuple("__reducer__", "min"),
301+
tuple("__source__", "ts2"));
302+
303+
assertThat(min.get(1).getValues())
304+
.extracting(Value::getTimestamp, Value::getValue)
305+
.containsExactly(tuple(2L, 99.0d));
306+
307+
}
308+
228309
@Test
229310
public void testQueryIndex() {
230311
RedisTimeSeries redisTimeSeries = getRedisTimeSeries();

0 commit comments

Comments
 (0)