Skip to content

Commit 9052bee

Browse files
committed
Querier: Support configuring optimizers and XFunctions
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 4c0957a commit 9052bee

File tree

9 files changed

+206
-33
lines changed

9 files changed

+206
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
1919
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
2020
* [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
21+
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
2122
* [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
2223
* [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859
2324
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715

docs/blocks-storage/querier.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,22 @@ querier:
252252
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
253253
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
254254

255-
# Experimental. Use Thanos promql engine
256-
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
257-
# engine.
258-
# CLI flag: -querier.thanos-engine
259-
[thanos_engine: <boolean> | default = false]
255+
thanos_engine:
256+
# Experimental. Use Thanos promql engine
257+
# https://github.com/thanos-io/promql-engine rather than the Prometheus
258+
# promql engine.
259+
# CLI flag: -querier.thanos-engine
260+
[enable_thanos_engine: <boolean> | default = false]
261+
262+
# Enable xincrease, xdelta, xrate etc from Thanos engine.
263+
# CLI flag: -querier.enable-x-functions
264+
[enable_x_functions: <boolean> | default = false]
265+
266+
# Logical plan optimizers. Multiple optimizers can be provided as a
267+
# comma-separated list. Supported values: default, all, propagate-matchers,
268+
# sort-matchers, merge-selects, detect-histogram-stats
269+
# CLI flag: -querier.optimizers
270+
[optimizers: <string> | default = "default"]
260271

261272
# If enabled, ignore max query length check at Querier select method. Users
262273
# can choose to ignore it since the validation can be done before Querier

docs/configuration/config-file-reference.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4302,11 +4302,22 @@ store_gateway_client:
43024302
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
43034303
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
43044304

4305-
# Experimental. Use Thanos promql engine
4306-
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
4307-
# engine.
4308-
# CLI flag: -querier.thanos-engine
4309-
[thanos_engine: <boolean> | default = false]
4305+
thanos_engine:
4306+
# Experimental. Use Thanos promql engine
4307+
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
4308+
# engine.
4309+
# CLI flag: -querier.thanos-engine
4310+
[enable_thanos_engine: <boolean> | default = false]
4311+
4312+
# Enable xincrease, xdelta, xrate etc from Thanos engine.
4313+
# CLI flag: -querier.enable-x-functions
4314+
[enable_x_functions: <boolean> | default = false]
4315+
4316+
# Logical plan optimizers. Multiple optimizers can be provided as a
4317+
# comma-separated list. Supported values: default, all, propagate-matchers,
4318+
# sort-matchers, merge-selects, detect-histogram-stats
4319+
# CLI flag: -querier.optimizers
4320+
[optimizers: <string> | default = "default"]
43104321

43114322
# If enabled, ignore max query length check at Querier select method. Users can
43124323
# choose to ignore it since the validation can be done before Querier evaluation

integration/querier_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/prometheus/prometheus/prompb"
2020
"github.com/stretchr/testify/assert"
2121
"github.com/stretchr/testify/require"
22+
"github.com/thanos-io/promql-engine/execution/parse"
2223

2324
"github.com/cortexproject/cortex/integration/e2e"
2425
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
@@ -416,6 +417,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
416417
"-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
417418
"-querier.query-store-for-labels-enabled": "true",
418419
"-querier.thanos-engine": strconv.FormatBool(thanosEngine),
420+
"-querier.enable-x-functions": strconv.FormatBool(thanosEngine),
419421
// Ingester.
420422
"-ring.store": "consul",
421423
"-consul.hostname": consul.NetworkHTTPEndpoint(),
@@ -1310,3 +1312,66 @@ func TestQuerierMaxSamplesLimit(t *testing.T) {
13101312
Error: "query processing would load too many samples into memory in query execution",
13111313
})
13121314
}
1315+
1316+
func TestQuerierEngineConfigs(t *testing.T) {
1317+
const blockRangePeriod = 5 * time.Second
1318+
1319+
s, err := e2e.NewScenario(networkName)
1320+
require.NoError(t, err)
1321+
defer s.Close()
1322+
1323+
// Configure the blocks storage to frequently compact TSDB head
1324+
// and ship blocks to the storage.
1325+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
1326+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
1327+
"-blocks-storage.tsdb.ship-interval": "1s",
1328+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
1329+
"-querier.thanos-engine": "true",
1330+
"-querier.enable-x-functions": "true",
1331+
"-querier.optimizers": "all",
1332+
})
1333+
1334+
// Start dependencies.
1335+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
1336+
consul := e2edb.NewConsul()
1337+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1338+
1339+
// Start Cortex components for the write path.
1340+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1341+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1342+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1343+
1344+
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
1345+
require.NoError(t, s.Start(queryFrontend))
1346+
1347+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1348+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1349+
}), "")
1350+
require.NoError(t, s.StartAndWaitReady(querier))
1351+
1352+
// Wait until the distributor and querier has updated the ring.
1353+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1354+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1355+
1356+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
1357+
require.NoError(t, err)
1358+
1359+
// Push some series to Cortex.
1360+
series1Timestamp := time.Now()
1361+
series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "job", Value: "test"})
1362+
series2, _ := generateSeries("series_2", series1Timestamp, prompb.Label{Name: "job", Value: "test"})
1363+
1364+
res, err := c.Push(series1)
1365+
require.NoError(t, err)
1366+
require.Equal(t, 200, res.StatusCode)
1367+
res, err = c.Push(series2)
1368+
require.NoError(t, err)
1369+
require.Equal(t, 200, res.StatusCode)
1370+
1371+
for xFunc := range parse.XFunctions {
1372+
result, err := c.Query(fmt.Sprintf(`%s({job="test"}[1m])`, xFunc), series1Timestamp)
1373+
require.NoError(t, err)
1374+
require.Equal(t, model.ValVector, result.Type())
1375+
}
1376+
1377+
}

