forked from cortexproject/cortex
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdistributed_query.go
More file actions
93 lines (74 loc) · 2.47 KB
/
distributed_query.go
File metadata and controls
93 lines (74 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package tripperware
import (
"context"
"net/http"
"time"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
"github.com/weaveworks/common/httpgrpc"
)
// stepBatch is the value supplied as query.Options.StepsBatch when a logical
// plan is built in this file.
const stepBatch = 10
// DistributedQueryMiddleware returns a Middleware that wraps the next Handler
// in a distributedQueryMiddleware.
//
// defaultEvaluationInterval and lookbackDelta are stored on the wrapper and
// later used when it builds the logical plan for a request.
func DistributedQueryMiddleware(defaultEvaluationInterval time.Duration, lookbackDelta time.Duration) Middleware {
	// wrap decorates a single downstream handler with the plan-building step.
	wrap := func(next Handler) Handler {
		return distributedQueryMiddleware{
			defaultEvaluationInterval: defaultEvaluationInterval,
			lookbackDelta:             lookbackDelta,
			next:                      next,
		}
	}
	return MiddlewareFunc(wrap)
}
// getStartAndEnd returns the time range to use for a query.
//
// When step is zero the range collapses to the single instant start (end is
// ignored); otherwise start and end are returned unchanged.
func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
	if step == 0 {
		// A zero step means there is no range to walk: pin the end to the start.
		end = start
	}
	return start, end
}
// distributedQueryMiddleware is the Handler wrapper created by
// DistributedQueryMiddleware. Its Do method builds a logical plan for the
// incoming request, stores it on the request, and then calls next.
type distributedQueryMiddleware struct {
// next is the downstream handler invoked once the plan has been attached.
next Handler
// defaultEvaluationInterval is the value returned by the
// NoStepSubqueryIntervalFn callback set in newLogicalPlan.
defaultEvaluationInterval time.Duration
// lookbackDelta is passed through as query.Options.LookbackDelta in
// newLogicalPlan.
lookbackDelta time.Duration
}
// newLogicalPlan parses qs as a PromQL expression, builds a logical plan from
// it and returns a pointer to the plan after running it through
// logicalplan.DefaultOptimizers.
//
// start, end and step describe the query range; they are first normalized by
// getStartAndEnd. If qs fails to parse, the parser error is returned as-is
// together with a nil plan.
func (d distributedQueryMiddleware) newLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, error) {
	rangeStart, rangeEnd := getStartAndEnd(start, end, step)

	// The callback ignores its argument and always yields the configured
	// default evaluation interval.
	subqueryInterval := func(time.Duration) time.Duration {
		return d.defaultEvaluationInterval
	}

	opts := &query.Options{
		Start:                    rangeStart,
		End:                      rangeEnd,
		Step:                     step,
		StepsBatch:               stepBatch,
		NoStepSubqueryIntervalFn: subqueryInterval,
		// Hardcoded values for execution-time parameters; per the original
		// author's note they are re-populated again in the querier stage.
		LookbackDelta:      d.lookbackDelta,
		EnablePerStepStats: false,
	}

	parsed, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
	if err != nil {
		return nil, err
	}

	plan := logicalplan.NewFromAST(parsed, opts, logicalplan.PlanOptions{
		DisableDuplicateLabelCheck: false,
	})

	// The second value returned by Optimize is not used.
	optimized, _ := plan.Optimize(logicalplan.DefaultOptimizers)
	return &optimized, nil
}
// Do builds a logical plan for the request, stores it on the request and then
// hands the request to the next handler.
//
// r must be a *PrometheusRequest; any other type is rejected with an
// httpgrpc error carrying http.StatusBadRequest. An error from building the
// plan is returned unchanged and the next handler is not called.
func (d distributedQueryMiddleware) Do(ctx context.Context, r Request) (Response, error) {
	promReq, isProm := r.(*PrometheusRequest)
	if !isProm {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
	}

	// Start, End and Step are scaled by time.Millisecond, i.e. the request
	// fields are treated as millisecond quantities.
	const nanosPerMilli = int64(time.Millisecond)

	plan, err := d.newLogicalPlan(
		promReq.Query,
		time.Unix(0, promReq.Start*nanosPerMilli),
		time.Unix(0, promReq.End*nanosPerMilli),
		time.Duration(promReq.Step)*time.Millisecond,
	)
	if err != nil {
		return nil, err
	}

	// The plan is attached to the same request value that is forwarded as r.
	promReq.LogicalPlan = *plan
	return d.next.Do(ctx, r)
}