Skip to content

Commit df1ff95

Browse files
committed
Allow queriers to execute logical query plans
1 parent 328bc3b commit df1ff95

File tree

8 files changed

+314
-38
lines changed

8 files changed

+314
-38
lines changed

pkg/api/handlers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ import (
1919
"github.com/prometheus/common/route"
2020
"github.com/prometheus/common/version"
2121
"github.com/prometheus/prometheus/config"
22-
"github.com/prometheus/prometheus/promql"
2322
"github.com/prometheus/prometheus/storage"
2423
v1 "github.com/prometheus/prometheus/web/api/v1"
2524
"github.com/weaveworks/common/instrument"
2625
"github.com/weaveworks/common/middleware"
2726

2827
"github.com/cortexproject/cortex/pkg/api/queryapi"
28+
"github.com/cortexproject/cortex/pkg/engine"
2929
"github.com/cortexproject/cortex/pkg/querier"
3030
"github.com/cortexproject/cortex/pkg/querier/codec"
3131
"github.com/cortexproject/cortex/pkg/querier/stats"
@@ -163,7 +163,7 @@ func NewQuerierHandler(
163163
cfg Config,
164164
queryable storage.SampleAndChunkQueryable,
165165
exemplarQueryable storage.ExemplarQueryable,
166-
engine promql.QueryEngine,
166+
engine engine.Engine,
167167
metadataQuerier querier.MetadataQuerier,
168168
reg prometheus.Registerer,
169169
logger log.Logger,
@@ -200,7 +200,7 @@ func NewQuerierHandler(
200200
corsOrigin := regexp.MustCompile(".*")
201201
translateSampleAndChunkQueryable := querier.NewErrorTranslateSampleAndChunkQueryable(queryable)
202202
api := v1.NewAPI(
203-
engine,
203+
&engine,
204204
translateSampleAndChunkQueryable, // Translate errors to errors expected by API.
205205
nil, // No remote write support.
206206
exemplarQueryable,

pkg/api/handlers_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strings"
1010
"testing"
1111

12+
"github.com/cortexproject/cortex/pkg/engine"
1213
"github.com/prometheus/common/version"
1314
v1 "github.com/prometheus/prometheus/web/api/v1"
1415
"github.com/stretchr/testify/assert"
@@ -232,7 +233,7 @@ func TestBuildInfoAPI(t *testing.T) {
232233
version.Version = tc.version
233234
version.Branch = tc.branch
234235
version.Revision = tc.revision
235-
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, &FakeLogger{})
236+
handler := NewQuerierHandler(cfg, nil, nil, engine.Engine{}, nil, nil, &FakeLogger{})
236237
writer := httptest.NewRecorder()
237238
req := httptest.NewRequest("GET", "/api/v1/status/buildinfo", nil)
238239
req = req.WithContext(user.InjectOrgID(req.Context(), "test"))

pkg/api/queryapi/query_api.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package queryapi
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"net/http"
78
"time"
89

910
"github.com/go-kit/log"
1011
"github.com/go-kit/log/level"
1112
"github.com/grafana/regexp"
1213
"github.com/munnerz/goautoneg"
13-
"github.com/prometheus/prometheus/promql"
1414
"github.com/prometheus/prometheus/storage"
1515
"github.com/prometheus/prometheus/util/annotations"
1616
"github.com/prometheus/prometheus/util/httputil"
@@ -21,11 +21,13 @@ import (
2121
"github.com/cortexproject/cortex/pkg/querier"
2222
"github.com/cortexproject/cortex/pkg/util"
2323
"github.com/cortexproject/cortex/pkg/util/api"
24+
"github.com/prometheus/prometheus/promql"
25+
"github.com/thanos-io/promql-engine/logicalplan"
2426
)
2527

2628
type QueryAPI struct {
2729
queryable storage.SampleAndChunkQueryable
28-
queryEngine promql.QueryEngine
30+
queryEngine engine.Engine
2931
now func() time.Time
3032
statsRenderer v1.StatsRenderer
3133
logger log.Logger
@@ -34,7 +36,7 @@ type QueryAPI struct {
3436
}
3537

3638
func NewQueryAPI(
37-
qe promql.QueryEngine,
39+
qe engine.Engine,
3840
q storage.SampleAndChunkQueryable,
3941
statsRenderer v1.StatsRenderer,
4042
logger log.Logger,
@@ -100,10 +102,29 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
100102

101103
ctx = engine.AddEngineTypeToContext(ctx, r)
102104
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
103-
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
105+
106+
var qry promql.Query
107+
byteLP, err := io.ReadAll(r.Body)
104108
if err != nil {
105-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
109+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
110+
}
111+
112+
if len(byteLP) != 0 {
113+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
114+
if err != nil {
115+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
116+
}
117+
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step), r.FormValue("query"))
118+
if err != nil {
119+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
120+
}
121+
} else {
122+
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
123+
if err != nil {
124+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
125+
}
106126
}
127+
107128
// From now on, we must only return with a finalizer in the result (to
108129
// be called by the caller) or call qry.Close ourselves (which is
109130
// required in the case of a panic).
@@ -156,9 +177,27 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
156177

157178
ctx = engine.AddEngineTypeToContext(ctx, r)
158179
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
159-
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
180+
181+
var qry promql.Query
182+
byteLP, err := io.ReadAll(r.Body)
160183
if err != nil {
161-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
184+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
185+
}
186+
187+
if len(byteLP) != 0 {
188+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
189+
if err != nil {
190+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
191+
}
192+
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, convertMsToTime(ts), r.FormValue("query"))
193+
if err != nil {
194+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
195+
}
196+
} else {
197+
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
198+
if err != nil {
199+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
200+
}
162201
}
163202

164203
// From now on, we must only return with a finalizer in the result (to

0 commit comments

Comments
 (0)