Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type Manager struct {

// a single errG is used for all managed controllers, including those
// that are added after initialization
errG *errgroup.Group
errGCtx context.Context
errG *errgroup.Group // GUARDED_BY(lock)
errGCtx context.Context // GUARDED_BY(lock)

// a registry of cancel functions for each individual controller
sync.RWMutex
cancelFuncs map[Controller]func()
lock sync.RWMutex
cancelFuncs map[Controller]func() // GUARDED_BY(lock)

// for broadcasting events
broadcaster record.EventBroadcaster
Expand Down Expand Up @@ -66,19 +66,19 @@ func NewManager(debugConfig *componentconfig.DebuggingConfiguration, address str
// health / debug endpoints for them. It stops when the context is cancelled.
// It will only have an effect the first time it is called.
func (m *Manager) Start(ctx context.Context, readyc chan<- struct{}, controllers ...Controller) error {
m.RLock()
m.lock.RLock()
if m.errG != nil {
m.RUnlock()
m.lock.RUnlock()
return fmt.Errorf("manager already started")
}
m.RUnlock()
m.lock.RUnlock()

var startErr error
m.once.Do(func() {
m.Lock()
m.lock.Lock()
m.errG, ctx = errgroup.WithContext(ctx)
m.errGCtx = ctx
m.Unlock()
m.lock.Unlock()

// start controllers
if err := m.Go(controllers...); err != nil {
Expand Down Expand Up @@ -109,12 +109,12 @@ func (m *Manager) Start(ctx context.Context, readyc chan<- struct{}, controllers
<-ctx.Done()
m.broadcaster.Shutdown()

m.Lock()
m.lock.Lock()
for ctrl, cancel := range m.cancelFuncs {
cancel()
delete(m.cancelFuncs, ctrl)
}
m.Unlock()
m.lock.Unlock()

// no context passed to shutdown; the errg will block
// until the server is closed
Expand All @@ -137,24 +137,24 @@ func (m *Manager) Start(ctx context.Context, readyc chan<- struct{}, controllers

// Go adds controllers into the existing manager's errgroup
func (m *Manager) Go(controllers ...Controller) error {
m.RLock()
m.lock.RLock()
errG := m.errG
if errG == nil {
m.RUnlock()
m.lock.RUnlock()
return fmt.Errorf("cannot add controllers to an unstarted manager")
}
ctx := m.errGCtx
m.RUnlock()
m.lock.RUnlock()

// start newly added controllers
for _, c := range controllers {
c := c
m.healthzHandler.AddHealthChecker(controllerhealthz.NamedHealthChecker(c.Name(), c.HealthChecker()))
errG.Go(func() error {
ctx, cancel := context.WithCancel(ctx)
m.Lock()
m.lock.Lock()
m.cancelFuncs[c] = cancel
m.Unlock()
m.lock.Unlock()
c.Start(ctx, runtime.GOMAXPROCS(0))
return nil
})
Expand All @@ -171,16 +171,16 @@ func (m *Manager) Go(controllers ...Controller) error {
func (m *Manager) Cancel(controllers ...Controller) {
names := make([]string, 0, len(controllers))
for _, c := range controllers {
m.RLock()
m.lock.RLock()
cancel, ok := m.cancelFuncs[c]
m.RUnlock()
m.lock.RUnlock()
if ok {
cancel()
}
names = append(names, c.Name())
m.Lock()
m.lock.Lock()
delete(m.cancelFuncs, c)
m.Unlock()
m.lock.Unlock()
}
m.healthzHandler.RemoveHealthChecker(names...)
}
4 changes: 2 additions & 2 deletions manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func testController(t *testing.T, name string) Controller {
func requireCancelFnCount(t *testing.T, m *Manager, count int) {
t.Helper()
require.Eventually(t, func() bool {
m.RLock()
defer m.RUnlock()
m.lock.RLock()
defer m.lock.RUnlock()
return len(m.cancelFuncs) == count
}, 100*time.Second, 10*time.Millisecond)
}
Loading