pkg/cortex/modules.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
78
"log/slog"
89
"net/http"
910
"runtime"
@@ -18,8 +19,6 @@ import (
1819
"github.com/prometheus/prometheus/rules"
1920
prom_storage "github.com/prometheus/prometheus/storage"
2021
"github.com/thanos-io/objstore"
21-
"github.com/thanos-io/promql-engine/engine"
22-
"github.com/thanos-io/promql-engine/logicalplan"
2322
"github.com/thanos-io/thanos/pkg/discovery/dns"
2423
"github.com/thanos-io/thanos/pkg/querysharding"
2524
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
@@ -32,6 +31,7 @@ import (
3231
configAPI "github.com/cortexproject/cortex/pkg/configs/api"
3332
"github.com/cortexproject/cortex/pkg/configs/db"
3433
"github.com/cortexproject/cortex/pkg/distributor"
34+
"github.com/cortexproject/cortex/pkg/engine"
3535
"github.com/cortexproject/cortex/pkg/flusher"
3636
"github.com/cortexproject/cortex/pkg/frontend"
3737
"github.com/cortexproject/cortex/pkg/frontend/transport"
@@ -634,7 +634,6 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
634634
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
635635
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
636636

637-
var queryEngine promql.QueryEngine
638637
opts := promql.EngineOpts{
639638
Logger: util_log.SLogger,
640639
Reg: rulerRegisterer,
@@ -649,15 +648,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
649648
return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds()
650649
},
651650
}
652-
if t.Cfg.Querier.ThanosEngine {
653-
queryEngine = engine.New(engine.Opts{
654-
EngineOpts: opts,
655-
LogicalOptimizers: logicalplan.AllOptimizers,
656-
EnableAnalysis: true,
657-
})
658-
} else {
659-
queryEngine = promql.NewEngine(opts)
660-
}
651+
queryEngine := engine.New(opts, t.Cfg.Querier.Engine, rulerRegisterer)
661652

662653
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
663654
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)

