Skip to content

Commit 979b1ef

Browse files
authored
add fetch timeout when fetching from expanded postings cache (#7185)
* add fetch timeout when fetching from expanded postings cache Signed-off-by: yeya24 <benye@amazon.com> * update changelog Signed-off-by: yeya24 <benye@amazon.com> * change default value to 0 Signed-off-by: yeya24 <benye@amazon.com> * update docs Signed-off-by: Ben Ye <benye@amazon.com> --------- Signed-off-by: yeya24 <benye@amazon.com> Signed-off-by: Ben Ye <benye@amazon.com>
1 parent fc92038 commit 979b1ef

File tree

7 files changed

+128
-6
lines changed

7 files changed

+128
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
* [ENHANCEMENT] Compactor: Avoid double compaction by not filtering delete blocks on real time when using bucketIndex lister. #7156
3030
* [ENHANCEMENT] Upgrade to go 1.25. #7164
3131
* [ENHANCEMENT] Upgraded container base images to `alpine:3.23`. #7163
32-
* [ENHANCEMENT] Instrument Ingester CPU profile with userID for read APIs. #7184
32+
* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184
33+
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
3334
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
3435
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
3536
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132

docs/blocks-storage/querier.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1929,6 +1929,12 @@ blocks_storage:
19291929
# CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
19301930
[ttl: <duration> | default = 10m]
19311931

1932+
# Timeout for fetching postings from TSDB index when cache miss occurs.
1933+
# This prevents runaway queries from consuming resources when all
1934+
# callers have given up.
1935+
# CLI flag: -blocks-storage.expanded_postings_cache.head.fetch-timeout
1936+
[fetch_timeout: <duration> | default = 0s]
1937+
19321938
# If enabled, ingesters will cache expanded postings for the compacted
19331939
# blocks. The cache is shared between all blocks.
19341940
blocks:
@@ -1944,6 +1950,12 @@ blocks_storage:
19441950
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
19451951
[ttl: <duration> | default = 10m]
19461952

1953+
# Timeout for fetching postings from TSDB index when cache miss occurs.
1954+
# This prevents runaway queries from consuming resources when all
1955+
# callers have given up.
1956+
# CLI flag: -blocks-storage.expanded_postings_cache.block.fetch-timeout
1957+
[fetch_timeout: <duration> | default = 0s]
1958+
19471959
users_scanner:
19481960
# Strategy to use to scan users. Supported values are: list, user_index.
19491961
# CLI flag: -blocks-storage.users-scanner.strategy

docs/blocks-storage/store-gateway.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2000,6 +2000,12 @@ blocks_storage:
20002000
# CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
20012001
[ttl: <duration> | default = 10m]
20022002

2003+
# Timeout for fetching postings from TSDB index when cache miss occurs.
2004+
# This prevents runaway queries from consuming resources when all
2005+
# callers have given up.
2006+
# CLI flag: -blocks-storage.expanded_postings_cache.head.fetch-timeout
2007+
[fetch_timeout: <duration> | default = 0s]
2008+
20032009
# If enabled, ingesters will cache expanded postings for the compacted
20042010
# blocks. The cache is shared between all blocks.
20052011
blocks:
@@ -2015,6 +2021,12 @@ blocks_storage:
20152021
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
20162022
[ttl: <duration> | default = 10m]
20172023

2024+
# Timeout for fetching postings from TSDB index when cache miss occurs.
2025+
# This prevents runaway queries from consuming resources when all
2026+
# callers have given up.
2027+
# CLI flag: -blocks-storage.expanded_postings_cache.block.fetch-timeout
2028+
[fetch_timeout: <duration> | default = 0s]
2029+
20182030
users_scanner:
20192031
# Strategy to use to scan users. Supported values are: list, user_index.
20202032
# CLI flag: -blocks-storage.users-scanner.strategy

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2605,6 +2605,12 @@ tsdb:
26052605
# CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
26062606
[ttl: <duration> | default = 10m]
26072607

2608+
# Timeout for fetching postings from TSDB index when cache miss occurs.
2609+
# This prevents runaway queries from consuming resources when all callers
2610+
# have given up.
2611+
# CLI flag: -blocks-storage.expanded_postings_cache.head.fetch-timeout
2612+
[fetch_timeout: <duration> | default = 0s]
2613+
26082614
# If enabled, ingesters will cache expanded postings for the compacted
26092615
# blocks. The cache is shared between all blocks.
26102616
blocks:
@@ -2620,6 +2626,12 @@ tsdb:
26202626
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
26212627
[ttl: <duration> | default = 10m]
26222628

2629+
# Timeout for fetching postings from TSDB index when cache miss occurs.
2630+
# This prevents runaway queries from consuming resources when all callers
2631+
# have given up.
2632+
# CLI flag: -blocks-storage.expanded_postings_cache.block.fetch-timeout
2633+
[fetch_timeout: <duration> | default = 0s]
2634+
26232635
users_scanner:
26242636
# Strategy to use to scan users. Supported values are: list, user_index.
26252637
# CLI flag: -blocks-storage.users-scanner.strategy

pkg/storage/tsdb/expanded_postings_cache.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,10 @@ type TSDBPostingsCacheConfig struct {
8080
}
8181

8282
type PostingsCacheConfig struct {
83-
Enabled bool `yaml:"enabled"`
84-
MaxBytes int64 `yaml:"max_bytes"`
85-
Ttl time.Duration `yaml:"ttl"`
83+
Enabled bool `yaml:"enabled"`
84+
MaxBytes int64 `yaml:"max_bytes"`
85+
Ttl time.Duration `yaml:"ttl"`
86+
FetchTimeout time.Duration `yaml:"fetch_timeout"`
8687
}
8788

8889
func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
@@ -94,6 +95,7 @@ func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *fl
9495
func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f *flag.FlagSet) {
9596
f.Int64Var(&cfg.MaxBytes, prefix+"expanded_postings_cache."+block+".max-bytes", 10*1024*1024, "Max bytes for postings cache")
9697
f.DurationVar(&cfg.Ttl, prefix+"expanded_postings_cache."+block+".ttl", 10*time.Minute, "TTL for postings cache")
98+
f.DurationVar(&cfg.FetchTimeout, prefix+"expanded_postings_cache."+block+".fetch-timeout", 0, "Timeout for fetching postings from TSDB index when cache miss occurs. This prevents runaway queries from consuming resources when all callers have given up.")
9799
f.BoolVar(&cfg.Enabled, prefix+"expanded_postings_cache."+block+".enabled", false, "Whether the postings cache is enabled or not")
98100
}
99101

@@ -219,8 +221,18 @@ func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
219221
c.metrics.CacheRequests.WithLabelValues(cache.name).Inc()
220222

221223
fetch := func() ([]storage.SeriesRef, int64, error) {
222-
// Use context.Background() as this promise is maybe shared across calls
223-
postings, err := c.postingsForMatchersFunc(context.Background(), ix, ms...)
224+
// When FetchTimeout is set (> 0), use a context with timeout instead of a bare context.Background() to prevent runaway queries.
225+
// This promise may be shared across calls, so we can't use any single caller's context.
226+
// However, we need a timeout to prevent the fetch from running indefinitely when all
227+
// callers have given up (e.g., after their 1-minute query timeout).
228+
fetchCtx := context.Background()
229+
if cache.cfg.FetchTimeout > 0 {
230+
var cancel context.CancelFunc
231+
fetchCtx, cancel = context.WithTimeout(fetchCtx, cache.cfg.FetchTimeout)
232+
defer cancel()
233+
}
234+
235+
postings, err := c.postingsForMatchersFunc(fetchCtx, ix, ms...)
224236

225237
if err == nil {
226238
ids, err := index.ExpandPostings(postings)

pkg/storage/tsdb/expanded_postings_cache_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package tsdb
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"strings"
78
"sync"
@@ -12,6 +13,8 @@ import (
1213
"github.com/prometheus/client_golang/prometheus"
1314
"github.com/prometheus/client_golang/prometheus/testutil"
1415
"github.com/prometheus/prometheus/model/labels"
16+
"github.com/prometheus/prometheus/tsdb"
17+
"github.com/prometheus/prometheus/tsdb/index"
1518
"github.com/stretchr/testify/require"
1619
"go.uber.org/atomic"
1720
)
@@ -233,3 +236,59 @@ func RepeatStringIfNeeded(seed string, length int) string {
233236

234237
return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))]
235238
}
239+
240+
func TestPostingsCacheFetchTimeout(t *testing.T) {
241+
// Test that the fetch operation respects the FetchTimeout configuration
242+
// to prevent runaway queries when all callers have given up.
243+
cfg := TSDBPostingsCacheConfig{
244+
Head: PostingsCacheConfig{
245+
Enabled: true,
246+
Ttl: time.Hour,
247+
MaxBytes: 10 << 20,
248+
FetchTimeout: 100 * time.Millisecond,
249+
},
250+
}
251+
252+
fetchStarted := make(chan struct{})
253+
fetchShouldBlock := make(chan struct{})
254+
fetchCompleted := atomic.Bool{}
255+
256+
cfg.PostingsForMatchers = func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
257+
close(fetchStarted)
258+
select {
259+
case <-ctx.Done():
260+
// Good! Context was cancelled due to timeout
261+
return nil, ctx.Err()
262+
case <-fetchShouldBlock:
263+
// This shouldn't happen - the fetch should be cancelled by timeout
264+
fetchCompleted.Store(true)
265+
return index.EmptyPostings(), nil
266+
}
267+
}
268+
269+
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
270+
cache := newBlocksPostingsForMatchersCache("user1", cfg, m, newSeedByHash(seedArraySize))
271+
272+
// Start a query that will trigger the fetch
273+
blockID := headULID
274+
queryCtx, queryCancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
275+
defer queryCancel()
276+
277+
_, err := cache.PostingsForMatchers(queryCtx, blockID, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_metric"))
278+
279+
// Wait for fetch to start
280+
<-fetchStarted
281+
282+
// The query context will time out after 50ms
283+
// But the fetch should continue with its own timeout (100ms)
284+
require.Error(t, err)
285+
require.ErrorIs(t, err, context.DeadlineExceeded)
286+
287+
// Wait well beyond the 100ms fetch timeout so the fetch context has expired
288+
time.Sleep(1 * time.Second)
289+
290+
// The fetch should have been cancelled by its timeout, not completed
291+
require.False(t, fetchCompleted.Load(), "Fetch should have been cancelled by timeout, not completed")
292+
293+
close(fetchShouldBlock)
294+
}

