Skip to content

Commit 84f55fa

Browse files
committed
fix querier serve streaming errors but next() currently takes too long
1 parent bef2d02 commit 84f55fa

File tree

9 files changed

+422
-162
lines changed

9 files changed

+422
-162
lines changed

pkg/api/handlers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ func NewQuerierHandler(
169169
metadataQuerier querier.MetadataQuerier,
170170
reg prometheus.Registerer,
171171
logger log.Logger,
172+
distributedExecEnabled bool,
172173
) http.Handler {
173174
// Prometheus histograms for requests to the querier.
174175
querierRequestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -282,7 +283,7 @@ func NewQuerierHandler(
282283
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
283284
api.Register(legacyPromRouter)
284285

285-
queryAPI := queryapi.NewQueryAPI(engine, queryResultCache, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
286+
queryAPI := queryapi.NewQueryAPI(engine, queryResultCache, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, distributedExecEnabled)
286287

287288
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
288289
// https://github.com/prometheus/prometheus/pull/7125/files

pkg/api/handlers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func TestBuildInfoAPI(t *testing.T) {
232232
version.Version = tc.version
233233
version.Branch = tc.branch
234234
version.Revision = tc.revision
235-
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, &FakeLogger{})
235+
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, nil, &FakeLogger{}, false)
236236
writer := httptest.NewRecorder()
237237
req := httptest.NewRequest("GET", "/api/v1/status/buildinfo", nil)
238238
req = req.WithContext(user.InjectOrgID(req.Context(), "test"))

pkg/api/queryapi/query_api.go

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ import (
2626
)
2727

2828
type QueryAPI struct {
29-
queryable storage.SampleAndChunkQueryable
30-
queryEngine engine.QueryEngine
31-
queryResultCache *distributed_execution.QueryResultCache
32-
now func() time.Time
33-
statsRenderer v1.StatsRenderer
34-
logger log.Logger
35-
codecs []v1.Codec
36-
CORSOrigin *regexp.Regexp
29+
queryable storage.SampleAndChunkQueryable
30+
queryEngine engine.QueryEngine
31+
queryResultCache *distributed_execution.QueryResultCache
32+
now func() time.Time
33+
statsRenderer v1.StatsRenderer
34+
logger log.Logger
35+
codecs []v1.Codec
36+
CORSOrigin *regexp.Regexp
37+
distributedExecEnabled bool
3738
}
3839

3940
func NewQueryAPI(
@@ -44,16 +45,18 @@ func NewQueryAPI(
4445
logger log.Logger,
4546
codecs []v1.Codec,
4647
CORSOrigin *regexp.Regexp,
48+
distributedExecEnabled bool,
4749
) *QueryAPI {
4850
return &QueryAPI{
49-
queryEngine: qe,
50-
queryResultCache: queryResultCache,
51-
queryable: q,
52-
statsRenderer: statsRenderer,
53-
logger: logger,
54-
codecs: codecs,
55-
CORSOrigin: CORSOrigin,
56-
now: time.Now,
51+
queryEngine: qe,
52+
queryResultCache: queryResultCache,
53+
queryable: q,
54+
statsRenderer: statsRenderer,
55+
logger: logger,
56+
codecs: codecs,
57+
CORSOrigin: CORSOrigin,
58+
now: time.Now,
59+
distributedExecEnabled: distributedExecEnabled,
5760
}
5861
}
5962

@@ -139,11 +142,12 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
139142

140143
ctx = httputil.ContextFromRequest(ctx, r)
141144

142-
// TODO: if distributed exec enabled
143-
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
144-
if !isRoot {
145-
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
146-
q.queryResultCache.InitWriting(*key)
145+
if q.distributedExecEnabled {
146+
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
147+
if !isRoot {
148+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
149+
q.queryResultCache.InitWriting(*key)
150+
}
147151
}
148152

149153
res := qry.Exec(ctx)
@@ -188,11 +192,14 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
188192
ctx = engine.AddEngineTypeToContext(ctx, r)
189193
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
190194

191-
// TODO: if distributed exec enabled
192-
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
193-
if !isRoot {
194-
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
195-
q.queryResultCache.InitWriting(*key)
195+
var isRoot bool
196+
var queryID, fragmentID uint64
197+
if q.distributedExecEnabled {
198+
isRoot, queryID, fragmentID, _, _ = distributed_execution.ExtractFragmentMetaData(ctx)
199+
if !isRoot {
200+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
201+
q.queryResultCache.InitWriting(*key)
202+
}
196203
}
197204

198205
var qry promql.Query
@@ -202,9 +209,11 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
202209
if len(byteLP) != 0 {
203210
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
204211
if err != nil {
205-
if !isRoot {
206-
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
207-
q.queryResultCache.SetError(*key)
212+
if q.distributedExecEnabled {
213+
if !isRoot {
214+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
215+
q.queryResultCache.SetError(*key)
216+
}
208217
}
209218
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
210219
}
@@ -264,17 +273,18 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
264273
}
265274

266275
if result.data != nil {
267-
// TODO: if distributed exec enabled
268276
ctx := httputil.ContextFromRequest(r.Context(), r)
269-
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
270-
if !isRoot {
271-
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
272-
result := distributed_execution.FragmentResult{
273-
Data: result.data,
274-
Expiration: time.Now().Add(time.Hour),
277+
if q.distributedExecEnabled {
278+
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
279+
if !isRoot {
280+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
281+
result := distributed_execution.FragmentResult{
282+
Data: result.data,
283+
Expiration: time.Now().Add(time.Hour),
284+
}
285+
q.queryResultCache.SetComplete(*key, result)
286+
return
275287
}
276-
q.queryResultCache.SetComplete(*key, result)
277-
return
278288
}
279289
q.respond(w, r, result.data, result.warnings, r.FormValue("query"))
280290
return

pkg/api/queryapi/query_api_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
78
"io"
89
"net/http"
910
"net/http/httptest"
@@ -183,7 +184,7 @@ func Test_CustomAPI(t *testing.T) {
183184

184185
for _, test := range tests {
185186
t.Run(test.name, func(t *testing.T) {
186-
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
187+
c := NewQueryAPI(engine, nil, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
187188

188189
router := mux.NewRouter()
189190
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
@@ -244,7 +245,7 @@ func Test_InvalidCodec(t *testing.T) {
244245
},
245246
}
246247

247-
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"))
248+
queryAPI := NewQueryAPI(engine, nil, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"), false)
248249
router := mux.NewRouter()
249250
router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
250251

@@ -285,7 +286,7 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) {
285286
},
286287
}
287288

288-
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
289+
queryAPI := NewQueryAPI(engine, nil, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
289290

290291
router := mux.NewRouter()
291292
router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
@@ -441,7 +442,7 @@ func Test_Logicalplan_Requests(t *testing.T) {
441442

442443
for _, tt := range tests {
443444
t.Run(tt.name, func(t *testing.T) {
444-
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
445+
c := NewQueryAPI(engine, &distributed_execution.QueryResultCache{}, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
445446
router := mux.NewRouter()
446447
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
447448
router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler))

pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
407407
t.MetadataQuerier,
408408
prometheus.DefaultRegisterer,
409409
util_log.Logger,
410+
t.Cfg.Querier.DistributedExecEnabled,
410411
)
411412

412413
// If the querier is running standalone without the query-frontend or query-scheduler, we must register it's internal

0 commit comments

Comments
 (0)