pkg/engine/config.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package engine
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/thanos-io/promql-engine/logicalplan"
9+
)
10+
11+
var supportedOptimizers = []string{"default", "all", "propagate-matchers", "sort-matchers", "merge-selects", "detect-histogram-stats"}
12+
13+
// Config contains the configuration to create engine
14+
type Config struct {
15+
EnableThanosEngine bool `yaml:"enable_thanos_engine"`
16+
EnableXFunctions bool `yaml:"enable_x_functions"`
17+
Optimizers string `yaml:"optimizers"`
18+
LogicalOptimizers []logicalplan.Optimizer `yaml:"-"`
19+
}
20+
21+
// RegisterFlags adds the flags required to config this to the given FlagSet.
22+
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
23+
f.BoolVar(&cfg.EnableThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
24+
f.BoolVar(&cfg.EnableXFunctions, "querier.enable-x-functions", false, "Enable xincrease, xdelta, xrate etc from Thanos engine.")
25+
f.StringVar(&cfg.Optimizers, "querier.optimizers", "default", "Logical plan optimizers. Multiple optimizers can be provided as a comma-separated list. Supported values: "+strings.Join(supportedOptimizers, ", "))
26+
}
27+
28+
func (cfg *Config) Validate() error {
29+
splitOptimizers := strings.Split(cfg.Optimizers, ",")
30+
31+
for _, optimizer := range splitOptimizers {
32+
if optimizer == "all" || optimizer == "default" {
33+
if len(splitOptimizers) > 1 {
34+
return fmt.Errorf("special optimizer %s cannot be combined with other optimizers", optimizer)
35+
}
36+
}
37+
optimizers, err := getOptimizer(optimizer)
38+
if err != nil {
39+
return err
40+
}
41+
cfg.LogicalOptimizers = append(cfg.LogicalOptimizers, optimizers...)
42+
}
43+
44+
return nil
45+
}
46+
47+
func getOptimizer(name string) ([]logicalplan.Optimizer, error) {
48+
switch name {
49+
case "default":
50+
return logicalplan.DefaultOptimizers, nil
51+
case "all":
52+
return logicalplan.AllOptimizers, nil
53+
case "propagate-matchers":
54+
return []logicalplan.Optimizer{logicalplan.PropagateMatchersOptimizer{}}, nil
55+
case "sort-matchers":
56+
return []logicalplan.Optimizer{logicalplan.SortMatchers{}}, nil
57+
case "merge-selects":
58+
return []logicalplan.Optimizer{logicalplan.MergeSelectsOptimizer{}}, nil
59+
case "detect-histogram-stats":
60+
return []logicalplan.Optimizer{logicalplan.DetectHistogramStatsOptimizer{}}, nil
61+
default:
62+
return nil, fmt.Errorf("unknown optimizer %s", name)
63+
}
64+
}

pkg/engine/engine.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/prometheus/prometheus/promql"
1111
"github.com/prometheus/prometheus/storage"
1212
thanosengine "github.com/thanos-io/promql-engine/engine"
13-
"github.com/thanos-io/promql-engine/logicalplan"
1413
)
1514

1615
type engineKeyType struct{}
@@ -52,15 +51,16 @@ type Engine struct {
5251
engineSwitchQueriesTotal *prometheus.CounterVec
5352
}
5453

