Skip to content

Commit 3c16e62

Browse files
committed
remove feature flag from query api: execute logical plan if possible
Signed-off-by: rubywtl <[email protected]>
1 parent d984196 commit 3c16e62

File tree

3 files changed

+39
-78
lines changed

3 files changed

+39
-78
lines changed

pkg/api/handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func NewQuerierHandler(
281281
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
282282
api.Register(legacyPromRouter)
283283

284-
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, distributedExecEnabled)
284+
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
285285

286286
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
287287
// https://github.com/prometheus/prometheus/pull/7125/files

pkg/api/queryapi/query_api.go

Lines changed: 34 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,13 @@ import (
2525
)
2626

2727
type QueryAPI struct {
28-
queryable storage.SampleAndChunkQueryable
29-
queryEngine engine.QueryEngine
30-
now func() time.Time
31-
statsRenderer v1.StatsRenderer
32-
logger log.Logger
33-
codecs []v1.Codec
34-
CORSOrigin *regexp.Regexp
35-
distributedExecEnabled bool
28+
queryable storage.SampleAndChunkQueryable
29+
queryEngine engine.QueryEngine
30+
now func() time.Time
31+
statsRenderer v1.StatsRenderer
32+
logger log.Logger
33+
codecs []v1.Codec
34+
CORSOrigin *regexp.Regexp
3635
}
3736

3837
func NewQueryAPI(
@@ -42,17 +41,15 @@ func NewQueryAPI(
4241
logger log.Logger,
4342
codecs []v1.Codec,
4443
CORSOrigin *regexp.Regexp,
45-
distributedExecEnabled bool,
4644
) *QueryAPI {
4745
return &QueryAPI{
48-
queryEngine: qe,
49-
queryable: q,
50-
statsRenderer: statsRenderer,
51-
logger: logger,
52-
codecs: codecs,
53-
CORSOrigin: CORSOrigin,
54-
now: time.Now,
55-
distributedExecEnabled: distributedExecEnabled,
46+
queryEngine: qe,
47+
queryable: q,
48+
statsRenderer: statsRenderer,
49+
logger: logger,
50+
codecs: codecs,
51+
CORSOrigin: CORSOrigin,
52+
now: time.Now,
5653
}
5754
}
5855

@@ -110,34 +107,17 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
110107
endTime := convertMsToTime(end)
111108
stepDuration := convertMsToDuration(step)
112109

113-
if q.distributedExecEnabled {
114-
byteLP := []byte(r.PostFormValue("plan"))
115-
if len(byteLP) != 0 {
116-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
117-
if err != nil {
118-
return apiFuncResult{
119-
nil,
120-
&apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)},
121-
nil,
122-
nil,
123-
}
124-
}
125-
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, startTime, endTime, stepDuration, r.FormValue("query"))
126-
if err != nil {
127-
return apiFuncResult{
128-
nil,
129-
&apiError{errorBadData, fmt.Errorf("failed to create range query from logical plan: %v", err)},
130-
nil,
131-
nil,
132-
}
133-
}
134-
} else { // if there is logical plan field is empty, fall back
135-
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), startTime, endTime, stepDuration)
136-
if err != nil {
137-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
138-
}
110+
byteLP := []byte(r.PostFormValue("plan"))
111+
if len(byteLP) != 0 {
112+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
113+
if err != nil {
114+
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
115+
}
116+
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, startTime, endTime, stepDuration, r.FormValue("query"))
117+
if err != nil {
118+
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("failed to create range query from logical plan: %v", err)}, nil, nil}
139119
}
140-
} else {
120+
} else { // if there is logical plan field is empty, fall back
141121
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), startTime, endTime, stepDuration)
142122
if err != nil {
143123
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
@@ -200,35 +180,17 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
200180
var qry promql.Query
201181
tsTime := convertMsToTime(ts)
202182

203-
if q.distributedExecEnabled {
204-
byteLP := []byte(r.PostFormValue("plan"))
205-
if len(byteLP) != 0 {
206-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
207-
if err != nil {
208-
return apiFuncResult{
209-
nil,
210-
&apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)},
211-
nil,
212-
nil,
213-
}
214-
}
215-
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query"))
216-
if err != nil {
217-
return apiFuncResult{
218-
nil,
219-
&apiError{errorBadData, fmt.Errorf("failed to create instant query from logical plan: %v", err)},
220-
nil,
221-
nil,
222-
}
223-
}
224-
} else {
225-
// fallback to query string for instant query
226-
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), tsTime)
227-
if err != nil {
228-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
229-
}
183+
byteLP := []byte(r.PostFormValue("plan"))
184+
if len(byteLP) != 0 {
185+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
186+
if err != nil {
187+
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
188+
}
189+
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query"))
190+
if err != nil {
191+
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("failed to create range query from logical plan: %v", err)}, nil, nil}
230192
}
231-
} else {
193+
} else { // if there is logical plan field is empty, fall back
232194
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), tsTime)
233195
if err != nil {
234196
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")

pkg/api/queryapi/query_api_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func Test_CustomAPI(t *testing.T) {
183183

184184
for _, test := range tests {
185185
t.Run(test.name, func(t *testing.T) {
186-
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
186+
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
187187

188188
router := mux.NewRouter()
189189
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
@@ -244,7 +244,7 @@ func Test_InvalidCodec(t *testing.T) {
244244
},
245245
}
246246

247-
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"), false)
247+
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"))
248248
router := mux.NewRouter()
249249
router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
250250

@@ -285,7 +285,7 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) {
285285
},
286286
}
287287

288-
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
288+
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
289289

290290
router := mux.NewRouter()
291291
router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
@@ -441,7 +441,7 @@ func Test_Logicalplan_Requests(t *testing.T) {
441441

442442
for _, tt := range tests {
443443
t.Run(tt.name, func(t *testing.T) {
444-
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), true)
444+
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
445445
router := mux.NewRouter()
446446
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
447447
router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler))
@@ -464,7 +464,6 @@ func createTestRequest(path string, planBytes []byte) *http.Request {
464464
req := httptest.NewRequest(http.MethodPost, path, io.NopCloser(strings.NewReader(form.Encode())))
465465

466466
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
467-
468467
ctx := context.Background()
469468
_, ctx = stats.ContextWithEmptyStats(ctx)
470469
return req.WithContext(user.InjectOrgID(ctx, "user1"))

0 commit comments

Comments
 (0)