Skip to content

change to check tenant limit validation only for some targets #6880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
* [BUGFIX] Ingester: Allow shipper to skip corrupted blocks. #6786
* [BUGFIX] Compactor: Delete the prefix `blocks_meta` from the metadata fetcher metrics. #6832
* [BUGFIX] Store Gateway: Avoid race condition by deduplicating entries in bucket stores user scan. #6863
* [BUGFIX] Runtime-config: Validate per-tenant limits when loading the runtime config only for the `all`, `distributor`, `querier`, and `ruler` targets. #6880

## 1.19.0 2025-02-27

Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ The next three options only apply when the querier is used together with the Que

Set this flag to `true` for the new behaviour.

Important to note is that when setting this flag to `true`, it has to be set on both the distributor and the querier (called `-distributor.shard-by-all-labels` on Querier as well). If the flag is only set on the distributor and not on the querier, you will get incomplete query results because not all ingesters are queried.
Important to note is that when setting this flag to `true`, it has to be set on the distributor, the querier, and the ruler (called `-distributor.shard-by-all-labels` on Querier as well). If the flag is only set on the distributor and not on the querier, you will get incomplete query results because not all ingesters are queried.

**Upgrade notes**: As this flag also makes all queries always read from all ingesters, the upgrade path is pretty trivial; just enable the flag. When you do enable it, you'll see a spike in the number of active series as the writes are "reshuffled" amongst the ingesters, but over the next stale period all the old series will be flushed, and you should end up with much better load balancing. With this flag enabled in the queriers, reads will always catch all the data from all ingesters.

Expand Down
94 changes: 94 additions & 0 deletions integration/runtime_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -164,6 +165,99 @@ func TestLoadRuntimeConfigFromCloudStorage(t *testing.T) {
require.NoError(t, s.Stop(cortexSvc))
}

// Test_VerifyComponentsAreSuccessfullyStarted_WithRuntimeConfigLoad verifies that
// components start successfully with `-distributor.shard-by-all-labels=false`,
// except for "distributor", "querier", and "ruler": those targets validate tenant
// limits when loading the runtime config and refuse to start until the flag is true.
// Refer to https://github.com/cortexproject/cortex/issues/6741#issuecomment-3067244929
func Test_VerifyComponentsAreSuccessfullyStarted_WithRuntimeConfigLoad(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Per-tenant override that triggers limit validation on runtime-config load.
	runtimeConfigYamlFile := `
overrides:
  'user-1':
    max_global_series_per_user: 15000
`

	require.NoError(t, writeFileToSharedDir(s, runtimeConfigFile, []byte(runtimeConfigYamlFile)))
	filePath := filepath.Join(e2e.ContainerSharedDir, runtimeConfigFile)

	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
		"-runtime-config.file":    filePath,
		"-runtime-config.backend": "filesystem",

		// alert manager
		"-alertmanager.web.external-url":   "http://localhost/alertmanager",
		"-alertmanager-storage.backend":    "local",
		"-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),

		// store-gateway
		"-querier.store-gateway-addresses": "localhost:12345",

		// distributor.shard-by-all-labels is false
		"-distributor.shard-by-all-labels": "false",
	})

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// make alert manager config dir
	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

	// Ingester and store-gateway do not validate tenant limits, so both must start
	// even though shard-by-all-labels is false.
	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(ingester, storeGateway))

	// Querier start, but fail with "-distributor.shard-by-all-labels": "false"
	querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
		"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
	}), "")
	require.Error(t, s.StartAndWaitReady(querier))

	// Start Query frontend
	queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
	require.NoError(t, s.Start(queryFrontend))

	// Querier start, should succeed with "-distributor.shard-by-all-labels": "true"
	querier = e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
		"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
		"-distributor.shard-by-all-labels": "true",
		"-querier.frontend-address":        queryFrontend.NetworkGRPCEndpoint(),
	}), "")
	require.NoError(t, s.StartAndWaitReady(querier))

	// Ruler start, but fail with "-distributor.shard-by-all-labels": "false"
	ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, RulerFlags()), "")
	require.Error(t, s.StartAndWaitReady(ruler))

	// Ruler start, should succeed with "-distributor.shard-by-all-labels": "true"
	ruler = e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, RulerFlags(), map[string]string{
		"-distributor.shard-by-all-labels": "true",
	}), "")
	require.NoError(t, s.StartAndWaitReady(ruler))

	// Start the query-scheduler
	queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
	require.NoError(t, s.StartAndWaitReady(queryScheduler))

	// Start Alertmanager
	alertmanager := e2ecortex.NewAlertmanager("alertmanager", mergeFlags(flags, AlertmanagerFlags()), "")
	require.NoError(t, s.StartAndWaitReady(alertmanager))

	// Distributor start, but fail with "-distributor.shard-by-all-labels": "false".
	// BUGFIX: the original used e2ecortex.NewQuerier here, which only re-tested the
	// querier target; use NewDistributor so the distributor target is exercised.
	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{}), "")
	require.Error(t, s.StartAndWaitReady(distributor))

	// Distributor start, should succeed with "-distributor.shard-by-all-labels": "true"
	distributor = e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
		"-distributor.shard-by-all-labels": "true",
	}), "")
	require.NoError(t, s.StartAndWaitReady(distributor))
}

