Skip to content

Commit 1cb0707

Browse files
committed
add engine unit test and distributed exec unit test
Signed-off-by: rubywtl <[email protected]>
1 parent 845a7a7 commit 1cb0707

File tree

3 files changed

+204
-0
lines changed

3 files changed

+204
-0
lines changed

integration/querier_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,3 +1310,109 @@ func TestQuerierMaxSamplesLimit(t *testing.T) {
13101310
Error: "query processing would load too many samples into memory in query execution",
13111311
})
13121312
}
1313+
1314+
func TestQuerierDistributedExecution(t *testing.T) {
1315+
// e2e test setup
1316+
s, err := e2e.NewScenario(networkName)
1317+
require.NoError(t, err)
1318+
defer s.Close()
1319+
1320+
consul := e2edb.NewConsulWithName("consul")
1321+
memcached := e2ecache.NewMemcached()
1322+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
1323+
1324+
// initialize the flags
1325+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
1326+
flags := mergeFlags(
1327+
baseFlags,
1328+
map[string]string{
1329+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
1330+
"-blocks-storage.tsdb.block-ranges-period": "2h",
1331+
"-blocks-storage.tsdb.ship-interval": "1h",
1332+
"-blocks-storage.bucket-store.sync-interval": "1s",
1333+
"-blocks-storage.tsdb.retention-period": "24h",
1334+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
1335+
"-querier.query-store-for-labels-enabled": "true",
1336+
// Ingester.
1337+
"-ring.store": "consul",
1338+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
1339+
// Distributor.
1340+
"-distributor.replication-factor": "1",
1341+
// Store-gateway.
1342+
"-store-gateway.sharding-enabled": "false",
1343+
// Alert manager
1344+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
1345+
"-frontend.query-vertical-shard-size": "1",
1346+
"-frontend.max-cache-freshness": "1m",
1347+
// enable experimental promQL funcs
1348+
"-querier.enable-promql-experimental-functions": "true",
1349+
// enable distributed execution (logical plan execution)
1350+
"-querier.distributed-exec-enabled": "true",
1351+
},
1352+
)
1353+
1354+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
1355+
require.NoError(t, s.StartAndWaitReady(minio))
1356+
1357+
// start services
1358+
var queryScheduler *e2ecortex.CortexService
1359+
queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
1360+
require.NoError(t, s.StartAndWaitReady(queryScheduler))
1361+
flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
1362+
flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
1363+
1364+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
1365+
require.NoError(t, s.Start(queryFrontend))
1366+
1367+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1368+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1369+
querier1 := e2ecortex.NewQuerier("querier-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1370+
querier2 := e2ecortex.NewQuerier("querier-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1371+
1372+
require.NoError(t, s.StartAndWaitReady(querier1, querier2, ingester, distributor))
1373+
require.NoError(t, s.WaitReady(queryFrontend))
1374+
1375+
// wait until distributor and queriers have updated the ring.
1376+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1377+
require.NoError(t, querier1.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1378+
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1379+
1380+
// push some series to Cortex.
1381+
distClient, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
1382+
require.NoError(t, err)
1383+
1384+
series1Timestamp := time.Now()
1385+
series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
1386+
series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
1387+
series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})
1388+
1389+
res, err := distClient.Push(series1)
1390+
require.NoError(t, err)
1391+
require.Equal(t, 200, res.StatusCode)
1392+
1393+
res, err = distClient.Push(series2)
1394+
require.NoError(t, err)
1395+
require.Equal(t, 200, res.StatusCode)
1396+
1397+
for _, q := range []*e2ecortex.CortexService{querier1, querier2} {
1398+
c, err := e2ecortex.NewClient("", q.HTTPEndpoint(), "", "", userID)
1399+
require.NoError(t, err)
1400+
1401+
_, err = c.Query("series_1", now)
1402+
require.NoError(t, err)
1403+
}
1404+
1405+
require.NoError(t, queryScheduler.WaitSumMetrics(e2e.Equals(2), "cortex_query_scheduler_connected_querier_clients"))
1406+
1407+
// main tests
1408+
// - make sure queries are still executable with distributed execution enabled
1409+
res, body, err = c.QueryRaw(`sum({job="test"})`, series1Timestamp, map[string]string{})
1410+
require.NoError(t, err)
1411+
require.Equal(t, 200, res.StatusCode)
1412+
require.Equal(t, expectedVector1, string(body))
1413+
1414+
res, body, err = c.QueryRaw(`sum({job="test"})`, series2Timestamp, map[string]string{})
1415+
require.NoError(t, err)
1416+
require.Equal(t, 200, res.StatusCode)
1417+
require.Equal(t, expectedVector2, string(body))
1418+
}

pkg/engine/engine.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ prom:
137137
func (qf *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, ts time.Time, qs string) (promql.Query, error) {
138138
if engineType := GetEngineType(ctx); engineType == Prometheus {
139139
qf.engineSwitchQueriesTotal.WithLabelValues(string(Prometheus)).Inc()
140+
goto prom
140141
} else if engineType == Thanos {
141142
qf.engineSwitchQueriesTotal.WithLabelValues(string(Thanos)).Inc()
142143
}
@@ -161,6 +162,7 @@ prom:
161162
func (qf *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start time.Time, end time.Time, interval time.Duration, qs string) (promql.Query, error) {
162163
if engineType := GetEngineType(ctx); engineType == Prometheus {
163164
qf.engineSwitchQueriesTotal.WithLabelValues(string(Prometheus)).Inc()
165+
goto prom
164166
} else if engineType == Thanos {
165167
qf.engineSwitchQueriesTotal.WithLabelValues(string(Thanos)).Inc()
166168
}

pkg/engine/engine_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"github.com/prometheus/prometheus/promql/parser"
1515
"github.com/prometheus/prometheus/promql/promqltest"
1616
"github.com/stretchr/testify/require"
17+
"github.com/thanos-io/promql-engine/logicalplan"
18+
"github.com/thanos-io/promql-engine/query"
1719

1820
utillog "github.com/cortexproject/cortex/pkg/util/log"
1921
)
@@ -96,3 +98,97 @@ func TestEngine_Switch(t *testing.T) {
9698
cortex_engine_switch_queries_total{engine_type="thanos"} 2
9799
`), "cortex_engine_switch_queries_total"))
98100
}
101+
102+
func TestEngine_With_Logical_Plan(t *testing.T) {
103+
ctx := context.Background()
104+
reg := prometheus.NewRegistry()
105+
106+
now := time.Now()
107+
start := time.Now().Add(-time.Minute * 5)
108+
step := time.Minute
109+
queryable := promqltest.LoadedStorage(t, "")
110+
opts := promql.EngineOpts{
111+
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
112+
Reg: reg,
113+
}
114+
queryEngine := New(opts, true, reg)
115+
116+
range_lp := createTestLogicalPlan(t, start, now, step, "up")
117+
instant_lp := createTestLogicalPlan(t, now, now, 0, "up")
118+
119+
r := &http.Request{Header: http.Header{}}
120+
r.Header.Set(TypeHeader, string(Thanos))
121+
ctx = AddEngineTypeToContext(ctx, r)
122+
123+
// Case 1: Executing logical plan with thanos engine
124+
_, _ = queryEngine.MakeInstantQueryFromPlan(ctx, queryable, nil, instant_lp.Root(), now, "up")
125+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
126+
# HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine
127+
# TYPE cortex_thanos_engine_fallback_queries_total counter
128+
cortex_thanos_engine_fallback_queries_total 0
129+
`), "cortex_thanos_engine_fallback_queries_total"))
130+
131+
_, _ = queryEngine.MakeRangeQueryFromPlan(ctx, queryable, nil, range_lp.Root(), start, now, step, "up")
132+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
133+
# HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine
134+
# TYPE cortex_thanos_engine_fallback_queries_total counter
135+
cortex_thanos_engine_fallback_queries_total 0
136+
`), "cortex_thanos_engine_fallback_queries_total"))
137+
138+
// Case 2: Logical plan that thanos engine cannot execute (so it will fall back to prometheus engine)
139+
err_range_lp := createTestLogicalPlan(t, start, now, step, "up[10]")
140+
_, _ = queryEngine.MakeRangeQueryFromPlan(ctx, queryable, nil, err_range_lp.Root(), start, now, step, "up")
141+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
142+
# HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine
143+
# TYPE cortex_thanos_engine_fallback_queries_total counter
144+
cortex_thanos_engine_fallback_queries_total 1
145+
`), "cortex_thanos_engine_fallback_queries_total"))
146+
147+
// Case 3: executing with prometheus engine
148+
r.Header.Set(TypeHeader, string(Prometheus))
149+
ctx = AddEngineTypeToContext(ctx, r)
150+
151+
_, _ = queryEngine.MakeInstantQueryFromPlan(ctx, queryable, nil, instant_lp.Root(), now, "up")
152+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
153+
# HELP cortex_engine_switch_queries_total Total number of queries where engine_type is set explicitly
154+
# TYPE cortex_engine_switch_queries_total counter
155+
cortex_engine_switch_queries_total{engine_type="prometheus"} 1
156+
cortex_engine_switch_queries_total{engine_type="thanos"} 3
157+
`), "cortex_engine_switch_queries_total"))
158+
159+
_, _ = queryEngine.MakeRangeQueryFromPlan(ctx, queryable, nil, range_lp.Root(), start, now, step, "up")
160+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
161+
# HELP cortex_engine_switch_queries_total Total number of queries where engine_type is set explicitly
162+
# TYPE cortex_engine_switch_queries_total counter
163+
cortex_engine_switch_queries_total{engine_type="prometheus"} 2
164+
cortex_engine_switch_queries_total{engine_type="thanos"} 3
165+
`), "cortex_engine_switch_queries_total"))
166+
}
167+
168+
func createTestLogicalPlan(t *testing.T, startTime time.Time, endTime time.Time, step time.Duration, q string) logicalplan.Plan {
169+
170+
qOpts := query.Options{
171+
Start: startTime,
172+
End: startTime,
173+
Step: 0,
174+
StepsBatch: 10,
175+
LookbackDelta: 0,
176+
EnablePerStepStats: false,
177+
}
178+
179+
if step != 0 {
180+
qOpts.End = endTime
181+
qOpts.Step = step
182+
}
183+
184+
expr, err := parser.NewParser(q, parser.WithFunctions(parser.Functions)).ParseExpr()
185+
require.NoError(t, err)
186+
187+
planOpts := logicalplan.PlanOptions{
188+
DisableDuplicateLabelCheck: false,
189+
}
190+
191+
logicalPlan := logicalplan.NewFromAST(expr, &qOpts, planOpts)
192+
193+
return logicalPlan
194+
}

0 commit comments

Comments
 (0)