Skip to content

Querier: Support configuring optimizers and XFunctions #6873

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
* [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
* [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
* [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
Expand Down
21 changes: 16 additions & 5 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,22 @@ querier:
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]

# Experimental. Use Thanos promql engine
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
# engine.
# CLI flag: -querier.thanos-engine
[thanos_engine: <boolean> | default = false]
thanos_engine:
# Experimental. Use Thanos promql engine
# https://github.com/thanos-io/promql-engine rather than the Prometheus
# promql engine.
# CLI flag: -querier.thanos-engine
[enabled: <boolean> | default = false]

# Enable xincrease, xdelta, xrate etc from Thanos engine.
# CLI flag: -querier.enable-x-functions
[enable_x_functions: <boolean> | default = false]

# Logical plan optimizers. Multiple optimizers can be provided as a
# comma-separated list. Supported values: default, all, propagate-matchers,
# sort-matchers, merge-selects, detect-histogram-stats
# CLI flag: -querier.optimizers
[optimizers: <string> | default = "default"]

# If enabled, ignore max query length check at Querier select method. Users
# can choose to ignore it since the validation can be done before Querier
Expand Down
38 changes: 33 additions & 5 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4302,11 +4302,22 @@ store_gateway_client:
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]

# Experimental. Use Thanos promql engine
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
# engine.
# CLI flag: -querier.thanos-engine
[thanos_engine: <boolean> | default = false]
thanos_engine:
# Experimental. Use Thanos promql engine
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
# engine.
# CLI flag: -querier.thanos-engine
[enabled: <boolean> | default = false]

# Enable xincrease, xdelta, xrate etc from Thanos engine.
# CLI flag: -querier.enable-x-functions
[enable_x_functions: <boolean> | default = false]

# Logical plan optimizers. Multiple optimizers can be provided as a
# comma-separated list. Supported values: default, all, propagate-matchers,
# sort-matchers, merge-selects, detect-histogram-stats
# CLI flag: -querier.optimizers
[optimizers: <string> | default = "default"]

# If enabled, ignore max query length check at Querier select method. Users can
# choose to ignore it since the validation can be done before Querier evaluation
Expand Down Expand Up @@ -5023,6 +5034,23 @@ ring:
# ruler.enable-ha-evaluation is true.
# CLI flag: -ruler.liveness-check-timeout
[liveness_check_timeout: <duration> | default = 1s]

thanos_engine:
# Experimental. Use Thanos promql engine
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
# engine.
# CLI flag: -ruler.thanos-engine
[enabled: <boolean> | default = false]

# Enable xincrease, xdelta, xrate etc from Thanos engine.
# CLI flag: -ruler.enable-x-functions
[enable_x_functions: <boolean> | default = false]

# Logical plan optimizers. Multiple optimizers can be provided as a
# comma-separated list. Supported values: default, all, propagate-matchers,
# sort-matchers, merge-selects, detect-histogram-stats
# CLI flag: -ruler.optimizers
[optimizers: <string> | default = "default"]
```

### `ruler_storage_config`
Expand Down
65 changes: 65 additions & 0 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/promql-engine/execution/parse"

"github.com/cortexproject/cortex/integration/e2e"
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
Expand Down Expand Up @@ -416,6 +417,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
"-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
"-querier.query-store-for-labels-enabled": "true",
"-querier.thanos-engine": strconv.FormatBool(thanosEngine),
"-querier.enable-x-functions": strconv.FormatBool(thanosEngine),
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
Expand Down Expand Up @@ -1310,3 +1312,66 @@ func TestQuerierMaxSamplesLimit(t *testing.T) {
Error: "query processing would load too many samples into memory in query execution",
})
}

func TestQuerierEngineConfigs(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-querier.thanos-engine": "true",
"-querier.enable-x-functions": "true",
"-querier.optimizers": "all",
})

// Start dependencies.
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Cortex components for the write path.
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester))

queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
require.NoError(t, s.Start(queryFrontend))

querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the distributor and querier has updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Push some series to Cortex.
series1Timestamp := time.Now()
series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "job", Value: "test"})
series2, _ := generateSeries("series_2", series1Timestamp, prompb.Label{Name: "job", Value: "test"})

res, err := c.Push(series1)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
res, err = c.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

for xFunc := range parse.XFunctions {
result, err := c.Query(fmt.Sprintf(`%s(series_1{job="test"}[1m])`, xFunc), series1Timestamp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit surprised that this test could pass when querying query frontend. I don't see we register those functions there. Only in querier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe because the middlewares are disabled by default.

require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
}

}
2 changes: 1 addition & 1 deletion pkg/configs/userconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
"gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/pkg/parser"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/configs/userconfig/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/pkg/parser"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down
7 changes: 7 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ func New(cfg Config) (*Cortex, error) {
return nil, err
}

cortex.setupPromQLFunctions()
return cortex, nil
}

Expand Down Expand Up @@ -537,3 +538,9 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc {
util.WriteTextResponse(w, "ready")
}
}

func (t *Cortex) setupPromQLFunctions() {
// The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (https://github.com/prometheus/prometheus/pull/14930)
// The cortex supports holt_winters for users using this function.
querier.EnableExperimentalPromQLFunctions(t.Cfg.Querier.EnablePromQLExperimentalFunctions, true)
}
16 changes: 3 additions & 13 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"

"log/slog"
"net/http"
"runtime"
Expand All @@ -18,8 +19,6 @@ import (
"github.com/prometheus/prometheus/rules"
prom_storage "github.com/prometheus/prometheus/storage"
"github.com/thanos-io/objstore"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/querysharding"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
Expand All @@ -32,6 +31,7 @@ import (
configAPI "github.com/cortexproject/cortex/pkg/configs/api"
"github.com/cortexproject/cortex/pkg/configs/db"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/flusher"
"github.com/cortexproject/cortex/pkg/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
Expand Down Expand Up @@ -555,7 +555,6 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
t.Cfg.Querier.DefaultEvaluationInterval,
t.Cfg.Querier.MaxSubQuerySteps,
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.EnablePromQLExperimentalFunctions,
)

return services.NewIdleService(nil, func(_ error) error {
Expand Down Expand Up @@ -634,7 +633,6 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)

var queryEngine promql.QueryEngine
opts := promql.EngineOpts{
Logger: util_log.SLogger,
Reg: rulerRegisterer,
Expand All @@ -649,15 +647,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds()
},
}
if t.Cfg.Querier.ThanosEngine {
queryEngine = engine.New(engine.Opts{
EngineOpts: opts,
LogicalOptimizers: logicalplan.AllOptimizers,
EnableAnalysis: true,
})
} else {
queryEngine = promql.NewEngine(opts)
}
queryEngine := engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
Expand Down
63 changes: 63 additions & 0 deletions pkg/engine/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package engine

import (
"flag"
"fmt"
"strings"

"github.com/thanos-io/promql-engine/logicalplan"
)

var supportedOptimizers = []string{"default", "all", "propagate-matchers", "sort-matchers", "merge-selects", "detect-histogram-stats"}

// ThanosEngineConfig contains the configuration to create engine
type ThanosEngineConfig struct {
Enabled bool `yaml:"enabled"`
EnableXFunctions bool `yaml:"enable_x_functions"`
Optimizers string `yaml:"optimizers"`
LogicalOptimizers []logicalplan.Optimizer `yaml:"-"`
}

func (cfg *ThanosEngineConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, prefix+"thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
f.BoolVar(&cfg.EnableXFunctions, prefix+"enable-x-functions", false, "Enable xincrease, xdelta, xrate etc from Thanos engine.")
f.StringVar(&cfg.Optimizers, prefix+"optimizers", "default", "Logical plan optimizers. Multiple optimizers can be provided as a comma-separated list. Supported values: "+strings.Join(supportedOptimizers, ", "))
}

func (cfg *ThanosEngineConfig) Validate() error {
splitOptimizers := strings.Split(cfg.Optimizers, ",")

for _, optimizer := range splitOptimizers {
if optimizer == "all" || optimizer == "default" {
if len(splitOptimizers) > 1 {
return fmt.Errorf("special optimizer %s cannot be combined with other optimizers", optimizer)
}
}
optimizers, err := getOptimizer(optimizer)
if err != nil {
return err
}
cfg.LogicalOptimizers = append(cfg.LogicalOptimizers, optimizers...)
}

return nil
}

func getOptimizer(name string) ([]logicalplan.Optimizer, error) {
switch name {
case "default":
return logicalplan.DefaultOptimizers, nil
case "all":
return logicalplan.AllOptimizers, nil
case "propagate-matchers":
return []logicalplan.Optimizer{logicalplan.PropagateMatchersOptimizer{}}, nil
case "sort-matchers":
return []logicalplan.Optimizer{logicalplan.SortMatchers{}}, nil
case "merge-selects":
return []logicalplan.Optimizer{logicalplan.MergeSelectsOptimizer{}}, nil
case "detect-histogram-stats":
return []logicalplan.Optimizer{logicalplan.DetectHistogramStatsOptimizer{}}, nil
default:
return nil, fmt.Errorf("unknown optimizer %s", name)
}
}
8 changes: 4 additions & 4 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
thanosengine "github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
)

type engineKeyType struct{}
Expand Down Expand Up @@ -52,15 +51,16 @@ type Engine struct {
engineSwitchQueriesTotal *prometheus.CounterVec
}

func New(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *Engine {
func New(opts promql.EngineOpts, thanosEngineCfg ThanosEngineConfig, reg prometheus.Registerer) *Engine {
prometheusEngine := promql.NewEngine(opts)

var thanosEngine *thanosengine.Engine
if enableThanosEngine {
if thanosEngineCfg.Enabled {
thanosEngine = thanosengine.New(thanosengine.Opts{
EngineOpts: opts,
LogicalOptimizers: logicalplan.DefaultOptimizers,
LogicalOptimizers: thanosEngineCfg.LogicalOptimizers,
EnableAnalysis: true,
EnableXFunctions: thanosEngineCfg.EnableXFunctions,
})
}

Expand Down
Loading
Loading