diff --git a/CHANGELOG.md b/CHANGELOG.md index 56f3e485ed9..b1c43a76b31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,7 @@ * [BUGFIX] Ingester: Allow shipper to skip corrupted blocks. #6786 * [BUGFIX] Compactor: Delete the prefix `blocks_meta` from the metadata fetcher metrics. #6832 * [BUGFIX] Store Gateway: Avoid race condition by deduplicating entries in bucket stores user scan. #6863 +* [BUGFIX] Runtime-config: Change to check tenant limit validation when loading runtime config only for `all`, `distributor`, `querier`, and `ruler` targets. #6880 ## 1.19.0 2025-02-27 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index b26398444d5..07fcc3e5be5 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -97,7 +97,7 @@ The next three options only apply when the querier is used together with the Que Set this flag to `true` for the new behaviour. - Important to note is that when setting this flag to `true`, it has to be set on both the distributor and the querier (called `-distributor.shard-by-all-labels` on Querier as well). If the flag is only set on the distributor and not on the querier, you will get incomplete query results because not all ingesters are queried. + Important to note is that when setting this flag to `true`, it has to be set on the distributor, the querier, and the ruler (called `-distributor.shard-by-all-labels` on Querier as well). If the flag is only set on the distributor and not on the querier, you will get incomplete query results because not all ingesters are queried. **Upgrade notes**: As this flag also makes all queries always read from all ingesters, the upgrade path is pretty trivial; just enable the flag. When you do enable it, you'll see a spike in the number of active series as the writes are "reshuffled" amongst the ingesters, but over the next stale period all the old series will be flushed, and you should end up with much better load balancing. With this flag enabled in the queriers, reads will always catch all the data from all ingesters. diff --git a/integration/runtime_config_test.go b/integration/runtime_config_test.go index 65d188702f7..62f1ba18082 100644 --- a/integration/runtime_config_test.go +++ b/integration/runtime_config_test.go @@ -10,6 +10,7 @@ import ( "io" "os" "path/filepath" + "strings" "testing" "time" @@ -164,6 +165,99 @@ func TestLoadRuntimeConfigFromCloudStorage(t *testing.T) { require.NoError(t, s.Stop(cortexSvc)) } +// Verify components are successfully started with `-distributor.shard-by-all-labels=false` +// except for "distributor", "querier", and "ruler" +// refer to https://github.com/cortexproject/cortex/issues/6741#issuecomment-3067244929 +func Test_VerifyComponentsAreSuccessfullyStarted_WithRuntimeConfigLoad(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + runtimeConfigYamlFile := ` +overrides: + 'user-1': + max_global_series_per_user: 15000 +` + + require.NoError(t, writeFileToSharedDir(s, runtimeConfigFile, []byte(runtimeConfigYamlFile))) + filePath := filepath.Join(e2e.ContainerSharedDir, runtimeConfigFile) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-runtime-config.file": filePath, + "-runtime-config.backend": "filesystem", + + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + "-alertmanager-storage.backend": "local", + "-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"), + + // store-gateway + "-querier.store-gateway-addresses": "localhost:12345", + + // distributor.shard-by-all-labels is false + "-distributor.shard-by-all-labels": "false", + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + // Ingester and Store gateway start + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(ingester, storeGateway)) + + // Querier start, but fail with "-distributor.shard-by-all-labels": "false" + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","), + }), "") + require.Error(t, s.StartAndWaitReady(querier)) + + // Start Query frontend + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + // Querier start, should success with "-distributor.shard-by-all-labels": "true" + querier = e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","), + "-distributor.shard-by-all-labels": "true", + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier)) + + // Ruler start, but fail with "-distributor.shard-by-all-labels": "false" + ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, RulerFlags()), "") + require.Error(t, s.StartAndWaitReady(ruler)) + + // Ruler start, should success with "-distributor.shard-by-all-labels": "true" + ruler = e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, RulerFlags(), map[string]string{ + "-distributor.shard-by-all-labels": "true", + }), "") + require.NoError(t, s.StartAndWaitReady(ruler)) + + // Start the query-scheduler + queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "") + require.NoError(t, s.StartAndWaitReady(queryScheduler)) + + // Start Alertmanager + alertmanager := e2ecortex.NewAlertmanager("alertmanager", mergeFlags(flags, AlertmanagerFlags()), "") + require.NoError(t, s.StartAndWaitReady(alertmanager)) + + // Distributor start, but fail with "-distributor.shard-by-all-labels": "false" + distributor := e2ecortex.NewQuerier("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{}), "") + require.Error(t, s.StartAndWaitReady(distributor)) + + // Distributor start, should success with "-distributor.shard-by-all-labels": "true" + distributor = e2ecortex.NewQuerier("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-distributor.shard-by-all-labels": "true", + }), "") + require.NoError(t, s.StartAndWaitReady(distributor)) +} + func assertRuntimeConfigLoadedCorrectly(t *testing.T, cortexSvc *e2ecortex.CortexService) { runtimeConfig := getRuntimeConfig(t, cortexSvc) diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index bcf59cdfd3a..c2bcc786d91 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -4,6 +4,7 @@ import ( "errors" "io" "net/http" + "strings" "gopkg.in/yaml.v2" @@ -15,7 +16,8 @@ import ( ) var ( - errMultipleDocuments = errors.New("the provided runtime configuration contains multiple documents") + errMultipleDocuments = errors.New("the provided runtime configuration contains multiple documents") + tenantLimitCheckTargets = []string{All, Distributor, Querier, Ruler} ) // RuntimeConfigValues are values that can be reloaded from configuration file while Cortex is running. @@ -78,9 +80,16 @@ func (l runtimeConfigLoader) load(r io.Reader) (interface{}, error) { return nil, errMultipleDocuments } - for _, ul := range overrides.TenantLimits { - if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil { - return nil, err + targetStr := l.cfg.Target.String() + for _, target := range tenantLimitCheckTargets { + if strings.Contains(targetStr, target) { + // only check if target is `all`, `distributor`, "querier", and "ruler" + // refer to https://github.com/cortexproject/cortex/issues/6741#issuecomment-3067244929 + for _, ul := range overrides.TenantLimits { + if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil { + return nil, err + } + } } } diff --git a/pkg/cortex/runtime_config_test.go b/pkg/cortex/runtime_config_test.go index 544e5696cb4..e65398426c2 100644 --- a/pkg/cortex/runtime_config_test.go +++ b/pkg/cortex/runtime_config_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -109,3 +110,106 @@ overrides: assert.Nil(t, actual) } } + +func TestLoad_ShouldNotErrorWithCertainTarget(t *testing.T) { + + tests := []struct { + desc string + target []string + shardByAllLabels bool + isErr bool + }{ + { + desc: "all", + target: []string{All}, + shardByAllLabels: true, + }, + { + desc: "all, shardByAllLabels:false", + target: []string{All}, + shardByAllLabels: false, + isErr: true, + }, + { + desc: "distributor", + target: []string{Distributor}, + shardByAllLabels: true, + }, + { + desc: "distributor, shardByAllLabels:false", + target: []string{Distributor}, + shardByAllLabels: false, + isErr: true, + }, + { + desc: "querier", + target: []string{Querier}, + shardByAllLabels: true, + }, + { + desc: "querier, shardByAllLabels:false", + target: []string{Querier}, + shardByAllLabels: false, + isErr: true, + }, + { + desc: "ruler", + target: []string{Ruler}, + shardByAllLabels: true, + }, + { + desc: "ruler, shardByAllLabels:false", + target: []string{Ruler}, + shardByAllLabels: false, + isErr: true, + }, + { + desc: "ingester", + target: []string{Ingester}, + }, + { + desc: "query frontend", + target: []string{QueryFrontend}, + }, + { + desc: "alertmanager", + target: []string{AlertManager}, + }, + { + desc: "store gateway", + target: []string{StoreGateway}, + }, + { + desc: "compactor", + target: []string{Compactor}, + }, + { + desc: "overrides exporter", + target: []string{OverridesExporter}, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + yamlFile := strings.NewReader(` +overrides: + 'user-1': + max_global_series_per_user: 15000 +`) + + loader := runtimeConfigLoader{} + loader.cfg = Config{ + Target: tc.target, + Distributor: distributor.Config{ShardByAllLabels: tc.shardByAllLabels}, + Ingester: ingester.Config{ActiveSeriesMetricsEnabled: true}, + } + + _, err := loader.load(yamlFile) + if tc.isErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +}