From 895dd48aef10d42a9cda854b214f2262d9d4df59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Thu, 10 Jul 2025 11:34:13 -0700 Subject: [PATCH 1/2] Querier: Support configuring optimizers and XFunctions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 21 +++++-- docs/configuration/config-file-reference.md | 21 +++++-- integration/querier_test.go | 65 +++++++++++++++++++++ pkg/cortex/modules.go | 15 +---- pkg/engine/config.go | 64 ++++++++++++++++++++ pkg/engine/engine.go | 8 +-- pkg/engine/engine_test.go | 31 +++++++++- pkg/querier/querier.go | 13 +++-- 9 files changed, 206 insertions(+), 33 deletions(-) create mode 100644 pkg/engine/config.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ff1a4b2ce3..3d98cc34238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580 * [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738 * [FEATURE] Querier: Allow choosing PromQL engine via header. #6777 +* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873 * [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845 * [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index ac9dd92d291..e646b2aff0e 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -252,11 +252,22 @@ querier: # CLI flag: -querier.shuffle-sharding-ingesters-lookback-period [shuffle_sharding_ingesters_lookback_period: | default = 0s] - # Experimental. Use Thanos promql engine - # https://github.com/thanos-io/promql-engine rather than the Prometheus promql - # engine. - # CLI flag: -querier.thanos-engine - [thanos_engine: | default = false] + thanos_engine: + # Experimental. Use Thanos promql engine + # https://github.com/thanos-io/promql-engine rather than the Prometheus + # promql engine. + # CLI flag: -querier.thanos-engine + [enable_thanos_engine: | default = false] + + # Enable xincrease, xdelta, xrate etc from Thanos engine. + # CLI flag: -querier.enable-x-functions + [enable_x_functions: | default = false] + + # Logical plan optimizers. Multiple optimizers can be provided as a + # comma-separated list. Supported values: default, all, propagate-matchers, + # sort-matchers, merge-selects, detect-histogram-stats + # CLI flag: -querier.optimizers + [optimizers: | default = "default"] # If enabled, ignore max query length check at Querier select method. Users # can choose to ignore it since the validation can be done before Querier diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2963a87348c..60079d25639 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4302,11 +4302,22 @@ store_gateway_client: # CLI flag: -querier.shuffle-sharding-ingesters-lookback-period [shuffle_sharding_ingesters_lookback_period: | default = 0s] -# Experimental. Use Thanos promql engine -# https://github.com/thanos-io/promql-engine rather than the Prometheus promql -# engine. -# CLI flag: -querier.thanos-engine -[thanos_engine: | default = false] +thanos_engine: + # Experimental. Use Thanos promql engine + # https://github.com/thanos-io/promql-engine rather than the Prometheus promql + # engine. + # CLI flag: -querier.thanos-engine + [enable_thanos_engine: | default = false] + + # Enable xincrease, xdelta, xrate etc from Thanos engine. + # CLI flag: -querier.enable-x-functions + [enable_x_functions: | default = false] + + # Logical plan optimizers. Multiple optimizers can be provided as a + # comma-separated list. Supported values: default, all, propagate-matchers, + # sort-matchers, merge-selects, detect-histogram-stats + # CLI flag: -querier.optimizers + [optimizers: | default = "default"] # If enabled, ignore max query length check at Querier select method. Users can # choose to ignore it since the validation can be done before Querier evaluation diff --git a/integration/querier_test.go b/integration/querier_test.go index 6305b4433c5..7e16b587dbb 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/execution/parse" "github.com/cortexproject/cortex/integration/e2e" e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" @@ -416,6 +417,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) { "-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled), "-querier.query-store-for-labels-enabled": "true", "-querier.thanos-engine": strconv.FormatBool(thanosEngine), + "-querier.enable-x-functions": strconv.FormatBool(thanosEngine), // Ingester. "-ring.store": "consul", "-consul.hostname": consul.NetworkHTTPEndpoint(), @@ -1310,3 +1312,66 @@ func TestQuerierMaxSamplesLimit(t *testing.T) { Error: "query processing would load too many samples into memory in query execution", }) } + +func TestQuerierEngineConfigs(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-querier.thanos-engine": "true", + "-querier.enable-x-functions": "true", + "-querier.optimizers": "all", + }) + + // Start dependencies. + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components for the write path. + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier)) + + // Wait until the distributor and querier has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Push some series to Cortex. + series1Timestamp := time.Now() + series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "job", Value: "test"}) + series2, _ := generateSeries("series_2", series1Timestamp, prompb.Label{Name: "job", Value: "test"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + for xFunc := range parse.XFunctions { + result, err := c.Query(fmt.Sprintf(`%s(series_1{job="test"}[1m])`, xFunc), series1Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + } + +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 5f4679b9d4d..32131730383 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "log/slog" "net/http" "runtime" @@ -18,8 +19,6 @@ import ( "github.com/prometheus/prometheus/rules" prom_storage "github.com/prometheus/prometheus/storage" "github.com/thanos-io/objstore" - "github.com/thanos-io/promql-engine/engine" - "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/querysharding" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" @@ -32,6 +31,7 @@ import ( configAPI "github.com/cortexproject/cortex/pkg/configs/api" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/frontend" "github.com/cortexproject/cortex/pkg/frontend/transport" @@ -634,7 +634,6 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil { rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) - var queryEngine promql.QueryEngine opts := promql.EngineOpts{ Logger: util_log.SLogger, Reg: rulerRegisterer, @@ -649,15 +648,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds() }, } - if t.Cfg.Querier.ThanosEngine { - queryEngine = engine.New(engine.Opts{ - EngineOpts: opts, - LogicalOptimizers: logicalplan.AllOptimizers, - EnableAnalysis: true, - }) - } else { - queryEngine = promql.NewEngine(opts) - } + queryEngine := engine.New(opts, t.Cfg.Querier.Engine, rulerRegisterer) managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer) manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) diff --git a/pkg/engine/config.go b/pkg/engine/config.go new file mode 100644 index 00000000000..812c6d7f5a9 --- /dev/null +++ b/pkg/engine/config.go @@ -0,0 +1,64 @@ +package engine + +import ( + "flag" + "fmt" + "strings" + + "github.com/thanos-io/promql-engine/logicalplan" +) + +var supportedOptimizers = []string{"default", "all", "propagate-matchers", "sort-matchers", "merge-selects", "detect-histogram-stats"} + +// Config contains the configuration to create engine +type Config struct { + EnableThanosEngine bool `yaml:"enable_thanos_engine"` + EnableXFunctions bool `yaml:"enable_x_functions"` + Optimizers string `yaml:"optimizers"` + LogicalOptimizers []logicalplan.Optimizer `yaml:"-"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.EnableThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") + f.BoolVar(&cfg.EnableXFunctions, "querier.enable-x-functions", false, "Enable xincrease, xdelta, xrate etc from Thanos engine.") + f.StringVar(&cfg.Optimizers, "querier.optimizers", "default", "Logical plan optimizers. Multiple optimizers can be provided as a comma-separated list. Supported values: "+strings.Join(supportedOptimizers, ", ")) +} + +func (cfg *Config) Validate() error { + splitOptimizers := strings.Split(cfg.Optimizers, ",") + + for _, optimizer := range splitOptimizers { + if optimizer == "all" || optimizer == "default" { + if len(splitOptimizers) > 1 { + return fmt.Errorf("special optimizer %s cannot be combined with other optimizers", optimizer) + } + } + optimizers, err := getOptimizer(optimizer) + if err != nil { + return err + } + cfg.LogicalOptimizers = append(cfg.LogicalOptimizers, optimizers...) + } + + return nil +} + +func getOptimizer(name string) ([]logicalplan.Optimizer, error) { + switch name { + case "default": + return logicalplan.DefaultOptimizers, nil + case "all": + return logicalplan.AllOptimizers, nil + case "propagate-matchers": + return []logicalplan.Optimizer{logicalplan.PropagateMatchersOptimizer{}}, nil + case "sort-matchers": + return []logicalplan.Optimizer{logicalplan.SortMatchers{}}, nil + case "merge-selects": + return []logicalplan.Optimizer{logicalplan.MergeSelectsOptimizer{}}, nil + case "detect-histogram-stats": + return []logicalplan.Optimizer{logicalplan.DetectHistogramStatsOptimizer{}}, nil + default: + return nil, fmt.Errorf("unknown optimizer %s", name) + } +} diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 2468bcfccbc..f7d45e0e326 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -10,7 +10,6 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" thanosengine "github.com/thanos-io/promql-engine/engine" - "github.com/thanos-io/promql-engine/logicalplan" ) type engineKeyType struct{} @@ -52,15 +51,16 @@ type Engine struct { engineSwitchQueriesTotal *prometheus.CounterVec } -func New(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *Engine { +func New(opts promql.EngineOpts, engineCfg Config, reg prometheus.Registerer) *Engine { prometheusEngine := promql.NewEngine(opts) var thanosEngine *thanosengine.Engine - if enableThanosEngine { + if engineCfg.EnableThanosEngine { thanosEngine = thanosengine.New(thanosengine.Opts{ EngineOpts: opts, - LogicalOptimizers: logicalplan.DefaultOptimizers, + LogicalOptimizers: engineCfg.LogicalOptimizers, EnableAnalysis: true, + EnableXFunctions: engineCfg.EnableXFunctions, }) } diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index facae793edc..092152d0aa1 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -3,6 +3,7 @@ package engine import ( "bytes" "context" + "fmt" "net/http" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/promqltest" "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/execution/parse" utillog "github.com/cortexproject/cortex/pkg/util/log" ) @@ -37,7 +39,7 @@ func TestEngine_Fallback(t *testing.T) { Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), Reg: reg, } - queryEngine := New(opts, true, reg) + queryEngine := New(opts, Config{EnableThanosEngine: true}, reg) // instant query, should go to fallback _, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "unimplemented(foo)", now) @@ -68,7 +70,7 @@ func TestEngine_Switch(t *testing.T) { Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), Reg: reg, } - queryEngine := New(opts, true, reg) + queryEngine := New(opts, Config{EnableThanosEngine: true}, reg) // Query Prometheus engine r := &http.Request{Header: http.Header{}} @@ -96,3 +98,28 @@ func TestEngine_Switch(t *testing.T) { cortex_engine_switch_queries_total{engine_type="thanos"} 2 `), "cortex_engine_switch_queries_total")) } + +func TestEngine_XFunctions(t *testing.T) { + ctx := context.Background() + reg := prometheus.NewRegistry() + + now := time.Now() + start := time.Now().Add(-time.Minute * 5) + step := time.Minute + queryable := promqltest.LoadedStorage(t, "") + opts := promql.EngineOpts{ + Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), + Reg: reg, + } + queryEngine := New(opts, Config{EnableThanosEngine: true, EnableXFunctions: true}, reg) + + for name := range parse.XFunctions { + t.Run(name, func(t *testing.T) { + _, err := queryEngine.NewInstantQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), now) + require.NoError(t, err) + + _, err = queryEngine.NewRangeQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), start, now, step) + require.NoError(t, err) + }) + } +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index b1a94f1d40f..1778125453d 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -85,9 +85,7 @@ type Config struct { ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"` - // Experimental. Use https://github.com/thanos-io/promql-engine rather than - // the Prometheus query engine. - ThanosEngine bool `yaml:"thanos_engine"` + Engine engine.Config `yaml:"thanos_engine"` // Ignore max query length check at Querier. IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"` @@ -139,13 +137,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors (ex. partial data returned).") f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") - f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.") f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.") f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") + + cfg.Engine.RegisterFlags(f) } // Validate the config @@ -181,6 +180,10 @@ func (cfg *Config) Validate() error { } } + if err := cfg.Engine.Validate(); err != nil { + return err + } + return nil } @@ -246,7 +249,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor return cfg.DefaultEvaluationInterval.Milliseconds() }, } - queryEngine := engine.New(opts, cfg.ThanosEngine, reg) + queryEngine := engine.New(opts, cfg.Engine, reg) return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, queryEngine } From 4fcd0ec87a738cf124ad9c42a2f943bf0ba852df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Mon, 14 Jul 2025 11:13:26 -0700 Subject: [PATCH 2/2] Address comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- docs/blocks-storage/querier.md | 2 +- docs/configuration/config-file-reference.md | 19 ++++++++++++++- pkg/configs/userconfig/config.go | 2 +- pkg/configs/userconfig/config_test.go | 2 +- pkg/cortex/cortex.go | 7 ++++++ pkg/cortex/modules.go | 3 +-- pkg/engine/config.go | 23 +++++++++---------- pkg/engine/engine.go | 8 +++---- pkg/engine/engine_test.go | 6 ++--- pkg/parser/parser.go | 21 +++++++++++++++++ pkg/querier/batch/batch_test.go | 2 +- pkg/querier/querier.go | 14 ++++------- .../tripperware/instantquery/limits.go | 4 ++-- .../tripperware/instantquery/limits_test.go | 4 ++-- pkg/querier/tripperware/merge.go | 3 ++- .../tripperware/query_attribute_matcher.go | 4 ++-- pkg/querier/tripperware/queryrange/limits.go | 4 ++-- .../tripperware/queryrange/limits_test.go | 2 +- .../query_range_middlewares_test.go | 1 - .../tripperware/queryrange/results_cache.go | 5 ++-- .../queryrange/split_by_interval.go | 5 ++-- .../queryrange/split_by_interval_test.go | 9 ++++---- pkg/querier/tripperware/roundtrip.go | 6 ----- pkg/querier/tripperware/roundtrip_test.go | 1 - pkg/querier/tripperware/subquery.go | 4 +++- .../tripperware/test_shard_by_query_utils.go | 3 ++- pkg/querysharding/util.go | 4 +++- pkg/ruler/compat.go | 4 ++-- pkg/ruler/ruler.go | 12 +++++++++- pkg/util/promql/promql_test.go | 5 ++-- pkg/util/time_test.go | 4 ++-- 31 files changed, 121 insertions(+), 72 deletions(-) create mode 100644 pkg/parser/parser.go diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index e646b2aff0e..04d74307420 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -257,7 +257,7 @@ querier: # https://github.com/thanos-io/promql-engine rather than the Prometheus # promql engine. # CLI flag: -querier.thanos-engine - [enable_thanos_engine: | default = false] + [enabled: | default = false] # Enable xincrease, xdelta, xrate etc from Thanos engine. # CLI flag: -querier.enable-x-functions diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 60079d25639..0ce98cb65af 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4307,7 +4307,7 @@ thanos_engine: # https://github.com/thanos-io/promql-engine rather than the Prometheus promql # engine. # CLI flag: -querier.thanos-engine - [enable_thanos_engine: | default = false] + [enabled: | default = false] # Enable xincrease, xdelta, xrate etc from Thanos engine. # CLI flag: -querier.enable-x-functions @@ -5034,6 +5034,23 @@ ring: # ruler.enable-ha-evaluation is true. # CLI flag: -ruler.liveness-check-timeout [liveness_check_timeout: | default = 1s] + +thanos_engine: + # Experimental. Use Thanos promql engine + # https://github.com/thanos-io/promql-engine rather than the Prometheus promql + # engine. + # CLI flag: -ruler.thanos-engine + [enabled: | default = false] + + # Enable xincrease, xdelta, xrate etc from Thanos engine. + # CLI flag: -ruler.enable-x-functions + [enable_x_functions: | default = false] + + # Logical plan optimizers. Multiple optimizers can be provided as a + # comma-separated list. Supported values: default, all, propagate-matchers, + # sort-matchers, merge-selects, detect-histogram-stats + # CLI flag: -ruler.optimizers + [optimizers: | default = "default"] ``` ### `ruler_storage_config` diff --git a/pkg/configs/userconfig/config.go b/pkg/configs/userconfig/config.go index 3900ff9f41b..25e7d39b38b 100644 --- a/pkg/configs/userconfig/config.go +++ b/pkg/configs/userconfig/config.go @@ -9,10 +9,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" - "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" "gopkg.in/yaml.v3" + "github.com/cortexproject/cortex/pkg/parser" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/configs/userconfig/config_test.go b/pkg/configs/userconfig/config_test.go index 06893bd605a..392ca911ca9 100644 --- a/pkg/configs/userconfig/config_test.go +++ b/pkg/configs/userconfig/config_test.go @@ -12,12 +12,12 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" - "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" + "github.com/cortexproject/cortex/pkg/parser" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index b141adc8127..379501db0e6 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -379,6 +379,7 @@ func New(cfg Config) (*Cortex, error) { return nil, err } + cortex.setupPromQLFunctions() return cortex, nil } @@ -537,3 +538,9 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc { util.WriteTextResponse(w, "ready") } } + +func (t *Cortex) setupPromQLFunctions() { + // The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (https://github.com/prometheus/prometheus/pull/14930) + // The cortex supports holt_winters for users using this function. + querier.EnableExperimentalPromQLFunctions(t.Cfg.Querier.EnablePromQLExperimentalFunctions, true) +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 32131730383..b2624f4af41 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -555,7 +555,6 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro t.Cfg.Querier.DefaultEvaluationInterval, t.Cfg.Querier.MaxSubQuerySteps, t.Cfg.Querier.LookbackDelta, - t.Cfg.Querier.EnablePromQLExperimentalFunctions, ) return services.NewIdleService(nil, func(_ error) error { @@ -648,7 +647,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds() }, } - queryEngine := engine.New(opts, t.Cfg.Querier.Engine, rulerRegisterer) + queryEngine := engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer) managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer) manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) diff --git a/pkg/engine/config.go b/pkg/engine/config.go index 812c6d7f5a9..e964196bb0e 100644 --- a/pkg/engine/config.go +++ b/pkg/engine/config.go @@ -10,22 +10,21 @@ import ( var supportedOptimizers = []string{"default", "all", "propagate-matchers", "sort-matchers", "merge-selects", "detect-histogram-stats"} -// Config contains the configuration to create engine -type Config struct { - EnableThanosEngine bool `yaml:"enable_thanos_engine"` - EnableXFunctions bool `yaml:"enable_x_functions"` - Optimizers string `yaml:"optimizers"` - LogicalOptimizers []logicalplan.Optimizer `yaml:"-"` +// ThanosEngineConfig contains the configuration to create engine +type ThanosEngineConfig struct { + Enabled bool `yaml:"enabled"` + EnableXFunctions bool `yaml:"enable_x_functions"` + Optimizers string `yaml:"optimizers"` + LogicalOptimizers []logicalplan.Optimizer `yaml:"-"` } -// RegisterFlags adds the flags required to config this to the given FlagSet. -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.BoolVar(&cfg.EnableThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") - f.BoolVar(&cfg.EnableXFunctions, "querier.enable-x-functions", false, "Enable xincrease, xdelta, xrate etc from Thanos engine.") - f.StringVar(&cfg.Optimizers, "querier.optimizers", "default", "Logical plan optimizers. Multiple optimizers can be provided as a comma-separated list. Supported values: "+strings.Join(supportedOptimizers, ", ")) +func (cfg *ThanosEngineConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.BoolVar(&cfg.Enabled, prefix+"thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") + f.BoolVar(&cfg.EnableXFunctions, prefix+"enable-x-functions", false, "Enable xincrease, xdelta, xrate etc from Thanos engine.") + f.StringVar(&cfg.Optimizers, prefix+"optimizers", "default", "Logical plan optimizers. Multiple optimizers can be provided as a comma-separated list. Supported values: "+strings.Join(supportedOptimizers, ", ")) } -func (cfg *Config) Validate() error { +func (cfg *ThanosEngineConfig) Validate() error { splitOptimizers := strings.Split(cfg.Optimizers, ",") for _, optimizer := range splitOptimizers { diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index f7d45e0e326..be22e4573ad 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -51,16 +51,16 @@ type Engine struct { engineSwitchQueriesTotal *prometheus.CounterVec } -func New(opts promql.EngineOpts, engineCfg Config, reg prometheus.Registerer) *Engine { +func New(opts promql.EngineOpts, thanosEngineCfg ThanosEngineConfig, reg prometheus.Registerer) *Engine { prometheusEngine := promql.NewEngine(opts) var thanosEngine *thanosengine.Engine - if engineCfg.EnableThanosEngine { + if thanosEngineCfg.Enabled { thanosEngine = thanosengine.New(thanosengine.Opts{ EngineOpts: opts, - LogicalOptimizers: engineCfg.LogicalOptimizers, + LogicalOptimizers: thanosEngineCfg.LogicalOptimizers, EnableAnalysis: true, - EnableXFunctions: engineCfg.EnableXFunctions, + EnableXFunctions: thanosEngineCfg.EnableXFunctions, }) } diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index 092152d0aa1..7b270e6604a 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -39,7 +39,7 @@ func TestEngine_Fallback(t *testing.T) { Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), Reg: reg, } - queryEngine := New(opts, Config{EnableThanosEngine: true}, reg) + queryEngine := New(opts, ThanosEngineConfig{Enabled: true}, reg) // instant query, should go to fallback _, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "unimplemented(foo)", now) @@ -70,7 +70,7 @@ func TestEngine_Switch(t *testing.T) { Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), Reg: reg, } - queryEngine := New(opts, Config{EnableThanosEngine: true}, reg) + queryEngine := New(opts, ThanosEngineConfig{Enabled: true}, reg) // Query Prometheus engine r := &http.Request{Header: http.Header{}} @@ -111,7 +111,7 @@ func TestEngine_XFunctions(t *testing.T) { Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), Reg: reg, } - queryEngine := New(opts, Config{EnableThanosEngine: true, EnableXFunctions: true}, reg) + queryEngine := New(opts, ThanosEngineConfig{Enabled: true, EnableXFunctions: true}, reg) for name := range parse.XFunctions { t.Run(name, func(t *testing.T) { diff --git a/pkg/parser/parser.go b/pkg/parser/parser.go new file mode 100644 index 00000000000..07940c6aaaa --- /dev/null +++ b/pkg/parser/parser.go @@ -0,0 +1,21 @@ +package parser + +import ( + "maps" + + promqlparser "github.com/prometheus/prometheus/promql/parser" + "github.com/thanos-io/promql-engine/execution/parse" +) + +var functions = buildFunctions() + +func buildFunctions() map[string]*promqlparser.Function { + fns := make(map[string]*promqlparser.Function, len(promqlparser.Functions)) + maps.Copy(fns, promqlparser.Functions) + maps.Copy(fns, parse.XFunctions) + return fns +} + +func ParseExpr(qs string) (promqlparser.Expr, error) { + return promqlparser.NewParser(qs, promqlparser.WithFunctions(functions)).ParseExpr() +} diff --git a/pkg/querier/batch/batch_test.go b/pkg/querier/batch/batch_test.go index 4f4b57bfe4d..3f6d13aba19 100644 --- a/pkg/querier/batch/batch_test.go +++ b/pkg/querier/batch/batch_test.go @@ -149,7 +149,7 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) { require.Equal(t, histograms[0], val) require.Equal(t, int64(1*time.Second/time.Millisecond), actual) - // Histogram chunk should support querying float histograms since it is what Query Engine does. + // Histogram chunk should support querying float histograms since it is what Query ThanosEngine does. actualT, fh := sut.AtFloatHistogram(nil) require.Equal(t, histograms[0].ToFloat(nil), fh) require.Equal(t, int64(1*time.Second/time.Millisecond), actualT) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 1778125453d..ffe6c2e0b50 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -85,7 +85,7 @@ type Config struct { ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"` - Engine engine.Config `yaml:"thanos_engine"` + ThanosEngine engine.ThanosEngineConfig `yaml:"thanos_engine"` // Ignore max query length check at Querier. IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"` @@ -109,6 +109,8 @@ var ( // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.ThanosEngine.RegisterFlagsWithPrefix("querier.", f) + //lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods flagext.DeprecatedFlag(f, "querier.ingester-streaming", "Deprecated: Use streaming RPCs to query ingester. QueryStream is always enabled and the flag is not effective anymore.", util_log.Logger) //lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods @@ -143,8 +145,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") - - cfg.Engine.RegisterFlags(f) } // Validate the config @@ -180,7 +180,7 @@ func (cfg *Config) Validate() error { } } - if err := cfg.Engine.Validate(); err != nil { + if err := cfg.ThanosEngine.Validate(); err != nil { return err } @@ -231,10 +231,6 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor }) maxConcurrentMetric.Set(float64(cfg.MaxConcurrent)) - // The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (https://github.com/prometheus/prometheus/pull/14930) - // The cortex supports holt_winters for users using this function. - EnableExperimentalPromQLFunctions(cfg.EnablePromQLExperimentalFunctions, true) - opts := promql.EngineOpts{ Logger: util_log.GoKitLogToSlog(logger), Reg: reg, @@ -249,7 +245,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor return cfg.DefaultEvaluationInterval.Milliseconds() }, } - queryEngine := engine.New(opts, cfg.Engine, reg) + queryEngine := engine.New(opts, cfg.ThanosEngine, reg) return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, queryEngine } diff --git a/pkg/querier/tripperware/instantquery/limits.go b/pkg/querier/tripperware/instantquery/limits.go index 5c9514b957b..477fe4c36f4 100644 --- a/pkg/querier/tripperware/instantquery/limits.go +++ b/pkg/querier/tripperware/instantquery/limits.go @@ -5,9 +5,9 @@ import ( "net/http" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/promql" @@ -45,7 +45,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (trippe // Enforce the max query length. if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 { - expr, err := parser.ParseExpr(r.GetQuery()) + expr, err := cortexparser.ParseExpr(r.GetQuery()) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } diff --git a/pkg/querier/tripperware/instantquery/limits_test.go b/pkg/querier/tripperware/instantquery/limits_test.go index 74f5c7c4d86..a365eab414c 100644 --- a/pkg/querier/tripperware/instantquery/limits_test.go +++ b/pkg/querier/tripperware/instantquery/limits_test.go @@ -6,13 +6,13 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -24,7 +24,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { ) wrongQuery := `up[` - _, parserErr := parser.ParseExpr(wrongQuery) + _, parserErr := cortexparser.ParseExpr(wrongQuery) tests := map[string]struct { maxQueryLength time.Duration diff --git a/pkg/querier/tripperware/merge.go b/pkg/querier/tripperware/merge.go index 0fd385cecf2..0e3d8aabb4b 100644 --- a/pkg/querier/tripperware/merge.go +++ b/pkg/querier/tripperware/merge.go @@ -10,6 +10,7 @@ import ( "github.com/thanos-io/thanos/pkg/strutil" "github.com/cortexproject/cortex/pkg/cortexpb" + cortexparser "github.com/cortexproject/cortex/pkg/parser" ) const StatusSuccess = "success" @@ -284,7 +285,7 @@ func getSortValueFromPair(samples []*pair, i int) float64 { } func sortPlanForQuery(q string) (sortPlan, error) { - expr, err := promqlparser.ParseExpr(q) + expr, err := cortexparser.ParseExpr(q) if err != nil { return 0, err } diff --git a/pkg/querier/tripperware/query_attribute_matcher.go b/pkg/querier/tripperware/query_attribute_matcher.go index 7edd9f0b098..002568b7a4e 100644 --- a/pkg/querier/tripperware/query_attribute_matcher.go +++ b/pkg/querier/tripperware/query_attribute_matcher.go @@ -6,9 +6,9 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/validation" @@ -24,7 +24,7 @@ func rejectQueryOrSetPriority(r *http.Request, now time.Time, lookbackDelta time if op == "query" || op == "query_range" { query := r.FormValue("query") - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { return httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } diff --git a/pkg/querier/tripperware/queryrange/limits.go b/pkg/querier/tripperware/queryrange/limits.go index 817fcf50834..7b1f17b55a9 100644 --- a/pkg/querier/tripperware/queryrange/limits.go +++ b/pkg/querier/tripperware/queryrange/limits.go @@ -7,9 +7,9 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/timestamp" - "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" @@ -82,7 +82,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (trippe return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength) } - expr, err := parser.ParseExpr(r.GetQuery()) + expr, err := cortexparser.ParseExpr(r.GetQuery()) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index 9a7668b9812..3690e1e0386 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -6,13 +6,13 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/validation" diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index b5cdef60386..ed0c00190c8 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -83,7 +83,6 @@ func TestRoundTrip(t *testing.T) { time.Minute, 0, 0, - false, ) for i, tc := range []struct { diff --git a/pkg/querier/tripperware/queryrange/results_cache.go b/pkg/querier/tripperware/queryrange/results_cache.go index 4ae249efc89..db6d2f284f5 100644 --- a/pkg/querier/tripperware/queryrange/results_cache.go +++ b/pkg/querier/tripperware/queryrange/results_cache.go @@ -27,6 +27,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/cortexpb" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/partialdata" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" @@ -326,7 +327,7 @@ func (s resultsCache) isAtModifierCachable(ctx context.Context, r tripperware.Re if !strings.Contains(query, "@") { return true } - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { // We are being pessimistic in such cases. level.Warn(util_log.WithContext(ctx, s.logger)).Log("msg", "failed to parse query, considering @ modifier as not cacheable", "query", query, "err", err) @@ -371,7 +372,7 @@ func (s resultsCache) isOffsetCachable(ctx context.Context, r tripperware.Reques if !strings.Contains(query, "offset") { return true } - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { level.Warn(util_log.WithContext(ctx, s.logger)).Log("msg", "failed to parse query, considering offset as not cacheable", "query", query, "err", err) return false diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index 6ff1f1a15e6..980d2867a87 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -11,6 +11,7 @@ import ( "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/httpgrpc" + cortexparser "github.com/cortexproject/cortex/pkg/parser" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" @@ -111,7 +112,7 @@ func splitQuery(r tripperware.Request, interval time.Duration) ([]tripperware.Re // For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00` // If the modifier is already a constant, it will be returned as is. func evaluateAtModifierFunction(query string, start, end int64) (string, error) { - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { return "", httpgrpc.Errorf(http.StatusBadRequest, "%s", err) } @@ -167,7 +168,7 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer return ctx, baseInterval, nil } - queryExpr, err := parser.ParseExpr(r.GetQuery()) + queryExpr, err := cortexparser.ParseExpr(r.GetQuery()) if err != nil { return ctx, baseInterval, err } diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index c854959be03..3934574fcb0 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -10,15 +10,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/httpgrpc" - - "github.com/prometheus/prometheus/promql/parser" - "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" "go.uber.org/atomic" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -434,7 +433,7 @@ func Test_evaluateAtModifier(t *testing.T) { require.Equal(t, tt.expectedErrorCode, int(httpResp.Code)) } else { require.NoError(t, err) - expectedExpr, err := parser.ParseExpr(tt.expected) + expectedExpr, err := cortexparser.ParseExpr(tt.expected) require.NoError(t, err) require.Equal(t, expectedExpr.String(), out) } @@ -1044,7 +1043,7 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - expr, err := parser.ParseExpr(tc.req.GetQuery()) + expr, err := cortexparser.ParseExpr(tc.req.GetQuery()) require.Nil(t, err) durationFetchedByRange, durationFetchedBySelectors := analyzeDurationFetchedByQueryExpr(expr, tc.req.GetStart(), tc.req.GetEnd(), tc.baseSplitInterval, tc.lookbackDelta) require.Equal(t, tc.expectedDurationFetchedByRange, durationFetchedByRange) diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index b9be569d6d9..144bb04da36 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -31,7 +31,6 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" - "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" @@ -117,13 +116,8 @@ func NewQueryTripperware( defaultSubQueryInterval time.Duration, maxSubQuerySteps int64, lookbackDelta time.Duration, - enablePromQLExperimentalFunctions bool, ) Tripperware { - // The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (https://github.com/prometheus/prometheus/pull/14930) - // The cortex supports holt_winters for users using this function. - querier.EnableExperimentalPromQLFunctions(enablePromQLExperimentalFunctions, true) - // Per tenant query metrics. queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_frontend_queries_total", diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 0ef5e1519d5..8649147121d 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -347,7 +347,6 @@ cortex_query_frontend_queries_total{op="query", source="api", user="1"} 1 time.Minute, tc.maxSubQuerySteps, 0, - false, ) resp, err := tw(downstream).RoundTrip(req) if tc.expectedErr == nil { diff --git a/pkg/querier/tripperware/subquery.go b/pkg/querier/tripperware/subquery.go index cebce45f261..2226192a89a 100644 --- a/pkg/querier/tripperware/subquery.go +++ b/pkg/querier/tripperware/subquery.go @@ -6,6 +6,8 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" + + cortexparser "github.com/cortexproject/cortex/pkg/parser" ) var ( @@ -18,7 +20,7 @@ const ( // SubQueryStepSizeCheck ensures the query doesn't contain too small step size in subqueries. func SubQueryStepSizeCheck(query string, defaultSubQueryInterval time.Duration, maxStep int64) error { - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { // If query fails to parse, we don't throw step size error // but fail query later on querier. diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index bb13f3e1438..db2b61d238a 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/user" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querysharding" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -413,7 +414,7 @@ http_requests_total`, s := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { q := r.FormValue("query") - expr, _ := parser.ParseExpr(q) + expr, _ := cortexparser.ParseExpr(q) shardIndex := int64(0) parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error { diff --git a/pkg/querysharding/util.go b/pkg/querysharding/util.go index 3f2a2d82432..2b438ce275e 100644 --- a/pkg/querysharding/util.go +++ b/pkg/querysharding/util.go @@ -7,6 +7,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/thanos/pkg/store/storepb" + + cortexparser "github.com/cortexproject/cortex/pkg/parser" ) const ( @@ -21,7 +23,7 @@ var ( ) func InjectShardingInfo(query string, shardInfo *storepb.ShardInfo) (string, error) { - expr, err := parser.ParseExpr(query) + expr, err := cortexparser.ParseExpr(query) if err != nil { return "", err } diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 862bcc54706..c8d8302e27a 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -15,13 +15,13 @@ import ( "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/storage" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/cortexpb" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring/client" @@ -171,7 +171,7 @@ func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, // Enforce the max query length. maxQueryLength := overrides.MaxQueryLength(userID) if maxQueryLength > 0 { - expr, err := parser.ParseExpr(qs) + expr, err := cortexparser.ParseExpr(qs) // If failed to parse expression, skip checking select range. // Fail the query in the engine. if err == nil { diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 22d475fe720..70c07233f41 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -29,6 +29,8 @@ import ( "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/engine" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ruler/rulespb" @@ -172,6 +174,8 @@ type Config struct { EnableHAEvaluation bool `yaml:"enable_ha_evaluation"` LivenessCheckTimeout time.Duration `yaml:"liveness_check_timeout"` + + ThanosEngine engine.ThanosEngineConfig `yaml:"thanos_engine"` } // Validate config and returns error on failure @@ -199,6 +203,11 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error { if !util.StringsContain(supportedQueryResponseFormats, cfg.QueryResponseFormat) { return errInvalidQueryResponseFormat } + + if err := cfg.ThanosEngine.Validate(); err != nil { + return err + } + return nil } @@ -208,6 +217,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.frontendClient", "", f) cfg.Ring.RegisterFlags(f) cfg.Notifier.RegisterFlags(f) + cfg.ThanosEngine.RegisterFlagsWithPrefix("ruler.", f) // Deprecated Flags that will be maintained to avoid user disruption @@ -1278,7 +1288,7 @@ func (r *Ruler) ruleGroupListToGroupStateDesc(userID string, backupGroups rulesp } var ruleDesc *RuleStateDesc - query, err := parser.ParseExpr(r.GetExpr()) + query, err := cortexparser.ParseExpr(r.GetExpr()) if err != nil { return nil, errors.Errorf("failed to parse rule query '%v'", r.GetExpr()) } diff --git a/pkg/util/promql/promql_test.go b/pkg/util/promql/promql_test.go index ed35b89c2e3..284bd09ca6c 100644 --- a/pkg/util/promql/promql_test.go +++ b/pkg/util/promql/promql_test.go @@ -4,8 +4,9 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/require" + + cortexparser "github.com/cortexproject/cortex/pkg/parser" ) func TestFindNonOverlapQueryLength(t *testing.T) { @@ -78,7 +79,7 @@ func TestFindNonOverlapQueryLength(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - expr, err := parser.ParseExpr(tc.query) + expr, err := cortexparser.ParseExpr(tc.query) require.NoError(t, err) duration := FindNonOverlapQueryLength(expr, 0, 0, time.Minute*5) require.Equal(t, tc.expectedLength, duration) diff --git a/pkg/util/time_test.go b/pkg/util/time_test.go index 239c4eb5b0b..6bdeb231938 100644 --- a/pkg/util/time_test.go +++ b/pkg/util/time_test.go @@ -8,11 +8,11 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" + cortexparser "github.com/cortexproject/cortex/pkg/parser" "github.com/cortexproject/cortex/pkg/util/test" ) @@ -178,7 +178,7 @@ func TestFindMinMaxTime(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - expr, _ := parser.ParseExpr(testData.query) + expr, _ := cortexparser.ParseExpr(testData.query) url := "/query_range?query=" + testData.query + "&start=" + strconv.FormatInt(testData.queryStartTime.Truncate(time.Minute).Unix(), 10) +