diff --git a/pkg/alertmanager/distributor_test.go b/pkg/alertmanager/distributor_test.go index 40ef486cfaa..6364a20f4e9 100644 --- a/pkg/alertmanager/distributor_test.go +++ b/pkg/alertmanager/distributor_test.go @@ -276,7 +276,7 @@ func TestDistributor_DistributeRequest(t *testing.T) { req.RequestURI = url var allowedTenants *util.AllowedTenants if c.isTenantDisabled { - allowedTenants = util.NewAllowedTenants(nil, []string{"1"}) + allowedTenants = util.NewAllowedTenants(util.AllowedTenantConfig{DisabledTenants: []string{"1"}}, nil) } w := httptest.NewRecorder() diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index e868c0cd51a..80553e7b743 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -86,8 +86,9 @@ type MultitenantAlertmanagerConfig struct { // For the state persister. Persister PersisterConfig `yaml:",inline"` - EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` - DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` + util.AllowedTenantConfig `yaml:",inline"` + + AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"` } type ClusterConfig struct { @@ -366,7 +367,7 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC logger: log.With(logger, "component", "MultiTenantAlertmanager"), registry: registerer, limits: limits, - allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants), + allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, cfg.AllowedTenantConfigFn), ringCheckErrors: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_alertmanager_ring_check_errors_total", Help: "Number of errors that have occurred when checking the ring for ownership.", @@ -453,6 +454,10 @@ func (h *handlerForGRPCServer) ServeHTTP(w http.ResponseWriter, req *http.Reques } func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) { + if err := services.StartAndAwaitRunning(ctx, 
am.allowedTenants); err != nil { + return errors.Wrap(err, "failed to start allowed tenants service") + } + err = am.migrateStateFilesToPerTenantDirectories() if err != nil { return err @@ -733,6 +738,7 @@ func (am *MultitenantAlertmanager) stopping(_ error) error { // subservices manages ring and lifecycler, if sharding was enabled. _ = services.StopManagerAndAwaitStopped(context.Background(), am.subservices) } + services.StopAndAwaitTerminated(context.Background(), am.allowedTenants) //nolint:errcheck return nil } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index e665d3ec0db..e360e88b525 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -30,7 +30,6 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/backoff" - "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -187,8 +186,9 @@ type Config struct { // Whether the migration of block deletion marks to the global markers location is enabled. BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"` - EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` - DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` + // Allowed TenantConfig + util.AllowedTenantConfig `yaml:",inline"` + AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"` // Compactors sharding. 
ShardingEnabled bool `yaml:"sharding_enabled"` @@ -396,7 +396,7 @@ func newCompactor( bucketClientFactory: bucketClientFactory, blocksGrouperFactory: blocksGrouperFactory, blocksCompactorFactory: blocksCompactorFactory, - allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants), + allowedTenants: util.NewAllowedTenants(compactorCfg.AllowedTenantConfig, compactorCfg.AllowedTenantConfigFn), compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_started_total", @@ -482,6 +482,10 @@ func newCompactor( func (c *Compactor) starting(ctx context.Context) error { var err error + if err := services.StartAndAwaitRunning(ctx, c.allowedTenants); err != nil { + return errors.Wrap(err, "failed to start allowed tenants service") + } + // Create bucket client. c.bucketClient, err = c.bucketClientFactory(ctx) if err != nil { @@ -578,7 +582,8 @@ func (c *Compactor) starting(ctx context.Context) error { func (c *Compactor) stopping(_ error) error { ctx := context.Background() - services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck + services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck + services.StopAndAwaitTerminated(ctx, c.allowedTenants) //nolint:errcheck if c.ringSubservices != nil { return services.StopManagerAndAwaitStopped(ctx, c.ringSubservices) } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 9470b86042b..7f0c648ddcd 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -581,6 +581,8 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { return nil, err } + t.Cfg.Ruler.AllowedTenantConfigFn = rulerAllowedTenant(t.RuntimeConfig) + t.Ruler, err = ruler.NewRuler( t.Cfg.Ruler, manager, @@ -626,6 +628,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { if err != nil { return } + t.Cfg.Alertmanager.AllowedTenantConfigFn = alertManagerAllowedTenant(t.RuntimeConfig) t.Alertmanager, 
err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, store, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return } @@ -639,6 +642,7 @@ func (t *Cortex) initCompactor() (serv services.Service, err error) { t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Compactor.AllowedTenantConfigFn = compactorAllowedTenant(t.RuntimeConfig) t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides) if err != nil { return diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 67bfd928127..1ab7c48996d 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -18,6 +18,12 @@ var ( errMultipleDocuments = errors.New("the provided runtime configuration contains multiple documents") ) +type runtimeAllowedTenantConfig struct { + AlertManager *util.AllowedTenantConfig `yaml:"alert_manager"` + Compactor *util.AllowedTenantConfig `yaml:"compactor"` + Ruler *util.AllowedTenantConfig `yaml:"ruler"` +} + // runtimeConfigValues are values that can be reloaded from configuration file while Cortex is running. // Reloading is done by runtime_config.Manager, which also keeps the currently loaded config. // These values are then pushed to the components that are interested in them.
@@ -26,9 +32,11 @@ type runtimeConfigValues struct { Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"` IngesterLimits *ingester.InstanceLimits `yaml:"ingester_limits"` + + AllowedTenantConfig runtimeAllowedTenantConfig `yaml:"allowed_tenant"` } // runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager @@ -118,6 +126,48 @@ func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.Ins } } +func alertManagerAllowedTenant(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig { + if manager == nil { + return nil + } + + return func() *util.AllowedTenantConfig { + val := manager.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + return cfg.AllowedTenantConfig.AlertManager + } + return nil + } +} + +func compactorAllowedTenant(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig { + if manager == nil { + return nil + } + + return func() *util.AllowedTenantConfig { + val := manager.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + return cfg.AllowedTenantConfig.Compactor + } + return nil + } +} + +func rulerAllowedTenant(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig { + if manager == nil { + return nil + } + + return func() *util.AllowedTenantConfig { + val := manager.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + return cfg.AllowedTenantConfig.Ruler + } + return nil + } +} + func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dde76d16ef3..988ef027e2d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2595,7 +2595,7 @@ func (i
*Ingester) flushHandler(w http.ResponseWriter, r *http.Request) { tenants := r.Form[tenantParam] - allowedUsers := util.NewAllowedTenants(tenants, nil) + allowedUsers := util.NewAllowedTenants(util.AllowedTenantConfig{DisabledTenants: nil, EnabledTenants: tenants}, nil) run := func() { ingCtx := i.BasicService.ServiceContext() if ingCtx == nil || ingCtx.Err() != nil { diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index e932292213e..e2bba6b39c2 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -116,8 +116,8 @@ type Config struct { EnableAPI bool `yaml:"enable_api"` - EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` - DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` + util.AllowedTenantConfig `yaml:",inline"` + AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"` RingCheckPeriod time.Duration `yaml:"-"` @@ -266,7 +266,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger: logger, limits: limits, clientsPool: clientPool, - allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants), + allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, cfg.AllowedTenantConfigFn), ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_ruler_ring_check_errors_total", @@ -334,6 +334,9 @@ func enableSharding(r *Ruler, ringStore kv.Client) error { func (r *Ruler) starting(ctx context.Context) error { // If sharding is enabled, start the used subservices. 
+ if err := services.StartAndAwaitRunning(ctx, r.allowedTenants); err != nil { + return errors.Wrap(err, "failed to start allowed tenants service") + } if r.cfg.EnableSharding { var err error @@ -361,6 +364,9 @@ func (r *Ruler) stopping(_ error) error { if r.subservices != nil { _ = services.StopManagerAndAwaitStopped(context.Background(), r.subservices) } + + services.StopAndAwaitTerminated(context.Background(), r.allowedTenants) //nolint:errcheck + return nil } diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 2bcb6f813d2..249f092acfb 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -899,8 +899,10 @@ func TestSharding(t *testing.T) { HeartbeatTimeout: 1 * time.Minute, }, FlushCheckPeriod: 0, - EnabledTenants: tc.enabledUsers, - DisabledTenants: tc.disabledUsers, + AllowedTenantConfig: util.AllowedTenantConfig{ + EnabledTenants: tc.enabledUsers, + DisabledTenants: tc.disabledUsers, + }, } r := buildRuler(t, cfg, nil, store, nil) diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go index 88c7a6333b8..a388162e719 100644 --- a/pkg/util/allowed_tenants.go +++ b/pkg/util/allowed_tenants.go @@ -1,36 +1,99 @@ package util +import ( + "context" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/services" +) + +type AllowedTenantConfig struct { + EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` + DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` +} + // AllowedTenants that can answer whether tenant is allowed or not based on configuration. // Default value (nil) allows all tenants. type AllowedTenants struct { + services.Service + // If empty, all tenants are enabled. If not empty, only tenants in the map are enabled. enabled map[string]struct{} // If empty, no tenants are disabled. If not empty, tenants in the map are disabled. 
disabled map[string]struct{} + + allowedTenantConfigFn func() *AllowedTenantConfig + defaultCfg *AllowedTenantConfig + m sync.RWMutex } // NewAllowedTenants builds new allowed tenants based on enabled and disabled tenants. // If there are any enabled tenants, then only those tenants are allowed. // If there are any disabled tenants, then tenant from that list, that would normally be allowed, is disabled instead. -func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants { - a := &AllowedTenants{} +func NewAllowedTenants(cfg AllowedTenantConfig, allowedTenantConfigFn func() *AllowedTenantConfig) *AllowedTenants { + if allowedTenantConfigFn == nil { + allowedTenantConfigFn = func() *AllowedTenantConfig { return &cfg } + } - if len(enabled) > 0 { - a.enabled = make(map[string]struct{}, len(enabled)) - for _, u := range enabled { + a := &AllowedTenants{ + allowedTenantConfigFn: allowedTenantConfigFn, + defaultCfg: &cfg, + } + + a.setConfig(allowedTenantConfigFn()) + + a.Service = services.NewBasicService(nil, a.running, nil) + + return a +} + +func (a *AllowedTenants) running(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if a.allowedTenantConfigFn != nil { + c := a.allowedTenantConfigFn() + if c == nil { + c = a.defaultCfg + } + a.setConfig(c) + } + case <-ctx.Done(): + return nil + } + } +} + +func (a *AllowedTenants) setConfig(cfg *AllowedTenantConfig) { + if a == nil || cfg == nil { + return + } + + a.m.Lock() + defer a.m.Unlock() + if len(cfg.EnabledTenants) > 0 { + a.enabled = make(map[string]struct{}, len(cfg.EnabledTenants)) + for _, u := range cfg.EnabledTenants { a.enabled[u] = struct{}{} } + } else { + a.enabled = nil } - if len(disabled) > 0 { - a.disabled = make(map[string]struct{}, len(disabled)) - for _, u := range
cfg.DisabledTenants { a.disabled[u] = struct{}{} } + } else { + a.disabled = nil } - - return a } func (a *AllowedTenants) IsAllowed(tenantID string) bool { @@ -38,6 +101,9 @@ func (a *AllowedTenants) IsAllowed(tenantID string) bool { return true } + a.m.RLock() + defer a.m.RUnlock() + if len(a.enabled) > 0 { if _, ok := a.enabled[tenantID]; !ok { return false diff --git a/pkg/util/allowed_tenants_test.go b/pkg/util/allowed_tenants_test.go index 221e0a9e603..6ef2497079a 100644 --- a/pkg/util/allowed_tenants_test.go +++ b/pkg/util/allowed_tenants_test.go @@ -7,14 +7,14 @@ import ( ) func TestAllowedTenants_NoConfig(t *testing.T) { - a := NewAllowedTenants(nil, nil) + a := NewAllowedTenants(AllowedTenantConfig{}, nil) require.True(t, a.IsAllowed("all")) require.True(t, a.IsAllowed("tenants")) require.True(t, a.IsAllowed("allowed")) } func TestAllowedTenants_Enabled(t *testing.T) { - a := NewAllowedTenants([]string{"A", "B"}, nil) + a := NewAllowedTenants(AllowedTenantConfig{EnabledTenants: []string{"A", "B"}}, nil) require.True(t, a.IsAllowed("A")) require.True(t, a.IsAllowed("B")) require.False(t, a.IsAllowed("C")) @@ -22,7 +22,7 @@ func TestAllowedTenants_Enabled(t *testing.T) { } func TestAllowedTenants_Disabled(t *testing.T) { - a := NewAllowedTenants(nil, []string{"A", "B"}) + a := NewAllowedTenants(AllowedTenantConfig{DisabledTenants: []string{"A", "B"}}, nil) require.False(t, a.IsAllowed("A")) require.False(t, a.IsAllowed("B")) require.True(t, a.IsAllowed("C")) @@ -30,7 +30,7 @@ func TestAllowedTenants_Disabled(t *testing.T) { } func TestAllowedTenants_Combination(t *testing.T) { - a := NewAllowedTenants([]string{"A", "B"}, []string{"B", "C"}) + a := NewAllowedTenants(AllowedTenantConfig{EnabledTenants: []string{"A", "B"}, DisabledTenants: []string{"B", "C"}}, nil) require.True(t, a.IsAllowed("A")) // enabled, and not disabled require.False(t, a.IsAllowed("B")) // enabled, but also disabled require.False(t, a.IsAllowed("C")) // disabled