schemas/cortex-config-schema.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2972,6 +2972,13 @@
29722972
"type": "boolean",
29732973
"x-cli-flag": "blocks-storage.expanded_postings_cache.block.enabled"
29742974
},
2975+
"fetch_timeout": {
2976+
"default": "0s",
2977+
"description": "Timeout for fetching postings from TSDB index when cache miss occurs. This prevents runaway queries from consuming resources when all callers have given up.",
2978+
"type": "string",
2979+
"x-cli-flag": "blocks-storage.expanded_postings_cache.block.fetch-timeout",
2980+
"x-format": "duration"
2981+
},
29752982
"max_bytes": {
29762983
"default": 10485760,
29772984
"description": "Max bytes for postings cache",
@@ -2997,6 +3004,13 @@
29973004
"type": "boolean",
29983005
"x-cli-flag": "blocks-storage.expanded_postings_cache.head.enabled"
29993006
},
3007+
"fetch_timeout": {
3008+
"default": "0s",
3009+
"description": "Timeout for fetching postings from TSDB index when cache miss occurs. This prevents runaway queries from consuming resources when all callers have given up.",
3010+
"type": "string",
3011+
"x-cli-flag": "blocks-storage.expanded_postings_cache.head.fetch-timeout",
3012+
"x-format": "duration"
3013+
},
30003014
"max_bytes": {
30013015
"default": 10485760,
30023016
"description": "Max bytes for postings cache",

0 commit comments

Comments
 (0)