func assertRuntimeConfigLoadedCorrectly(t *testing.T, cortexSvc *e2ecortex.CortexService) {
runtimeConfig := getRuntimeConfig(t, cortexSvc)

Expand Down
17 changes: 13 additions & 4 deletions pkg/cortex/runtime_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"io"
"net/http"
"strings"

"gopkg.in/yaml.v2"

Expand All @@ -15,7 +16,8 @@ import (
)

var (
	// errMultipleDocuments is returned when the runtime-config YAML stream
	// contains more than one document.
	errMultipleDocuments = errors.New("the provided runtime configuration contains multiple documents")
	// tenantLimitCheckTargets lists the targets for which per-tenant limit
	// validation is performed when loading the runtime config; other targets
	// skip it. See https://github.com/cortexproject/cortex/issues/6741#issuecomment-3067244929
	tenantLimitCheckTargets = []string{All, Distributor, Querier, Ruler}
)

// RuntimeConfigValues are values that can be reloaded from configuration file while Cortex is running.
Expand Down Expand Up @@ -78,9 +80,16 @@ func (l runtimeConfigLoader) load(r io.Reader) (interface{}, error) {
return nil, errMultipleDocuments
}

for _, ul := range overrides.TenantLimits {
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil {
return nil, err
targetStr := l.cfg.Target.String()
for _, target := range tenantLimitCheckTargets {
if strings.Contains(targetStr, target) {
// only check if target is `all`, `distributor`, "querier", and "ruler"
// refer to https://github.com/cortexproject/cortex/issues/6741#issuecomment-3067244929
for _, ul := range overrides.TenantLimits {
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil {
return nil, err
}
}
}
}

Expand Down
104 changes: 104 additions & 0 deletions pkg/cortex/runtime_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/util/validation"
)

Expand Down Expand Up @@ -109,3 +110,106 @@ overrides:
assert.Nil(t, actual)
}
}

// TestLoad_ShouldNotErrorWithCertainTarget verifies that per-tenant limit
// validation on runtime-config load is enforced only for the `all`,
// `distributor`, `querier`, and `ruler` targets; every other target loads
// the config without validating limits.
func TestLoad_ShouldNotErrorWithCertainTarget(t *testing.T) {
	testCases := []struct {
		desc             string
		target           []string
		shardByAllLabels bool
		isErr            bool
	}{
		// Targets that validate limits: error iff shard-by-all-labels is off.
		{desc: "all", target: []string{All}, shardByAllLabels: true},
		{desc: "all, shardByAllLabels:false", target: []string{All}, isErr: true},
		{desc: "distributor", target: []string{Distributor}, shardByAllLabels: true},
		{desc: "distributor, shardByAllLabels:false", target: []string{Distributor}, isErr: true},
		{desc: "querier", target: []string{Querier}, shardByAllLabels: true},
		{desc: "querier, shardByAllLabels:false", target: []string{Querier}, isErr: true},
		{desc: "ruler", target: []string{Ruler}, shardByAllLabels: true},
		{desc: "ruler, shardByAllLabels:false", target: []string{Ruler}, isErr: true},
		// Targets that skip validation: never error.
		{desc: "ingester", target: []string{Ingester}},
		{desc: "query frontend", target: []string{QueryFrontend}},
		{desc: "alertmanager", target: []string{AlertManager}},
		{desc: "store gateway", target: []string{StoreGateway}},
		{desc: "compactor", target: []string{Compactor}},
		{desc: "overrides exporter", target: []string{OverridesExporter}},
	}

	for _, tt := range testCases {
		t.Run(tt.desc, func(t *testing.T) {
			// Override that only passes validation when shard-by-all-labels is set.
			cfgReader := strings.NewReader(`
overrides:
  'user-1':
    max_global_series_per_user: 15000
`)

			loader := runtimeConfigLoader{
				cfg: Config{
					Target:      tt.target,
					Distributor: distributor.Config{ShardByAllLabels: tt.shardByAllLabels},
					Ingester:    ingester.Config{ActiveSeriesMetricsEnabled: true},
				},
			}

			_, err := loader.load(cfgReader)
			if tt.isErr {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
		})
	}
}
Loading