4
4
"context"
5
5
"errors"
6
6
"fmt"
7
+ "slices"
8
+ "sort"
7
9
"time"
8
10
9
11
"github.com/apache/arrow-go/v18/arrow"
@@ -24,6 +26,22 @@ type rangeAggregationOptions struct {
24
26
step time.Duration // step used for range queries
25
27
}
26
28
29
+ // window is a time interval where start is exclusive and end is inclusive
30
+ // Refer to [logql.batchRangeVectorIterator].
31
+ type window struct {
32
+ start , end time.Time
33
+ }
34
+
35
+ // Contains returns if the timestamp t is within the bounds of the window.
36
+ // The window start is exclusive, the window end is inclusive.
37
+ func (w window ) Contains (t time.Time ) bool {
38
+ return t .After (w .start ) && ! t .After (w .end )
39
+ }
40
+
41
+ // timestampMatchingWindowsFunc resolves matching range interval windows for a specific timestamp.
42
+ // The list can be empty if the timestamp is out of bounds or does not match any of the range windows.
43
+ type timestampMatchingWindowsFunc func (time.Time ) []window
44
+
27
45
// rangeAggregationPipeline is a pipeline that performs aggregations over a time window.
28
46
//
29
47
// 1. It reads from the input pipelines
@@ -37,8 +55,8 @@ type rangeAggregationPipeline struct {
37
55
inputsExhausted bool // indicates if all inputs are exhausted
38
56
39
57
aggregator * aggregator
40
- matchingTimeWindows func ( t time. Time ) []time. Time // function to find matching time windows for a given timestamp
41
- evaluator expressionEvaluator // used to evaluate column expressions
58
+ windowsForTimestamp timestampMatchingWindowsFunc // function to find matching time windows for a given timestamp
59
+ evaluator expressionEvaluator // used to evaluate column expressions
42
60
opts rangeAggregationOptions
43
61
}
44
62
@@ -53,21 +71,10 @@ func newRangeAggregationPipeline(inputs []Pipeline, evaluator expressionEvaluato
53
71
}
54
72
55
73
func (r * rangeAggregationPipeline ) init () {
56
- windows := []struct {
57
- // lower bound is not inclusive
58
- // refer to [logql.batchRangeVectorIterator]
59
- startTs time.Time
60
- endTs time.Time
61
- }{}
74
+ windows := []window {}
62
75
cur := r .opts .startTs
63
76
for cur .Compare (r .opts .endTs ) <= 0 {
64
- windows = append (windows , struct {
65
- startTs time.Time
66
- endTs time.Time
67
- }{
68
- startTs : cur .Add (- r .opts .rangeInterval ),
69
- endTs : cur ,
70
- })
77
+ windows = append (windows , window {start : cur .Add (- r .opts .rangeInterval ), end : cur })
71
78
72
79
if r .opts .step == 0 {
73
80
break
@@ -77,26 +84,8 @@ func (r *rangeAggregationPipeline) init() {
77
84
cur = cur .Add (r .opts .step )
78
85
}
79
86
80
- var (
81
- lowerbound = r .opts .startTs .Add (- r .opts .rangeInterval )
82
- upperbound = r .opts .endTs
83
- )
84
-
85
- r .matchingTimeWindows = func (t time.Time ) []time.Time {
86
- if t .Compare (lowerbound ) <= 0 || t .Compare (upperbound ) > 0 {
87
- return nil // out of range
88
- }
89
-
90
- var ret []time.Time
91
- for _ , window := range windows {
92
- if t .Compare (window .startTs ) > 0 && t .Compare (window .endTs ) <= 0 {
93
- ret = append (ret , window .endTs )
94
- }
95
- }
96
-
97
- return ret
98
- }
99
-
87
+ f := newMatcherFactoryFromOpts (r .opts )
88
+ r .windowsForTimestamp = f .createMatcher (windows )
100
89
r .aggregator = newAggregator (r .opts .partitionBy , len (windows ))
101
90
}
102
91
@@ -181,7 +170,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro
181
170
tsCol := vec .ToArray ().(* array.Timestamp )
182
171
183
172
for row := range int (record .NumRows ()) {
184
- windows := r .matchingTimeWindows (tsCol .Value (row ).ToTime (arrow .Nanosecond ))
173
+ windows := r .windowsForTimestamp (tsCol .Value (row ).ToTime (arrow .Nanosecond ))
185
174
if len (windows ) == 0 {
186
175
continue // out of range, skip this row
187
176
}
@@ -192,8 +181,8 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro
192
181
labelValues [col ] = arr .Value (row )
193
182
}
194
183
195
- for _ , ts := range windows {
196
- r .aggregator .Add (ts , 1 , labelValues )
184
+ for _ , w := range windows {
185
+ r .aggregator .Add (w . end , 1 , labelValues )
197
186
}
198
187
}
199
188
}
@@ -225,3 +214,154 @@ func (r *rangeAggregationPipeline) Inputs() []Pipeline {
225
214
func (r * rangeAggregationPipeline ) Transport () Transport {
226
215
return Local
227
216
}
217
+
218
+ func newMatcherFactoryFromOpts (opts rangeAggregationOptions ) * matcherFactory {
219
+ return & matcherFactory {
220
+ start : opts .startTs ,
221
+ step : opts .step ,
222
+ interval : opts .rangeInterval ,
223
+ bounds : window {
224
+ start : opts .startTs .Add (- opts .rangeInterval ),
225
+ end : opts .endTs ,
226
+ },
227
+ }
228
+ }
229
+
230
+ type matcherFactory struct {
231
+ start time.Time
232
+ step time.Duration
233
+ interval time.Duration
234
+ bounds window
235
+ }
236
+
237
+ func (f * matcherFactory ) createMatcher (windows []window ) timestampMatchingWindowsFunc {
238
+ switch {
239
+ case f .step == 0 :
240
+ // For instant queries, step == 0, meaning that all samples fall into the one and same step.
241
+ // A sample timestamp will always match the only time window available, unless the timestamp it out of range.
242
+ return f .createExactMatcher (windows )
243
+ case f .step == f .interval :
244
+ // If the step is equal to the range interval (e.g. when used $__auto in Grafana), then a sample timestamp matches exactly one time window.
245
+ return f .createAlignedMatcher (windows )
246
+ case f .step > f .interval :
247
+ // If the step is greater than the range interval, then a sample timestamp matches either one time window or no time window (and will be discarded).
248
+ return f .createGappedMatcher (windows )
249
+ case f .step < f .interval :
250
+ // If the step is smaller than the range interval, then a sample timestamp matches either one or multiple time windows.
251
+ return f .createOverlappingMatcher (windows )
252
+ default :
253
+ panic ("invalid step and range interval" )
254
+ }
255
+ }
256
+
257
+ // createExactMatcher is used for instant queries.
258
+ // The function returns a matcher that always returns the first aggregation window from the given windows if the timestamp is not out of range.
259
+ // It is expected that len(windows) is exactly 1, but it is not enforced.
260
+ //
261
+ // steps |---------x-------|
262
+ // interval |---------x-------|
263
+ func (f * matcherFactory ) createExactMatcher (windows []window ) timestampMatchingWindowsFunc {
264
+ return func (t time.Time ) []window {
265
+ if ! f .bounds .Contains (t ) {
266
+ return nil // out of range
267
+ }
268
+ if len (windows ) == 0 {
269
+ return nil
270
+ }
271
+ return []window {windows [0 ]}
272
+ }
273
+ }
274
+
275
+ // createAlignedMatcher is used for range queries.
276
+ // The function returns a matcher that always returns exactly one aggregation window that matches the timestamp if the timestamp is not out of range.
277
+ //
278
+ // steps |-----|---x-|-----|
279
+ // interval |-----|
280
+ // interval |---x-|
281
+ // interval |-----|
282
+ func (f * matcherFactory ) createAlignedMatcher (windows []window ) timestampMatchingWindowsFunc {
283
+ startNs := f .start .UnixNano ()
284
+ stepNs := f .step .Nanoseconds ()
285
+
286
+ return func (t time.Time ) []window {
287
+ if ! f .bounds .Contains (t ) {
288
+ return nil // out of range
289
+ }
290
+
291
+ tNs := t .UnixNano ()
292
+ // valid timestamps for window i: t > startNs + (i-1) * intervalNs && t <= startNs + i * intervalNs
293
+ windowIndex := (tNs - startNs + stepNs - 1 ) / stepNs // subtract 1ns because we are calculating 0-based indexes
294
+ return []window {windows [windowIndex ]}
295
+ }
296
+ }
297
+
298
+ // createGappedMatcher is used for range queries.
299
+ // The function returns a matcher that either returns exactly one aggregation window that matches the timestamp, or none,
300
+ // if the timestamp is out of bounds or within bounds, but is within a "gap" between the end of an interval and the beginning of the next interval.
301
+ //
302
+ // steps |-----|---x-|-----|
303
+ // interval |--|
304
+ // interval |x-|
305
+ // interval |--|
306
+ func (f * matcherFactory ) createGappedMatcher (windows []window ) timestampMatchingWindowsFunc {
307
+ startNs := f .start .UnixNano ()
308
+ stepNs := f .step .Nanoseconds ()
309
+
310
+ return func (t time.Time ) []window {
311
+ if ! f .bounds .Contains (t ) {
312
+ return nil // out of range
313
+ }
314
+
315
+ tNs := t .UnixNano ()
316
+ // For gapped windows, window i covers: (start + i*step - interval, start + i*step]
317
+ windowIndex := (tNs - startNs + stepNs - 1 ) / stepNs // subtract 1ns because we are calculating 0-based indexes
318
+ matchingWindow := windows [windowIndex ]
319
+
320
+ // Verify the timestamp is within the window (not in a gap)
321
+ if tNs > matchingWindow .start .UnixNano () {
322
+ return []window {matchingWindow }
323
+ }
324
+
325
+ return nil // timestamp is in a gap
326
+ }
327
+ }
328
+
329
+ // createOverlappingMatcher is used for range queries.
330
+ // The function returns a matcher that returns one or more aggregation windows that match the timestamp, if the timestamp is not out of range.
331
+ //
332
+ // steps |-----|---x-|-----|
333
+ // interval |x-------|
334
+ // interval |------x-|
335
+ // interval |--------|
336
+ func (f * matcherFactory ) createOverlappingMatcher (windows []window ) timestampMatchingWindowsFunc {
337
+ return func (t time.Time ) []window {
338
+ if ! f .bounds .Contains (t ) {
339
+ return nil // out of range
340
+ }
341
+
342
+ // Find the last window that could contain the timestamp.
343
+ // We need to find the last window where t > window.startTs
344
+ // so search for the first window where t <= window.startTs
345
+ firstOOBIndex := sort .Search (len (windows ), func (i int ) bool {
346
+ return t .Compare (windows [i ].start ) <= 0
347
+ })
348
+
349
+ windowIndex := firstOOBIndex - 1
350
+ if windowIndex < 0 {
351
+ return nil
352
+ }
353
+
354
+ // Iterate backwards from last matching window to find all matches
355
+ var result []window
356
+ for _ , window := range slices .Backward (windows [:windowIndex + 1 ]) {
357
+ if t .Compare (window .start ) > 0 && t .Compare (window .end ) <= 0 {
358
+ result = append (result , window )
359
+ } else if t .Compare (window .end ) > 0 {
360
+ // we've gone past all possible matches
361
+ break
362
+ }
363
+ }
364
+
365
+ return result
366
+ }
367
+ }
0 commit comments