diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ff1a4b2ce3..3d98cc34238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580 * [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738 * [FEATURE] Querier: Allow choosing PromQL engine via header. #6777 +* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873 * [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845 * [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index ac9dd92d291..04d74307420 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -252,11 +252,22 @@ querier: # CLI flag: -querier.shuffle-sharding-ingesters-lookback-period [shuffle_sharding_ingesters_lookback_period: | default = 0s] - # Experimental. Use Thanos promql engine - # https://github.com/thanos-io/promql-engine rather than the Prometheus promql - # engine. - # CLI flag: -querier.thanos-engine - [thanos_engine: | default = false] + thanos_engine: + # Experimental. Use Thanos promql engine + # https://github.com/thanos-io/promql-engine rather than the Prometheus + # promql engine. + # CLI flag: -querier.thanos-engine + [enabled: | default = false] + + # Enable xincrease, xdelta, xrate etc from Thanos engine. + # CLI flag: -querier.enable-x-functions + [enable_x_functions: | default = false] + + # Logical plan optimizers. Multiple optimizers can be provided as a + # comma-separated list. Supported values: default, all, propagate-matchers, + # sort-matchers, merge-selects, detect-histogram-stats + # CLI flag: -querier.optimizers + [optimizers: | default = "default"] # If enabled, ignore max query length check at Querier select method. Users # can choose to ignore it since the validation can be done before Querier diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2963a87348c..0ce98cb65af 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4302,11 +4302,22 @@ store_gateway_client: # CLI flag: -querier.shuffle-sharding-ingesters-lookback-period [shuffle_sharding_ingesters_lookback_period: | default = 0s] -# Experimental. Use Thanos promql engine -# https://github.com/thanos-io/promql-engine rather than the Prometheus promql -# engine. -# CLI flag: -querier.thanos-engine -[thanos_engine: | default = false] +thanos_engine: + # Experimental. Use Thanos promql engine + # https://github.com/thanos-io/promql-engine rather than the Prometheus promql + # engine. + # CLI flag: -querier.thanos-engine + [enabled: | default = false] + + # Enable xincrease, xdelta, xrate etc from Thanos engine. + # CLI flag: -querier.enable-x-functions + [enable_x_functions: | default = false] + + # Logical plan optimizers. Multiple optimizers can be provided as a + # comma-separated list. Supported values: default, all, propagate-matchers, + # sort-matchers, merge-selects, detect-histogram-stats + # CLI flag: -querier.optimizers + [optimizers: | default = "default"] # If enabled, ignore max query length check at Querier select method. Users can # choose to ignore it since the validation can be done before Querier evaluation @@ -5023,6 +5034,23 @@ ring: # ruler.enable-ha-evaluation is true. # CLI flag: -ruler.liveness-check-timeout [liveness_check_timeout: | default = 1s] + +thanos_engine: + # Experimental. Use Thanos promql engine + # https://github.com/thanos-io/promql-engine rather than the Prometheus promql + # engine. + # CLI flag: -ruler.thanos-engine + [enabled: | default = false] + + # Enable xincrease, xdelta, xrate etc from Thanos engine. + # CLI flag: -ruler.enable-x-functions + [enable_x_functions: | default = false] + + # Logical plan optimizers. Multiple optimizers can be provided as a + # comma-separated list. Supported values: default, all, propagate-matchers, + # sort-matchers, merge-selects, detect-histogram-stats + # CLI flag: -ruler.optimizers + [optimizers: | default = "default"] ``` ### `ruler_storage_config` diff --git a/integration/querier_test.go b/integration/querier_test.go index 6305b4433c5..7e16b587dbb 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/execution/parse" "github.com/cortexproject/cortex/integration/e2e" e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" @@ -416,6 +417,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) { "-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled), "-querier.query-store-for-labels-enabled": "true", "-querier.thanos-engine": strconv.FormatBool(thanosEngine), + "-querier.enable-x-functions": strconv.FormatBool(thanosEngine), // Ingester. "-ring.store": "consul", "-consul.hostname": consul.NetworkHTTPEndpoint(), @@ -1310,3 +1312,66 @@ func TestQuerierMaxSamplesLimit(t *testing.T) { Error: "query processing would load too many samples into memory in query execution", }) } + +func TestQuerierEngineConfigs(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-querier.thanos-engine": "true", + "-querier.enable-x-functions": "true", + "-querier.optimizers": "all", + }) + + // Start dependencies. + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components for the write path. + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier)) + + // Wait until the distributor and querier has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Push some series to Cortex. + series1Timestamp := time.Now() + series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "job", Value: "test"}) + series2, _ := generateSeries("series_2", series1Timestamp, prompb.Label{Name: "job", Value: "test"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + for xFunc := range parse.XFunctions { + result, err := c.Query(fmt.Sprintf(`%s(series_1{job="test"}[1m])`, xFunc), series1Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + } + +} diff --git a/pkg/configs/userconfig/config.go b/pkg/configs/userconfig/config.go index 3900ff9f41b..25e7d39b38b 100644 --- a/pkg/configs/userconfig/config.go +++ b/pkg/configs/userconfig/config.go @@ -9,10 +9,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" - "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" "gopkg.in/yaml.v3" + "github.com/cortexproject/cortex/pkg/parser" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/configs/userconfig/config_test.go b/pkg/configs/userconfig/config_test.go index 06893bd605a..392ca911ca9 100644 --- a/pkg/configs/userconfig/config_test.go +++ b/pkg/configs/userconfig/config_test.go @@ -12,12 +12,12 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" - "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" + "github.com/cortexproject/cortex/pkg/parser" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index b141adc8127..379501db0e6 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -379,6 +379,7 @@ func New(cfg Config) (*Cortex, error) { return nil, err } + cortex.setupPromQLFunctions() return cortex, nil } @@ -537,3 +538,9 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc { util.WriteTextResponse(w, "ready") } } + +func (t *Cortex) setupPromQLFunctions() { + // The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (https://github.com/prometheus/prometheus/pull/14930) + // The cortex supports holt_winters for users using this function. + querier.EnableExperimentalPromQLFunctions(t.Cfg.Querier.EnablePromQLExperimentalFunctions, true) +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 5f4679b9d4d..b2624f4af41 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "log/slog" "net/http" "runtime" @@ -18,8 +19,6 @@ import ( "github.com/prometheus/prometheus/rules" prom_storage "github.com/prometheus/prometheus/storage" "github.com/thanos-io/objstore" - "github.com/thanos-io/promql-engine/engine" - "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/querysharding" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" @@ -32,6 +31,7 @@ import ( configAPI "github.com/cortexproject/cortex/pkg/configs/api" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/frontend" "github.com/cortexproject/cortex/pkg/frontend/transport" @@ -555,7 +555,6 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro t.Cfg.Querier.DefaultEvaluationInterval, t.Cfg.Querier.MaxSubQuerySteps, t.Cfg.Querier.LookbackDelta, - t.Cfg.Querier.EnablePromQLExperimentalFunctions, ) return services.NewIdleService(nil, func(_ error) error { @@ -634,7 +633,6 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil { rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) - var queryEngine promql.QueryEngine opts := promql.EngineOpts{ Logger: util_log.SLogger, Reg: rulerRegisterer, @@ -649,15 +647,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds() }, } - if t.Cfg.Querier.ThanosEngine { - queryEngine = engine.New(engine.Opts{ - EngineOpts: opts, - LogicalOptimizers: logicalplan.AllOptimizers, - EnableAnalysis: true, - }) - } else { - queryEngine = promql.NewEngine(opts) - } + queryEngine := engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer) managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer) manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) diff --git a/pkg/engine/config.go b/pkg/engine/config.go new file mode 100644 index 00000000000..e964196bb0e --- /dev/null +++ b/pkg/engine/config.go @@ -0,0 +1,63 @@ +package engine + +import ( + "flag" + "fmt" + "strings" + + "github.com/thanos-io/promql-engine/logicalplan" +) + +var supportedOptimizers = []string{"default", "all", "propagate-matchers", "sort-matchers", "merge-selects", "detect-histogram-stats"} + +// ThanosEngineConfig contains the configuration to create engine +type ThanosEngineConfig struct { + Enabled bool `yaml:"enabled"` + EnableXFunctions bool `yaml:"enable_x_functions"` + Optimizers string `yaml:"optimizers"` + LogicalOptimizers []logicalplan.Optimizer `yaml:"-"` +} + +func (cfg *ThanosEngineConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.BoolVar(&cfg.Enabled, prefix+"thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") + f.BoolVar(&cfg.EnableXFunctions, prefix+"enable-x-functions", false, "Enable xincrease, xdelta, xrate etc from Thanos engine.") + f.StringVar(&cfg.Optimizers, prefix+"optimizers", "default", "Logical plan optimizers. Multiple optimizers can be provided as a comma-separated list. Supported values: "+strings.Join(supportedOptimizers, ", ")) +} + +func (cfg *ThanosEngineConfig) Validate() error { + splitOptimizers := strings.Split(cfg.Optimizers, ",") + + for _, optimizer := range splitOptimizers { + if optimizer == "all" || optimizer == "default" { + if len(splitOptimizers) > 1 { + return fmt.Errorf("special optimizer %s cannot be combined with other optimizers", optimizer) + } + } + optimizers, err := getOptimizer(optimizer) + if err != nil { + return err + } + cfg.LogicalOptimizers = append(cfg.LogicalOptimizers, optimizers...) + } + + return nil +} + +func getOptimizer(name string) ([]logicalplan.Optimizer, error) { + switch name { + case "default": + return logicalplan.DefaultOptimizers, nil + case "all": + return logicalplan.AllOptimizers, nil + case "propagate-matchers": + return []logicalplan.Optimizer{logicalplan.PropagateMatchersOptimizer{}}, nil + case "sort-matchers": + return []logicalplan.Optimizer{logicalplan.SortMatchers{}}, nil + case "merge-selects": + return []logicalplan.Optimizer{logicalplan.MergeSelectsOptimizer{}}, nil + case "detect-histogram-stats": + return []logicalplan.Optimizer{logicalplan.DetectHistogramStatsOptimizer{}}, nil + default: + return nil, fmt.Errorf("unknown optimizer %s", name) + } +} diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 2468bcfccbc..be22e4573ad 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -10,7 +10,6 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" thanosengine "github.com/thanos-io/promql-engine/engine" - "github.com/thanos-io/promql-engine/logicalplan" ) type engineKeyType struct{} @@ -52,15 +51,16 @@ type Engine struct { engineSwitchQueriesTotal *prometheus.CounterVec } -func New(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *Engine { +func New(opts promql.EngineOpts, thanosEngineCfg ThanosEngineConfig, reg prometheus.Registerer) *Engine { prometheusEngine := promql.NewEngine(opts) var thanosEngine *thanosengine.Engine - if enableThanosEngine { + if thanosEngineCfg.Enabled { thanosEngine = thanosengine.New(thanosengine.Opts{ EngineOpts: opts, - LogicalOptimizers: logicalplan.DefaultOptimizers, + LogicalOptimizers: thanosEngineCfg.LogicalOptimizers, EnableAnalysis: true, + EnableXFunctions: thanosEngineCfg.EnableXFunctions, }) } diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index facae793edc..7b270e6604a 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -3,6 +3,7 @@ package engine import ( "bytes" "context" + "fmt" "net/http" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/promqltest" "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/execution/parse" utillog "github.com/cortexproject/cortex/pkg/util/log" ) @@ -37,7 +39,7 @@ func TestEngine_Fallback(t *testing.T) { Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), Reg: reg, } - queryEngine := New(opts, true, reg) + queryEngine := New(opts, ThanosEngineConfig{Enabled: true}, reg) // instant query, should go to fallback _, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "unimplemented(foo)", now) @@ -68,7 +70,7 @@ func TestEngine_Switch(t *testing.T) { Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), Reg: reg, } - queryEngine := New(opts, true, reg) + queryEngine := New(opts, ThanosEngineConfig{Enabled: true}, reg) // Query Prometheus engine r := &http.Request{Header: http.Header{}} @@ -96,3 +98,28 @@ func TestEngine_Switch(t *testing.T) { cortex_engine_switch_queries_total{engine_type="thanos"} 2 `), "cortex_engine_switch_queries_total")) } + +func TestEngine_XFunctions(t *testing.T) { + ctx := context.Background() + reg := prometheus.NewRegistry() + + now := time.Now() + start := time.Now().Add(-time.Minute * 5) + step := time.Minute + queryable := promqltest.LoadedStorage(t, "") + opts := promql.EngineOpts{ + Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), + Reg: reg, + } + queryEngine := New(opts, ThanosEngineConfig{Enabled: true, EnableXFunctions: true}, reg) + + for name := range parse.XFunctions { + t.Run(name, func(t *testing.T) { + _, err := queryEngine.NewInstantQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), now) + require.NoError(t, err) + + _, err = queryEngine.NewRangeQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), start, now, step) + require.NoError(t, err) + }) + } +} diff --git a/pkg/parser/parser.go b/pkg/parser/parser.go new file mode 100644 index 00000000000..07940c6aaaa --- /dev/null +++ b/pkg/parser/parser.go @@ -0,0 +1,21 @@ +package parser + +import ( + "maps" + + promqlparser "github.com/prometheus/prometheus/promql/parser" + "github.com/thanos-io/promql-engine/execution/parse" +) + +var functions = buildFunctions() + +func buildFunctions() map[string]*promqlparser.Function { + fns := make(map[string]*promqlparser.Function, len(promqlparser.Functions)) + maps.Copy(fns, promqlparser.Functions) + maps.Copy(fns, parse.XFunctions) + return fns +} + +func ParseExpr(qs string) (promqlparser.Expr, error) { + return promqlparser.NewParser(qs, promqlparser.WithFunctions(functions)).ParseExpr() +} diff --git a/pkg/querier/batch/batch_test.go b/pkg/querier/batch/batch_test.go index 4f4b57bfe4d..3f6d13aba19 100644 --- a/pkg/querier/batch/batch_test.go +++ b/pkg/querier/batch/batch_test.go @@ -149,7 +149,7 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) { require.Equal(t, histograms[0], val) require.Equal(t, int64(1*time.Second/time.Millisecond), actual) - // Histogram chunk should support querying float histograms since it is what Query Engine does. + // Histogram chunk should support querying float histograms since it is what Query ThanosEngine does. actualT, fh := sut.AtFloatHistogram(nil) require.Equal(t, histograms[0].ToFloat(nil), fh) require.Equal(t, int64(1*time.Second/time.Millisecond), actualT) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index b1a94f1d40f..ffe6c2e0b50 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -85,9 +85,7 @@ type Config struct { ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"` - // Experimental. Use https://github.com/thanos-io/promql-engine rather than - // the Prometheus query engine. - ThanosEngine bool `yaml:"thanos_engine"` + ThanosEngine engine.ThanosEngineConfig `yaml:"thanos_engine"` // Ignore max query length check at Querier. IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"` @@ -111,6 +109,8 @@ var ( // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.ThanosEngine.RegisterFlagsWithPrefix("querier.", f) + //lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods flagext.DeprecatedFlag(f, "querier.ingester-streaming", "Deprecated: Use streaming RPCs to query ingester. QueryStream is always enabled and the flag is not effective anymore.", util_log.Logger) //lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods @@ -139,7 +139,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors (ex. partial data returned).") f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") - f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.") f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.") f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") @@ -181,6 +180,10 @@ func (cfg *Config) Validate() error { } } + if err := cfg.ThanosEngine.Validate(); err != nil { + return err + } + return nil } @@ -228,10 +231,6 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor }) maxConcurrentMetric.Set(float64(cfg.MaxConcurrent)) - // The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (https://github.com/prometheus/prometheus/pull/14930) - // The cortex supports holt_winters for users using this function. - EnableExperimentalPromQLFunctions(cfg.EnablePromQLExperimentalFunctions, true) - opts := promql.EngineOpts{ Logger: util_log.GoKitLogToSlog(logger), Reg: reg, diff --git a/pkg/querier/tripperware/instantquery/limits.go b/pkg/querier/tripperware/instantquery/limits.go index 5c9514b957b..477fe4c36f4 100644 --- a/pkg/querier/tripperware/instantquery/limits.go +++ b/pkg/querier/tripperware/instantquery/limits.go @@ -5,9 +5,9 @@ import ( "net/http" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/promql" @@ -45,7 +45,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (trippe // Enforce the max query length. if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 { - expr, err := parser.ParseExpr(r.GetQuery()) + expr, err := cortexparser.ParseExpr(r.GetQuery()) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } diff --git a/pkg/querier/tripperware/instantquery/limits_test.go b/pkg/querier/tripperware/instantquery/limits_test.go index 74f5c7c4d86..a365eab414c 100644 --- a/pkg/querier/tripperware/instantquery/limits_test.go +++ b/pkg/querier/tripperware/instantquery/limits_test.go @@ -6,13 +6,13 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -24,7 +24,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { ) wrongQuery := `up[` - _, parserErr := parser.ParseExpr(wrongQuery) + _, parserErr := cortexparser.ParseExpr(wrongQuery) tests := map[string]struct { maxQueryLength time.Duration diff --git a/pkg/querier/tripperware/merge.go b/pkg/querier/tripperware/merge.go index 0fd385cecf2..0e3d8aabb4b 100644 --- a/pkg/querier/tripperware/merge.go +++ b/pkg/querier/tripperware/merge.go @@ -10,6 +10,7 @@ import ( "github.com/thanos-io/thanos/pkg/strutil" "github.com/cortexproject/cortex/pkg/cortexpb" + cortexparser "github.com/cortexproject/cortex/pkg/parser" ) const StatusSuccess = "success" @@ -284,7 +285,7 @@ func getSortValueFromPair(samples []*pair, i int) float64 { } func sortPlanForQuery(q string) (sortPlan, error) { - expr, err := promqlparser.ParseExpr(q) + expr, err := cortexparser.ParseExpr(q) if err != nil { return 0, err } diff --git a/pkg/querier/tripperware/query_attribute_matcher.go b/pkg/querier/tripperware/query_attribute_matcher.go index 7edd9f0b098..002568b7a4e 100644 --- a/pkg/querier/tripperware/query_attribute_matcher.go +++ b/pkg/querier/tripperware/query_attribute_matcher.go @@ -6,9 +6,9 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/validation" @@ -24,7 +24,7 @@ func rejectQueryOrSetPriority(r *http.Request, now time.Time, lookbackDelta time if op == "query" || op == "query_range" { query := r.FormValue("query") - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { return httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } diff --git a/pkg/querier/tripperware/queryrange/limits.go b/pkg/querier/tripperware/queryrange/limits.go index 817fcf50834..7b1f17b55a9 100644 --- a/pkg/querier/tripperware/queryrange/limits.go +++ b/pkg/querier/tripperware/queryrange/limits.go @@ -7,9 +7,9 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/timestamp" - "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" @@ -82,7 +82,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (trippe return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength) } - expr, err := parser.ParseExpr(r.GetQuery()) + expr, err := cortexparser.ParseExpr(r.GetQuery()) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index 9a7668b9812..3690e1e0386 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -6,13 +6,13 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/validation" diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index b5cdef60386..ed0c00190c8 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -83,7 +83,6 @@ func TestRoundTrip(t *testing.T) { time.Minute, 0, 0, - false, ) for i, tc := range []struct { diff --git a/pkg/querier/tripperware/queryrange/results_cache.go b/pkg/querier/tripperware/queryrange/results_cache.go index 4ae249efc89..db6d2f284f5 100644 --- a/pkg/querier/tripperware/queryrange/results_cache.go +++ b/pkg/querier/tripperware/queryrange/results_cache.go @@ -27,6 +27,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/cortexpb" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/partialdata" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" @@ -326,7 +327,7 @@ func (s resultsCache) isAtModifierCachable(ctx context.Context, r tripperware.Re if !strings.Contains(query, "@") { return true } - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { // We are being pessimistic in such cases. level.Warn(util_log.WithContext(ctx, s.logger)).Log("msg", "failed to parse query, considering @ modifier as not cacheable", "query", query, "err", err) @@ -371,7 +372,7 @@ func (s resultsCache) isOffsetCachable(ctx context.Context, r tripperware.Reques if !strings.Contains(query, "offset") { return true } - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { level.Warn(util_log.WithContext(ctx, s.logger)).Log("msg", "failed to parse query, considering offset as not cacheable", "query", query, "err", err) return false diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index 6ff1f1a15e6..980d2867a87 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -11,6 +11,7 @@ import ( "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/httpgrpc" + cortexparser "github.com/cortexproject/cortex/pkg/parser" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" @@ -111,7 +112,7 @@ func splitQuery(r tripperware.Request, interval time.Duration) ([]tripperware.Re // For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00` // If the modifier is already a constant, it will be returned as is. func evaluateAtModifierFunction(query string, start, end int64) (string, error) { - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { return "", httpgrpc.Errorf(http.StatusBadRequest, "%s", err) } @@ -167,7 +168,7 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer return ctx, baseInterval, nil } - queryExpr, err := parser.ParseExpr(r.GetQuery()) + queryExpr, err := cortexparser.ParseExpr(r.GetQuery()) if err != nil { return ctx, baseInterval, err } diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index c854959be03..3934574fcb0 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -10,15 +10,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/httpgrpc" - - "github.com/prometheus/prometheus/promql/parser" - "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" "go.uber.org/atomic" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -434,7 +433,7 @@ func Test_evaluateAtModifier(t *testing.T) { require.Equal(t, tt.expectedErrorCode, int(httpResp.Code)) } else { require.NoError(t, err) - expectedExpr, err := parser.ParseExpr(tt.expected) + expectedExpr, err := cortexparser.ParseExpr(tt.expected) require.NoError(t, err) require.Equal(t, expectedExpr.String(), out) } @@ -1044,7 +1043,7 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - expr, err := parser.ParseExpr(tc.req.GetQuery()) + expr, err := cortexparser.ParseExpr(tc.req.GetQuery()) require.Nil(t, err) durationFetchedByRange, durationFetchedBySelectors := analyzeDurationFetchedByQueryExpr(expr, tc.req.GetStart(), tc.req.GetEnd(), tc.baseSplitInterval, tc.lookbackDelta) require.Equal(t, tc.expectedDurationFetchedByRange, durationFetchedByRange) diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index b9be569d6d9..144bb04da36 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -31,7 +31,6 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" - "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" @@ -117,13 +116,8 @@ func NewQueryTripperware( defaultSubQueryInterval time.Duration, maxSubQuerySteps int64, lookbackDelta time.Duration, - enablePromQLExperimentalFunctions bool, ) Tripperware { - // The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (https://github.com/prometheus/prometheus/pull/14930) - // The cortex supports holt_winters for users using this function. - querier.EnableExperimentalPromQLFunctions(enablePromQLExperimentalFunctions, true) - // Per tenant query metrics. queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_frontend_queries_total", diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 0ef5e1519d5..8649147121d 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -347,7 +347,6 @@ cortex_query_frontend_queries_total{op="query", source="api", user="1"} 1 time.Minute, tc.maxSubQuerySteps, 0, - false, ) resp, err := tw(downstream).RoundTrip(req) if tc.expectedErr == nil { diff --git a/pkg/querier/tripperware/subquery.go b/pkg/querier/tripperware/subquery.go index cebce45f261..2226192a89a 100644 --- a/pkg/querier/tripperware/subquery.go +++ b/pkg/querier/tripperware/subquery.go @@ -6,6 +6,8 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" + + cortexparser "github.com/cortexproject/cortex/pkg/parser" ) var ( @@ -18,7 +20,7 @@ const ( // SubQueryStepSizeCheck ensures the query doesn't contain too small step size in subqueries. func SubQueryStepSizeCheck(query string, defaultSubQueryInterval time.Duration, maxStep int64) error { - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { // If query fails to parse, we don't throw step size error // but fail query later on querier. diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index bb13f3e1438..db2b61d238a 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/user" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querysharding" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -413,7 +414,7 @@ http_requests_total`, s := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { q := r.FormValue("query") - expr, _ := parser.ParseExpr(q) + expr, _ := cortexparser.ParseExpr(q) shardIndex := int64(0) parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error { diff --git a/pkg/querysharding/util.go b/pkg/querysharding/util.go index 3f2a2d82432..2b438ce275e 100644 --- a/pkg/querysharding/util.go +++ b/pkg/querysharding/util.go @@ -7,6 +7,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/thanos/pkg/store/storepb" + + cortexparser "github.com/cortexproject/cortex/pkg/parser" ) const ( @@ -21,7 +23,7 @@ var ( ) func InjectShardingInfo(query string, shardInfo *storepb.ShardInfo) (string, error) { - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { return "", err } diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 862bcc54706..c8d8302e27a 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -15,13 +15,13 @@ import ( "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/storage" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/cortexpb" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring/client" @@ -171,7 +171,7 @@ func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, // Enforce the max query length. maxQueryLength := overrides.MaxQueryLength(userID) if maxQueryLength > 0 { - expr, err := parser.ParseExpr(qs) + expr, err := cortexparser.ParseExpr(qs) // If failed to parse expression, skip checking select range. // Fail the query in the engine. if err == nil { diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 22d475fe720..70c07233f41 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -29,6 +29,8 @@ import ( "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/engine" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ruler/rulespb" @@ -172,6 +174,8 @@ type Config struct { EnableHAEvaluation bool `yaml:"enable_ha_evaluation"` LivenessCheckTimeout time.Duration `yaml:"liveness_check_timeout"` + + ThanosEngine engine.ThanosEngineConfig `yaml:"thanos_engine"` } // Validate config and returns error on failure @@ -199,6 +203,11 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error { if !util.StringsContain(supportedQueryResponseFormats, cfg.QueryResponseFormat) { return errInvalidQueryResponseFormat } + + if err := cfg.ThanosEngine.Validate(); err != nil { + return err + } + return nil } @@ -208,6 +217,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.frontendClient", "", f) cfg.Ring.RegisterFlags(f) cfg.Notifier.RegisterFlags(f) + cfg.ThanosEngine.RegisterFlagsWithPrefix("ruler.", f) // Deprecated Flags that will be maintained to avoid user disruption @@ -1278,7 +1288,7 @@ func (r *Ruler) ruleGroupListToGroupStateDesc(userID string, backupGroups rulesp } var ruleDesc *RuleStateDesc - query, err := parser.ParseExpr(r.GetExpr()) + query, err := cortexparser.ParseExpr(r.GetExpr()) if err != nil { return nil, errors.Errorf("failed to parse rule query '%v'", r.GetExpr()) } diff --git a/pkg/util/promql/promql_test.go b/pkg/util/promql/promql_test.go index ed35b89c2e3..284bd09ca6c 100644 --- a/pkg/util/promql/promql_test.go +++ b/pkg/util/promql/promql_test.go @@ -4,8 +4,9 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/require" + + cortexparser "github.com/cortexproject/cortex/pkg/parser" ) func TestFindNonOverlapQueryLength(t *testing.T) { @@ -78,7 +79,7 @@ func TestFindNonOverlapQueryLength(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - expr, err := parser.ParseExpr(tc.query) + expr, err := cortexparser.ParseExpr(tc.query) require.NoError(t, err) duration := FindNonOverlapQueryLength(expr, 0, 0, time.Minute*5) require.Equal(t, tc.expectedLength, duration) diff --git a/pkg/util/time_test.go b/pkg/util/time_test.go index 239c4eb5b0b..6bdeb231938 100644 --- a/pkg/util/time_test.go +++ b/pkg/util/time_test.go @@ -8,11 +8,11 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/util/test" ) @@ -178,7 +178,7 @@ func TestFindMinMaxTime(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - expr, _ := parser.ParseExpr(testData.query) + expr, _ := cortexparser.ParseExpr(testData.query) url := "/query_range?query=" + testData.query + "&start=" + strconv.FormatInt(testData.queryStartTime.Truncate(time.Minute).Unix(), 10) +