Skip to content

Commit fe31a41

Browse files
committed
refactor query creation logic code for distributed exec in query api
Signed-off-by: rubywtl <[email protected]>
1 parent 287d2af commit fe31a41

File tree

2 files changed

+73
-33
lines changed

2 files changed

+73
-33
lines changed

pkg/api/queryapi/query_api.go

Lines changed: 71 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,69 @@ func NewQueryAPI(
5757
}
5858
}
5959

60+
func (q *QueryAPI) createQueryFromPlan(query string, byteLP []byte, start time.Time, end time.Time, step time.Duration, ctx context.Context, opts promql.QueryOpts) (promql.Query, apiFuncResult, error) {
61+
if step == 0 {
62+
// instant query logic (step is 0)
63+
if len(byteLP) != 0 {
64+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
65+
if err != nil {
66+
return nil, apiFuncResult{
67+
nil,
68+
&apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)},
69+
nil,
70+
nil,
71+
}, err
72+
}
73+
qry, err := q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, start, query)
74+
if err != nil {
75+
return nil, apiFuncResult{
76+
nil,
77+
&apiError{errorBadData, fmt.Errorf("failed to create instant query from logical plan: %v", err)},
78+
nil,
79+
nil,
80+
}, err
81+
}
82+
return qry, apiFuncResult{}, nil
83+
} else {
84+
// fallback to query string for instant query
85+
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, query, start)
86+
if err != nil {
87+
return nil, invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query"), err
88+
}
89+
return qry, apiFuncResult{}, nil
90+
}
91+
}
92+
93+
// range query logic (step is non-zero)
94+
if len(byteLP) != 0 {
95+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
96+
if err != nil {
97+
return nil, apiFuncResult{
98+
nil,
99+
&apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)},
100+
nil,
101+
nil,
102+
}, err
103+
}
104+
qry, err := q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, start, end, step, query)
105+
if err != nil {
106+
return nil, apiFuncResult{
107+
nil,
108+
&apiError{errorBadData, fmt.Errorf("failed to create range query from logical plan: %v", err)},
109+
nil,
110+
nil,
111+
}, err
112+
}
113+
return qry, apiFuncResult{}, nil
114+
} else {
115+
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, query, start, end, step)
116+
if err != nil {
117+
return nil, invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query"), err
118+
}
119+
return qry, apiFuncResult{}, nil
120+
}
121+
}
122+
60123
func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
61124
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
62125
start, err := util.ParseTime(r.FormValue("start"))
@@ -107,27 +170,15 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
107170
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
108171

109172
var qry promql.Query
110-
173+
var funcRes apiFuncResult
111174
if q.distributedExecEnabled {
112175
byteLP, err := io.ReadAll(r.Body)
113176
if err != nil {
114177
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
115178
}
116-
117-
if len(byteLP) != 0 {
118-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
119-
if err != nil {
120-
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
121-
}
122-
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step), r.FormValue("query"))
123-
if err != nil {
124-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
125-
}
126-
} else {
127-
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
128-
if err != nil {
129-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
130-
}
179+
qry, funcRes, err = q.createQueryFromPlan(r.FormValue("query"), byteLP, convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step), ctx, opts)
180+
if err != nil {
181+
return funcRes
131182
}
132183
} else {
133184
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
@@ -190,26 +241,15 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
190241
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
191242

192243
var qry promql.Query
244+
var funcRes apiFuncResult
193245
if q.distributedExecEnabled {
194246
byteLP, err := io.ReadAll(r.Body)
195247
if err != nil {
196248
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
197249
}
198-
199-
if len(byteLP) != 0 {
200-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
201-
if err != nil {
202-
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
203-
}
204-
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, convertMsToTime(ts), r.FormValue("query"))
205-
if err != nil {
206-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
207-
}
208-
} else {
209-
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
210-
if err != nil {
211-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
212-
}
250+
qry, funcRes, err = q.createQueryFromPlan(r.FormValue("query"), byteLP, convertMsToTime(ts), convertMsToTime(ts), convertMsToDuration(0), ctx, opts)
251+
if err != nil {
252+
return funcRes
213253
}
214254
} else {
215255
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))

pkg/api/queryapi/query_api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ func Test_Logicalplan_Requests(t *testing.T) {
362362
return append(createTestLogicalPlan(t, 1536673665, 1536673680, 5), []byte("random data")...)
363363
},
364364
expectedCode: http.StatusInternalServerError,
365-
expectedBody: `{"status":"error","errorType":"server_error","error":"invalid character 'r' after top-level value"}`,
365+
expectedBody: `{"status":"error","errorType":"server_error","error":"invalid logical plan: invalid character 'r' after top-level value"}`,
366366
},
367367
{
368368
name: "[Range Query] with empty body and non-empty query string", // fall back to promql query execution
@@ -410,7 +410,7 @@ func Test_Logicalplan_Requests(t *testing.T) {
410410
return append(createTestLogicalPlan(t, 1536673670, 1536673670, 0), []byte("random data")...)
411411
},
412412
expectedCode: http.StatusInternalServerError,
413-
expectedBody: `{"status":"error","errorType":"server_error","error":"invalid character 'r' after top-level value"}`,
413+
expectedBody: `{"status":"error","errorType":"server_error","error":"invalid logical plan: invalid character 'r' after top-level value"}`,
414414
},
415415
{
416416
name: "[Instant Query] with empty body and non-empty query string",

0 commit comments

Comments
 (0)