Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
* [ENHANCEMENT] Compactor: Avoid double compaction by not filtering delete blocks on real time when using bucketIndex lister. #7156
* [ENHANCEMENT] Upgrade to go 1.25. #7164
* [ENHANCEMENT] Upgraded container base images to `alpine:3.23`. #7163
* [ENHANCEMENT] Instrument Ingester CPU profile with userID for read APIs. #7184
* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
Expand Down
12 changes: 12 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1929,6 +1929,12 @@ blocks_storage:
# CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
[ttl: <duration> | default = 10m]

# Timeout for fetching postings from TSDB index when cache miss occurs.
# This prevents runaway queries from consuming resources when all
# callers have given up.
# CLI flag: -blocks-storage.expanded_postings_cache.head.fetch-timeout
[fetch_timeout: <duration> | default = 0s]

# If enabled, ingesters will cache expanded postings for the compacted
# blocks. The cache is shared between all blocks.
blocks:
Expand All @@ -1944,6 +1950,12 @@ blocks_storage:
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
[ttl: <duration> | default = 10m]

# Timeout for fetching postings from TSDB index when cache miss occurs.
# This prevents runaway queries from consuming resources when all
# callers have given up.
# CLI flag: -blocks-storage.expanded_postings_cache.block.fetch-timeout
[fetch_timeout: <duration> | default = 0s]

users_scanner:
# Strategy to use to scan users. Supported values are: list, user_index.
# CLI flag: -blocks-storage.users-scanner.strategy
Expand Down
12 changes: 12 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -2000,6 +2000,12 @@ blocks_storage:
# CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
[ttl: <duration> | default = 10m]

# Timeout for fetching postings from TSDB index when cache miss occurs.
# This prevents runaway queries from consuming resources when all
# callers have given up.
# CLI flag: -blocks-storage.expanded_postings_cache.head.fetch-timeout
[fetch_timeout: <duration> | default = 0s]

# If enabled, ingesters will cache expanded postings for the compacted
# blocks. The cache is shared between all blocks.
blocks:
Expand All @@ -2015,6 +2021,12 @@ blocks_storage:
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
[ttl: <duration> | default = 10m]

# Timeout for fetching postings from TSDB index when cache miss occurs.
# This prevents runaway queries from consuming resources when all
# callers have given up.
# CLI flag: -blocks-storage.expanded_postings_cache.block.fetch-timeout
[fetch_timeout: <duration> | default = 0s]

users_scanner:
# Strategy to use to scan users. Supported values are: list, user_index.
# CLI flag: -blocks-storage.users-scanner.strategy
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2605,6 +2605,12 @@ tsdb:
# CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
[ttl: <duration> | default = 10m]

# Timeout for fetching postings from TSDB index when cache miss occurs.
# This prevents runaway queries from consuming resources when all callers
# have given up.
# CLI flag: -blocks-storage.expanded_postings_cache.head.fetch-timeout
[fetch_timeout: <duration> | default = 0s]

# If enabled, ingesters will cache expanded postings for the compacted
# blocks. The cache is shared between all blocks.
blocks:
Expand All @@ -2620,6 +2626,12 @@ tsdb:
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
[ttl: <duration> | default = 10m]

# Timeout for fetching postings from TSDB index when cache miss occurs.
# This prevents runaway queries from consuming resources when all callers
# have given up.
# CLI flag: -blocks-storage.expanded_postings_cache.block.fetch-timeout
[fetch_timeout: <duration> | default = 0s]

users_scanner:
# Strategy to use to scan users. Supported values are: list, user_index.
# CLI flag: -blocks-storage.users-scanner.strategy
Expand Down
22 changes: 17 additions & 5 deletions pkg/storage/tsdb/expanded_postings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ type TSDBPostingsCacheConfig struct {
}

// PostingsCacheConfig holds the settings of one expanded postings cache.
// Its flags are registered per cache kind; the documented flag names use the
// "head" and "block" kinds.
type PostingsCacheConfig struct {
// NOTE(review): Enabled, MaxBytes and Ttl appear twice in this view. This
// looks like the removed and re-aligned lines of a diff rendered together;
// confirm that each field is declared only once in the real file.
Enabled bool `yaml:"enabled"`
MaxBytes int64 `yaml:"max_bytes"`
Ttl time.Duration `yaml:"ttl"`
// Enabled reports whether the postings cache is enabled.
Enabled bool `yaml:"enabled"`
// MaxBytes is the max bytes for the postings cache (flag default: 10 MiB).
MaxBytes int64 `yaml:"max_bytes"`
// Ttl is the TTL for the postings cache (flag default: 10m).
Ttl time.Duration `yaml:"ttl"`
// FetchTimeout bounds the fetch of postings from the TSDB index on a cache
// miss. The timeout is only applied when the value is greater than zero;
// the flag default is 0.
FetchTimeout time.Duration `yaml:"fetch_timeout"`
}

func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
Expand All @@ -94,6 +95,7 @@ func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *fl
// RegisterFlagsWithPrefix registers the CLI flags of one expanded postings
// cache on f. Each flag is named
// "<prefix>expanded_postings_cache.<block>.<option>", where block identifies
// the cache kind the configuration belongs to.
func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f *flag.FlagSet) {
	const (
		defaultMaxBytes int64 = 10 * 1024 * 1024
		defaultTTL            = 10 * time.Minute
	)

	// flagName builds the fully qualified flag name for a single option.
	flagName := func(option string) string {
		return prefix + "expanded_postings_cache." + block + "." + option
	}

	f.Int64Var(&cfg.MaxBytes, flagName("max-bytes"), defaultMaxBytes, "Max bytes for postings cache")
	f.DurationVar(&cfg.Ttl, flagName("ttl"), defaultTTL, "TTL for postings cache")
	// A zero default leaves the fetch timeout unset.
	f.DurationVar(&cfg.FetchTimeout, flagName("fetch-timeout"), time.Duration(0), "Timeout for fetching postings from TSDB index when cache miss occurs. This prevents runaway queries from consuming resources when all callers have given up.")
	f.BoolVar(&cfg.Enabled, flagName("enabled"), false, "Whether the postings cache is enabled or not")
}