55-
func New(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *Engine {
54+
func New(opts promql.EngineOpts, engineCfg Config, reg prometheus.Registerer) *Engine {
5655
prometheusEngine := promql.NewEngine(opts)
5756

5857
var thanosEngine *thanosengine.Engine
59-
if enableThanosEngine {
58+
if engineCfg.EnableThanosEngine {
6059
thanosEngine = thanosengine.New(thanosengine.Opts{
6160
EngineOpts: opts,
62-
LogicalOptimizers: logicalplan.DefaultOptimizers,
61+
LogicalOptimizers: engineCfg.LogicalOptimizers,
6362
EnableAnalysis: true,
63+
EnableXFunctions: engineCfg.EnableXFunctions,
6464
})
6565
}
6666

pkg/engine/engine_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package engine
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"net/http"
78
"testing"
89
"time"
@@ -14,6 +15,7 @@ import (
1415
"github.com/prometheus/prometheus/promql/parser"
1516
"github.com/prometheus/prometheus/promql/promqltest"
1617
"github.com/stretchr/testify/require"
18+
"github.com/thanos-io/promql-engine/execution/parse"
1719

1820
utillog "github.com/cortexproject/cortex/pkg/util/log"
1921
)
@@ -37,7 +39,7 @@ func TestEngine_Fallback(t *testing.T) {
3739
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
3840
Reg: reg,
3941
}
40-
queryEngine := New(opts, true, reg)
42+
queryEngine := New(opts, Config{EnableThanosEngine: true}, reg)
4143

4244
// instant query, should go to fallback
4345
_, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "unimplemented(foo)", now)
@@ -68,7 +70,7 @@ func TestEngine_Switch(t *testing.T) {
6870
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
6971
Reg: reg,
7072
}
71-
queryEngine := New(opts, true, reg)
73+
queryEngine := New(opts, Config{EnableThanosEngine: true}, reg)
7274

7375
// Query Prometheus engine
7476
r := &http.Request{Header: http.Header{}}
@@ -96,3 +98,28 @@ func TestEngine_Switch(t *testing.T) {
9698
cortex_engine_switch_queries_total{engine_type="thanos"} 2
9799
`), "cortex_engine_switch_queries_total"))
98100
}
101+
102+
func TestEngine_XFunctions(t *testing.T) {
103+
ctx := context.Background()
104+
reg := prometheus.NewRegistry()
105+
106+
now := time.Now()
107+
start := time.Now().Add(-time.Minute * 5)
108+
step := time.Minute
109+
queryable := promqltest.LoadedStorage(t, "")
110+
opts := promql.EngineOpts{
111+
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
112+
Reg: reg,
113+
}
114+
queryEngine := New(opts, Config{EnableThanosEngine: true, EnableXFunctions: true}, reg)
115+
116+
for name := range parse.XFunctions {
117+
t.Run(name, func(t *testing.T) {
118+
_, err := queryEngine.NewInstantQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), now)
119+
require.NoError(t, err)
120+
121+
_, err = queryEngine.NewRangeQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), start, now, step)
122+
require.NoError(t, err)
123+
})
124+
}
125+
}

pkg/querier/querier.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,7 @@ type Config struct {
8585

8686
ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"`
8787

88-
// Experimental. Use https://github.com/thanos-io/promql-engine rather than
89-
// the Prometheus query engine.
90-
ThanosEngine bool `yaml:"thanos_engine"`
88+
Engine engine.Config `yaml:"thanos_engine"`
9189

9290
// Ignore max query length check at Querier.
9391
IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"`
@@ -139,13 +137,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
139137
f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors (ex. partial data returned).")
140138
f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
141139
f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).")
142-
f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
143140
f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.")
144141
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
145142
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
146143
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
147144
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
148145
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
146+
147+
cfg.Engine.RegisterFlags(f)
149148
}
150149

151150
// Validate the config
@@ -181,6 +180,10 @@ func (cfg *Config) Validate() error {
181180
}
182181
}
183182

183+
if err := cfg.Engine.Validate(); err != nil {
184+
return err
185+
}
186+
184187
return nil
185188
}
186189

@@ -246,7 +249,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
246249
return cfg.DefaultEvaluationInterval.Milliseconds()
247250
},
248251
}
249-
queryEngine := engine.New(opts, cfg.ThanosEngine, reg)
252+
queryEngine := engine.New(opts, cfg.Engine, reg)
250253
return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, queryEngine
251254
}
252255

0 commit comments

Comments
 (0)