28 commits
eca4f39 Add logical plan fragmentation to scheduler + track querier fragments… (rubywtl, Jul 27, 2025)
241ecce Allow queriers to execute logical query plans (rubywtl, Jul 22, 2025)
6587b44 add base engine interface and edit unit test (rubywtl, Jul 23, 2025)
cc05639 add feature flag to querier api (rubywtl, Jul 24, 2025)
20f1c91 refactor query creation logic code for distributed exec in query api (rubywtl, Jul 24, 2025)
8e9974d move distributed exec feature flag to querier & refactor request hand… (rubywtl, Jul 24, 2025)
eeb720c change POST request content to encoded HTTP form format (rubywtl, Jul 24, 2025)
e829bfb remove feature flag from query api: execute logical plan if possible (rubywtl, Jul 24, 2025)
e437fcb remove un-used flags in querier handler (rubywtl, Jul 25, 2025)
43d3dcc rebase and adjust tests (rubywtl, Jul 28, 2025)
db279f7 minor changes due to rebase (rubywtl, Jul 28, 2025)
8a5bc57 add fields to fragment metadata and querier address to protobuf; veri… (rubywtl, Jul 30, 2025)
e1c8109 adjustments for scheduler queue & querier coordination (rubywtl, Jul 30, 2025)
9548e4c Merge branch 'scheduler/logicalplan-fragment-and-coordination' into i… (rubywtl, Jul 30, 2025)
4d36ef4 Merge branch 'querier/execute-logicalplan' into integration_test (rubywtl, Jul 30, 2025)
7804f97 init distributed optimizer and fragmentation logic (rubywtl, Aug 4, 2025)
8ca6faf change type to custom remote node (rubywtl, Aug 4, 2025)
3b37f19 update proto (rubywtl, Aug 4, 2025)
91b32f1 initial querier server (rubywtl, Aug 4, 2025)
6ed74f6 initial implementation (rubywtl, Aug 7, 2025)
bef2d02 able to pick-up fragments and executed by individual queriers (rubywtl, Aug 11, 2025)
36719a9 fix querier serve streaming errors but next() currently takes too long (rubywtl, Aug 13, 2025)
2b0eaa6 next() and series() working version - but frontend resp still 500 (rubywtl, Aug 14, 2025)
e7b8254 inject metadata bug fix (rubywtl, Aug 14, 2025)
be48404 working version! (but unstable) (rubywtl, Aug 14, 2025)
4080ca6 next() vector streaming fix (rubywtl, Aug 15, 2025)
f9a4ff8 implement retry logic for root fragment and fix result cache result type (rubywtl, Aug 18, 2025)
8027899 more stable version (up to 90 days query) (rubywtl, Aug 18, 2025)
106 changes: 106 additions & 0 deletions integration/querier_test.go
@@ -1375,3 +1375,109 @@ func TestQuerierEngineConfigs(t *testing.T) {
}

}

func TestQuerierDistributedExecution(t *testing.T) {
// e2e test setup
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

// initialize the flags
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "24h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-querier.query-store-for-labels-enabled": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// Alertmanager.
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-frontend.query-vertical-shard-size": "1",
"-frontend.max-cache-freshness": "1m",
// enable experimental PromQL functions
"-querier.enable-promql-experimental-functions": "true",
// enable distributed execution (logical plan execution)
"-querier.distributed-exec-enabled": "true",
},
)

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

// start services
queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
require.NoError(t, s.StartAndWaitReady(queryScheduler))
flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()

queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
require.NoError(t, s.Start(queryFrontend))

ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
querier1 := e2ecortex.NewQuerier("querier-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
querier2 := e2ecortex.NewQuerier("querier-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

require.NoError(t, s.StartAndWaitReady(querier1, querier2, ingester, distributor))
require.NoError(t, s.WaitReady(queryFrontend))

// wait until distributor and queriers have updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier1.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// push some series to Cortex.
distClient, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

series1Timestamp := time.Now()
series2Timestamp := series1Timestamp.Add(time.Minute * 1)
series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})

res, err := distClient.Push(series1)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = distClient.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

for _, q := range []*e2ecortex.CortexService{querier1, querier2} {
c, err := e2ecortex.NewClient("", q.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

_, err = c.Query("series_1", series1Timestamp)
require.NoError(t, err)
}

require.NoError(t, queryScheduler.WaitSumMetrics(e2e.Equals(2), "cortex_query_scheduler_connected_querier_clients"))

// main tests
// - make sure queries are still executable with distributed execution enabled
var body []byte
res, body, err = distClient.QueryRaw(`sum({job="test"})`, series1Timestamp, map[string]string{})
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
require.Equal(t, expectedVector1, string(body))

res, body, err = distClient.QueryRaw(`sum({job="test"})`, series2Timestamp, map[string]string{})
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
require.Equal(t, expectedVector2, string(body))
}
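
The raw-body assertions above are sensitive to the exact JSON encoding of the response. A minimal sketch of a typed assertion in the style of the other tests in this file, assuming the e2ecortex client's Query helper returns a parsed model.Value; the query-frontend endpoint choice and the prometheus/common model import are assumptions, not part of this diff:

// Sketch only: typed result check issued through the query-frontend, mirroring
// the per-querier warm-up queries earlier in the test.
c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

result, err := c.Query("series_1", series1Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
require.Equal(t, expectedVector1, result.(model.Vector))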
6 changes: 6 additions & 0 deletions pkg/api/api.go
@@ -3,6 +3,8 @@ package api
import (
"context"
"flag"
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
"github.com/cortexproject/cortex/pkg/engine/distributed_execution/querierpb"
"net/http"
"path"
"strings"
@@ -480,6 +482,10 @@ func (a *API) RegisterQueryScheduler(f *scheduler.Scheduler) {
schedulerpb.RegisterSchedulerForQuerierServer(a.server.GRPC, f)
}

func (a *API) RegisterQuerierServer(q *distributed_execution.QuerierServer) {
querierpb.RegisterQuerierServer(a.server.GRPC, q)
}

// RegisterServiceMapHandler registers the Cortex structs service handler
// TODO: Refactor this code to be accomplished using the services.ServiceManager
// or a future module manager #2291
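RegisterQuerierServer mirrors the existing RegisterQueryScheduler hook: it exposes the distributed-execution querier service on the shared gRPC server. A minimal wiring sketch, assuming a hypothetical module init function in pkg/cortex/modules.go; the constructor, config field, and Cortex struct fields below are illustrative assumptions, not part of this diff:

// Hypothetical wiring; only RegisterQuerierServer and the QuerierServer type
// come from this diff, everything else is assumed for illustration.
func (t *Cortex) initQuerierServer() (services.Service, error) {
	if !t.Cfg.Querier.DistributedExecEnabled { // assumed flag name
		return nil, nil
	}
	qs := distributed_execution.NewQuerierServer(t.QueryResultCache) // assumed constructor
	t.API.RegisterQuerierServer(qs)                                  // registers the querierpb service on server.GRPC
	return nil, nil
}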
9 changes: 6 additions & 3 deletions pkg/api/handlers.go
@@ -3,6 +3,7 @@ package api
import (
"context"
"encoding/json"
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
"html/template"
"net/http"
"path"
@@ -19,13 +20,13 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/api/queryapi"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/codec"
"github.com/cortexproject/cortex/pkg/querier/stats"
@@ -163,10 +164,12 @@ func NewQuerierHandler(
cfg Config,
queryable storage.SampleAndChunkQueryable,
exemplarQueryable storage.ExemplarQueryable,
engine promql.QueryEngine,
engine engine.QueryEngine,
queryResultCache *distributed_execution.QueryResultCache,
metadataQuerier querier.MetadataQuerier,
reg prometheus.Registerer,
logger log.Logger,
distributedExecEnabled bool,
) http.Handler {
// Prometheus histograms for requests to the querier.
querierRequestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -280,7 +283,7 @@ func NewQuerierHandler(
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
api.Register(legacyPromRouter)

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
queryAPI := queryapi.NewQueryAPI(engine, queryResultCache, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, distributedExecEnabled)

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
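NewQuerierHandler now takes the project's engine.QueryEngine rather than promql.QueryEngine, since query_api.go below builds queries either from a PromQL string or from a decoded logical plan. The sketch below is an inferred shape of that interface, under the assumption that it embeds promql.QueryEngine; the actual definition lives in pkg/engine and may differ:

package engine // sketch only; inferred from the calls in query_api.go

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
)

// LogicalPlan stands in for whatever distributed_execution.Unmarshal returns;
// the concrete type is not shown in this diff.
type LogicalPlan interface{}

type QueryEngine interface {
	promql.QueryEngine

	MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts,
		plan LogicalPlan, ts time.Time, qs string) (promql.Query, error)
	MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts,
		plan LogicalPlan, start, end time.Time, step time.Duration, qs string) (promql.Query, error)
}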
2 changes: 1 addition & 1 deletion pkg/api/handlers_test.go
@@ -232,7 +232,7 @@ func TestBuildInfoAPI(t *testing.T) {
version.Version = tc.version
version.Branch = tc.branch
version.Revision = tc.revision
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, &FakeLogger{})
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, nil, &FakeLogger{}, false)
writer := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/api/v1/status/buildinfo", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "test"))
130 changes: 108 additions & 22 deletions pkg/api/queryapi/query_api.go
@@ -3,6 +3,7 @@ package queryapi
import (
"context"
"fmt"
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
"net/http"
"strconv"
"time"
@@ -25,31 +26,37 @@ import (
)

type QueryAPI struct {
queryable storage.SampleAndChunkQueryable
queryEngine promql.QueryEngine
now func() time.Time
statsRenderer v1.StatsRenderer
logger log.Logger
codecs []v1.Codec
CORSOrigin *regexp.Regexp
queryable storage.SampleAndChunkQueryable
queryEngine engine.QueryEngine
queryResultCache *distributed_execution.QueryResultCache
now func() time.Time
statsRenderer v1.StatsRenderer
logger log.Logger
codecs []v1.Codec
CORSOrigin *regexp.Regexp
distributedExecEnabled bool
}

func NewQueryAPI(
qe promql.QueryEngine,
qe engine.QueryEngine,
queryResultCache *distributed_execution.QueryResultCache,
q storage.SampleAndChunkQueryable,
statsRenderer v1.StatsRenderer,
logger log.Logger,
codecs []v1.Codec,
CORSOrigin *regexp.Regexp,
distributedExecEnabled bool,
) *QueryAPI {
return &QueryAPI{
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
queryEngine: qe,
queryResultCache: queryResultCache,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
distributedExecEnabled: distributedExecEnabled,
}
}

@@ -101,10 +108,29 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")

var qry promql.Query
startTime := convertMsToTime(start)
endTime := convertMsToTime(end)
stepDuration := convertMsToDuration(step)

byteLP := []byte(r.PostFormValue("plan"))
if len(byteLP) != 0 {
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
if err != nil {
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
}
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, startTime, endTime, stepDuration, r.FormValue("query"))
if err != nil {
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create range query from logical plan: %v", err)}, nil, nil}
}
} else { // if the logical plan field is empty, fall back to parsing the PromQL string
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), startTime, endTime, stepDuration)
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
}
}

// From now on, we must only return with a finalizer in the result (to
// be called by the caller) or call qry.Close ourselves (which is
// required in the case of a panic).
@@ -116,6 +142,14 @@

ctx = httputil.ContextFromRequest(ctx, r)

if q.distributedExecEnabled {
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
if !isRoot {
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
q.queryResultCache.InitWriting(key)
}
}

res := qry.Exec(ctx)
if res.Err != nil {
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
@@ -157,9 +191,45 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")

var isRoot bool
var queryID, fragmentID uint64
if q.distributedExecEnabled {
isRoot, queryID, fragmentID, _, _ = distributed_execution.ExtractFragmentMetaData(ctx)
if !isRoot {
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
q.queryResultCache.InitWriting(key)
}
}

var qry promql.Query
tsTime := convertMsToTime(ts)

byteLP := []byte(r.PostFormValue("plan"))
if len(byteLP) != 0 {
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
if err != nil {
if q.distributedExecEnabled && !isRoot {
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
q.queryResultCache.SetError(key)
}
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
}
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query"))
if err != nil {
if q.distributedExecEnabled && !isRoot {
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
q.queryResultCache.SetError(key)
}
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create instant query from logical plan: %v", err)}, nil, nil}
}
} else { // if the logical plan field is empty, fall back to parsing the PromQL string
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), tsTime)
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
}
}

// From now on, we must only return with a finalizer in the result (to
@@ -203,6 +273,20 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
}

if result.data != nil {
ctx := httputil.ContextFromRequest(r.Context(), r)

if q.distributedExecEnabled {
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)

q.queryResultCache.SetComplete(key, result.data)

if isRoot {
q.respond(w, r, result.data, result.warnings, r.FormValue("query"))
}
return
}

q.respond(w, r, result.data, result.warnings, r.FormValue("query"))
return
}
@@ -240,7 +324,9 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interf
w.Header().Set("Content-Type", codec.ContentType().String())
w.Header().Set("X-Uncompressed-Length", strconv.Itoa(len(b)))
w.WriteHeader(http.StatusOK)
if n, err := w.Write(b); err != nil {

n, err := w.Write(b)
if err != nil {
level.Error(q.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
}
}
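Both handlers read the serialized logical plan from the "plan" form field and fall back to parsing the PromQL string when it is absent. A sketch of how a coordinator might hand a plan fragment to a querier over plain HTTP, assuming standard form encoding; producing encodedPlan (the counterpart of distributed_execution.Unmarshal) is not shown in this diff, and querierURL, expr, and ts are placeholders:

// Sketch only: POSTs an encoded plan fragment to the instant-query endpoint.
// The "plan" field name matches r.PostFormValue("plan") in the handlers above;
// everything else is an assumption for illustration.
// imports: net/http, net/url, strings, time
func postPlanFragment(querierURL string, encodedPlan []byte, expr string, ts time.Time) (*http.Response, error) {
	form := url.Values{}
	form.Set("query", expr)                   // PromQL string kept for the fallback path
	form.Set("time", ts.Format(time.RFC3339)) // instant-query timestamp
	form.Set("plan", string(encodedPlan))     // decoded via distributed_execution.Unmarshal server-side

	req, err := http.NewRequest(http.MethodPost, querierURL+"/api/v1/query", strings.NewReader(form.Encode()))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	return http.DefaultClient.Do(req)
}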