Expand Down Expand Up @@ -219,8 +221,18 @@ func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
c.metrics.CacheRequests.WithLabelValues(cache.name).Inc()

fetch := func() ([]storage.SeriesRef, int64, error) {
// Use context.Background() as this promise is maybe shared across calls
postings, err := c.postingsForMatchersFunc(context.Background(), ix, ms...)
// Use a context with timeout instead of context.Background() to prevent runaway queries.
// This promise is maybe shared across calls, so we can't use any single caller's context.
// However, we need a timeout to prevent the fetch from running indefinitely when all
// callers have given up (e.g., after their 1-minute query timeout).
fetchCtx := context.Background()
if cache.cfg.FetchTimeout > 0 {
var cancel context.CancelFunc
fetchCtx, cancel = context.WithTimeout(fetchCtx, cache.cfg.FetchTimeout)
defer cancel()
}

postings, err := c.postingsForMatchersFunc(fetchCtx, ix, ms...)

if err == nil {
ids, err := index.ExpandPostings(postings)
Expand Down
59 changes: 59 additions & 0 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb

import (
"bytes"
"context"
"fmt"
"strings"
"sync"
Expand All @@ -12,6 +13,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -233,3 +236,59 @@ func RepeatStringIfNeeded(seed string, length int) string {

return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))]
}

// TestPostingsCacheFetchTimeout verifies that a cache-miss fetch is bounded by
// the configured FetchTimeout, so a fetch cannot keep running indefinitely
// once its callers have given up.
//
// The stubbed PostingsForMatchers blocks until either its context is done or
// the test releases it. The test asserts positively that the context branch
// was taken; merely checking that the fetch "did not complete" would also
// pass if the fetch were still blocked.
func TestPostingsCacheFetchTimeout(t *testing.T) {
	cfg := TSDBPostingsCacheConfig{
		Head: PostingsCacheConfig{
			Enabled:      true,
			Ttl:          time.Hour,
			MaxBytes:     10 << 20,
			FetchTimeout: 100 * time.Millisecond,
		},
	}

	fetchStarted := make(chan struct{})
	fetchShouldBlock := make(chan struct{})
	// Release the stub on every exit path. A failing require still runs
	// deferred calls, so a blocked fetch cannot outlive the test.
	defer close(fetchShouldBlock)

	// Guard the close so an unexpected second invocation of the stub does not
	// panic on a double close.
	var signalStarted sync.Once
	fetchCancelled := atomic.Bool{}
	fetchCompleted := atomic.Bool{}

	cfg.PostingsForMatchers = func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
		signalStarted.Do(func() { close(fetchStarted) })
		select {
		case <-ctx.Done():
			// Expected path: the fetch context expired.
			fetchCancelled.Store(true)
			return nil, ctx.Err()
		case <-fetchShouldBlock:
			// Only reachable once the test releases the stub.
			fetchCompleted.Store(true)
			return index.EmptyPostings(), nil
		}
	}

	m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
	cache := newBlocksPostingsForMatchersCache("user1", cfg, m, newSeedByHash(seedArraySize))

	// Issue a query whose own deadline (50ms) is shorter than FetchTimeout.
	blockID := headULID
	queryCtx, queryCancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer queryCancel()

	_, err := cache.PostingsForMatchers(queryCtx, blockID, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_metric"))

	// The stub must have been invoked for the assertions below to be meaningful.
	<-fetchStarted

	require.Error(t, err)
	require.ErrorIs(t, err, context.DeadlineExceeded)

	// Poll instead of sleeping a fixed second: this returns as soon as the
	// fetch context has been cancelled and fails if that never happens.
	require.Eventually(t, func() bool { return fetchCancelled.Load() }, time.Second, 10*time.Millisecond,
		"Fetch context should have been cancelled by FetchTimeout")

	// The fetch must have ended through cancellation, not by completing.
	require.False(t, fetchCompleted.Load(), "Fetch should have been cancelled by timeout, not completed")
}
14 changes: 14 additions & 0 deletions schemas/cortex-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2972,6 +2972,13 @@
"type": "boolean",
"x-cli-flag": "blocks-storage.expanded_postings_cache.block.enabled"
},
"fetch_timeout": {
"default": "0s",
"description": "Timeout for fetching postings from TSDB index when cache miss occurs. This prevents runaway queries from consuming resources when all callers have given up.",
"type": "string",
"x-cli-flag": "blocks-storage.expanded_postings_cache.block.fetch-timeout",
"x-format": "duration"
},
"max_bytes": {
"default": 10485760,
"description": "Max bytes for postings cache",
Expand All @@ -2997,6 +3004,13 @@
"type": "boolean",
"x-cli-flag": "blocks-storage.expanded_postings_cache.head.enabled"
},
"fetch_timeout": {
"default": "0s",
"description": "Timeout for fetching postings from TSDB index when cache miss occurs. This prevents runaway queries from consuming resources when all callers have given up.",
"type": "string",
"x-cli-flag": "blocks-storage.expanded_postings_cache.head.fetch-timeout",
"x-format": "duration"
},
"max_bytes": {
"default": 10485760,
"description": "Max bytes for postings cache",
Expand Down
Loading