Allowed tenant runtime #31

Open · wants to merge 8 commits into master
2 changes: 1 addition & 1 deletion pkg/alertmanager/distributor_test.go
@@ -276,7 +276,7 @@ func TestDistributor_DistributeRequest(t *testing.T) {
req.RequestURI = url
var allowedTenants *util.AllowedTenants
if c.isTenantDisabled {
allowedTenants = util.NewAllowedTenants(nil, []string{"1"})
allowedTenants = util.NewAllowedTenants(util.AllowedTenantConfig{DisabledTenants: []string{"1"}}, nil)
}

w := httptest.NewRecorder()
12 changes: 9 additions & 3 deletions pkg/alertmanager/multitenant.go
@@ -86,8 +86,9 @@ type MultitenantAlertmanagerConfig struct {
// For the state persister.
Persister PersisterConfig `yaml:",inline"`

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
util.AllowedTenantConfig `yaml:",inline"`

AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"`
}

type ClusterConfig struct {
@@ -366,7 +367,7 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC
logger: log.With(logger, "component", "MultiTenantAlertmanager"),
registry: registerer,
limits: limits,
allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, cfg.AllowedTenantConfigFn),
ringCheckErrors: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_alertmanager_ring_check_errors_total",
Help: "Number of errors that have occurred when checking the ring for ownership.",
@@ -453,6 +454,10 @@ func (h *handlerForGRPCServer) ServeHTTP(w http.ResponseWriter, req *http.Reques
}

func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) {
if err := services.StartAndAwaitRunning(ctx, am.allowedTenants); err != nil {
return errors.Wrap(err, "failed to start allowed tenants service")
}

err = am.migrateStateFilesToPerTenantDirectories()
if err != nil {
return err
@@ -733,6 +738,7 @@ func (am *MultitenantAlertmanager) stopping(_ error) error {
// subservices manages ring and lifecycler, if sharding was enabled.
_ = services.StopManagerAndAwaitStopped(context.Background(), am.subservices)
}
services.StopAndAwaitTerminated(context.Background(), am.allowedTenants) //nolint:errcheck
return nil
}

15 changes: 10 additions & 5 deletions pkg/compactor/compactor.go
@@ -30,7 +30,6 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/backoff"
"github.com/cortexproject/cortex/pkg/util/flagext"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -187,8 +186,9 @@ type Config struct {
// Whether the migration of block deletion marks to the global markers location is enabled.
BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
// Allowed tenant configuration (enabled/disabled tenants).
util.AllowedTenantConfig `yaml:",inline"`
AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"`

// Compactors sharding.
ShardingEnabled bool `yaml:"sharding_enabled"`
@@ -396,7 +396,7 @@ func newCompactor(
bucketClientFactory: bucketClientFactory,
blocksGrouperFactory: blocksGrouperFactory,
blocksCompactorFactory: blocksCompactorFactory,
allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
allowedTenants: util.NewAllowedTenants(compactorCfg.AllowedTenantConfig, compactorCfg.AllowedTenantConfigFn),

compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_started_total",
@@ -482,6 +482,10 @@ func newCompactor(
func (c *Compactor) starting(ctx context.Context) error {
var err error

if err := services.StartAndAwaitRunning(ctx, c.allowedTenants); err != nil {
return errors.Wrap(err, "failed to start allowed tenants service")
}

// Create bucket client.
c.bucketClient, err = c.bucketClientFactory(ctx)
if err != nil {
@@ -578,7 +582,8 @@ func (c *Compactor) starting(ctx context.Context) error {
func (c *Compactor) stopping(_ error) error {
ctx := context.Background()

services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck
services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck
services.StopAndAwaitTerminated(ctx, c.allowedTenants) //nolint:errcheck
if c.ringSubservices != nil {
return services.StopManagerAndAwaitStopped(ctx, c.ringSubservices)
}
4 changes: 4 additions & 0 deletions pkg/cortex/modules.go
@@ -581,6 +581,8 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
return nil, err
}

t.Cfg.Ruler.AllowedTenantConfigFn = rulerAllowedTenant(t.RuntimeConfig)

t.Ruler, err = ruler.NewRuler(
t.Cfg.Ruler,
manager,
@@ -626,6 +628,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {
if err != nil {
return
}
t.Cfg.Alertmanager.AllowedTenantConfigFn = alertManagerAllowedTenant(t.RuntimeConfig)

t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, store, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
@@ -639,6 +642,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {
func (t *Cortex) initCompactor() (serv services.Service, err error) {
t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort

t.Cfg.Compactor.AllowedTenantConfigFn = compactorAllowedTenant(t.RuntimeConfig)
t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides)
if err != nil {
return
52 changes: 50 additions & 2 deletions pkg/cortex/runtime_config.go
@@ -18,6 +18,12 @@ var (
errMultipleDocuments = errors.New("the provided runtime configuration contains multiple documents")
)

type runtimeAllowedTenantConfig struct {
	AlertManager *util.AllowedTenantConfig `yaml:"alert_manager"`
	Compactor    *util.AllowedTenantConfig `yaml:"compactor"`
	Ruler        *util.AllowedTenantConfig `yaml:"ruler"`
}

// runtimeConfigValues are values that can be reloaded from configuration file while Cortex is running.
// Reloading is done by runtime_config.Manager, which also keeps the currently loaded config.
// These values are then pushed to the components that are interested in them.
@@ -26,9 +32,9 @@ type runtimeConfigValues struct {

Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"`

IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"`

IngesterLimits *ingester.InstanceLimits `yaml:"ingester_limits"`

AllowedTenantConfig runtimeAllowedTenantConfig `yaml:"allowed_tenant"`
}

// runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager
Expand Down Expand Up @@ -118,6 +124,48 @@ func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.Ins
}
}

func alertManagerAllowedTenant(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig {
if manager == nil {
return nil
}

return func() *util.AllowedTenantConfig {
val := manager.GetConfig()
if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
return cfg.AllowedTenantConfig.AlertManager
}
return nil
}
}

func compactorAllowedTenant(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig {
if manager == nil {
return nil
}

return func() *util.AllowedTenantConfig {
val := manager.GetConfig()
if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
return cfg.AllowedTenantConfig.Compactor
}
return nil
}
}

func rulerAllowedTenant(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig {
if manager == nil {
return nil
}

return func() *util.AllowedTenantConfig {
val := manager.GetConfig()
if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
return cfg.AllowedTenantConfig.Ruler
}
return nil
}
}

func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues)
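For reference, a runtime configuration file exercising the new allowed_tenant block might look like the sketch below. The field names follow the yaml tags introduced in this diff; the tenant IDs are purely illustrative, and since enabled_tenants/disabled_tenants are flagext.StringSliceCSV values they are written as comma-separated strings.

# Hypothetical runtime config snippet, not part of this PR.
allowed_tenant:
  alert_manager:
    enabled_tenants: team-a,team-b
  compactor:
    disabled_tenants: team-c
  ruler:
    disabled_tenants: team-c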
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -2595,7 +2595,7 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) {

tenants := r.Form[tenantParam]

allowedUsers := util.NewAllowedTenants(tenants, nil)
allowedUsers := util.NewAllowedTenants(util.AllowedTenantConfig{DisabledTenants: nil, EnabledTenants: tenants}, nil)
run := func() {
ingCtx := i.BasicService.ServiceContext()
if ingCtx == nil || ingCtx.Err() != nil {
12 changes: 9 additions & 3 deletions pkg/ruler/ruler.go
@@ -116,8 +116,8 @@ type Config struct {

EnableAPI bool `yaml:"enable_api"`

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
util.AllowedTenantConfig `yaml:",inline"`
AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"`

RingCheckPeriod time.Duration `yaml:"-"`

@@ -266,7 +266,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
logger: logger,
limits: limits,
clientsPool: clientPool,
allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, cfg.AllowedTenantConfigFn),

ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ruler_ring_check_errors_total",
@@ -334,6 +334,9 @@ func enableSharding(r *Ruler, ringStore kv.Client) error {

func (r *Ruler) starting(ctx context.Context) error {
// If sharding is enabled, start the used subservices.
if err := services.StartAndAwaitRunning(ctx, r.allowedTenants); err != nil {
return errors.Wrap(err, "failed to start allowed tenants service")
}
if r.cfg.EnableSharding {
var err error

@@ -361,6 +364,9 @@ func (r *Ruler) stopping(_ error) error {
if r.subservices != nil {
_ = services.StopManagerAndAwaitStopped(context.Background(), r.subservices)
}

services.StopAndAwaitTerminated(context.Background(), r.allowedTenants) //nolint:errcheck

return nil
}

6 changes: 4 additions & 2 deletions pkg/ruler/ruler_test.go
@@ -899,8 +899,10 @@ func TestSharding(t *testing.T) {
HeartbeatTimeout: 1 * time.Minute,
},
FlushCheckPeriod: 0,
EnabledTenants: tc.enabledUsers,
DisabledTenants: tc.disabledUsers,
AllowedTenantConfig: util.AllowedTenantConfig{
EnabledTenants: tc.enabledUsers,
DisabledTenants: tc.disabledUsers,
},
}

r := buildRuler(t, cfg, nil, store, nil)
86 changes: 76 additions & 10 deletions pkg/util/allowed_tenants.go
@@ -1,43 +1,109 @@
package util

import (
"context"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
)

type AllowedTenantConfig struct {
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
}

// AllowedTenants that can answer whether tenant is allowed or not based on configuration.
// Default value (nil) allows all tenants.
type AllowedTenants struct {
services.Service

// If empty, all tenants are enabled. If not empty, only tenants in the map are enabled.
enabled map[string]struct{}

// If empty, no tenants are disabled. If not empty, tenants in the map are disabled.
disabled map[string]struct{}

allowedTenantConfigFn func() *AllowedTenantConfig
defaultCfg *AllowedTenantConfig
m sync.RWMutex
}

// NewAllowedTenants builds new allowed tenants based on enabled and disabled tenants.
// If there are any enabled tenants, then only those tenants are allowed.
// If there are any disabled tenants, then tenant from that list, that would normally be allowed, is disabled instead.
func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants {
a := &AllowedTenants{}
func NewAllowedTenants(cfg AllowedTenantConfig, allowedTenantConfigFn func() *AllowedTenantConfig) *AllowedTenants {
if allowedTenantConfigFn == nil {
allowedTenantConfigFn = func() *AllowedTenantConfig { return &cfg }
}

if len(enabled) > 0 {
a.enabled = make(map[string]struct{}, len(enabled))
for _, u := range enabled {
a := &AllowedTenants{
allowedTenantConfigFn: allowedTenantConfigFn,
defaultCfg: &cfg,
}

a.setConfig(allowedTenantConfigFn())

a.Service = services.NewBasicService(nil, a.running, nil)

return a
}

func (a *AllowedTenants) running(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if a.allowedTenantConfigFn != nil {
c := a.allowedTenantConfigFn()
if c == nil {
c = a.defaultCfg
}
a.setConfig(c)
}
case <-ctx.Done():
return nil
}
}
}

func (a *AllowedTenants) setConfig(cfg *AllowedTenantConfig) {
if a == nil {
return
}

a.m.Lock()
defer a.m.Unlock()
if len(cfg.EnabledTenants) > 0 {
a.enabled = make(map[string]struct{}, len(cfg.EnabledTenants))
for _, u := range cfg.EnabledTenants {
a.enabled[u] = struct{}{}
}
} else {
a.enabled = nil
}

if len(disabled) > 0 {
a.disabled = make(map[string]struct{}, len(disabled))
for _, u := range disabled {
if len(cfg.DisabledTenants) > 0 {
a.disabled = make(map[string]struct{}, len(cfg.DisabledTenants))
for _, u := range cfg.DisabledTenants {
a.disabled[u] = struct{}{}
}
} else {
a.disabled = nil
}

return a
}

func (a *AllowedTenants) IsAllowed(tenantID string) bool {
if a == nil {
return true
}

a.m.RLock()
defer a.m.RUnlock()

if len(a.enabled) > 0 {
if _, ok := a.enabled[tenantID]; !ok {
return false
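As a rough usage sketch (not part of this diff), the snippet below shows how a component would wire the new API: build AllowedTenants from its static config plus an optional runtime-config callback, run it as a service so the periodic reload takes effect, and gate per-tenant work with IsAllowed. The function runWithAllowedTenants and the tenant loop are illustrative assumptions; the util and services calls mirror the ones used elsewhere in this PR.

// Sketch only, assuming the API shown in this PR; runWithAllowedTenants is hypothetical.
package example

import (
	"context"

	"github.com/pkg/errors"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/services"
)

func runWithAllowedTenants(ctx context.Context, cfg util.AllowedTenantConfig, cfgFn func() *util.AllowedTenantConfig, tenantIDs []string) error {
	// cfgFn may be nil; NewAllowedTenants then falls back to the static config.
	allowed := util.NewAllowedTenants(cfg, cfgFn)

	// AllowedTenants now embeds services.Service, so it must be started for the
	// periodic (10s) re-read of the runtime config to take effect.
	if err := services.StartAndAwaitRunning(ctx, allowed); err != nil {
		return errors.Wrap(err, "failed to start allowed tenants service")
	}
	defer services.StopAndAwaitTerminated(context.Background(), allowed) //nolint:errcheck

	for _, id := range tenantIDs {
		if !allowed.IsAllowed(id) {
			continue // excluded via enabled_tenants/disabled_tenants
		}
		// ... per-tenant work ...
	}
	return nil
}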