Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions pkg/capacity/topology/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"
"sort"
"sync"
"sync/atomic"

v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
Expand Down Expand Up @@ -159,6 +160,7 @@ type nodeTopology struct {
nodeInformer coreinformersv1.NodeInformer
csiNodeInformer storageinformersv1.CSINodeInformer
queue workqueue.TypedRateLimitingInterface[string]
hasSynced atomic.Bool

mutex sync.Mutex
// segments hold a list of all currently known topology segments.
Expand Down Expand Up @@ -209,14 +211,20 @@ func (nt *nodeTopology) RunWorker(ctx context.Context) {
}
}

func (nt *nodeTopology) upstreamSynced() bool {
return nt.nodeInformer.Informer().HasSynced() &&
nt.csiNodeInformer.Informer().HasSynced()
}

func (nt *nodeTopology) HasSynced() bool {
if nt.nodeInformer.Informer().HasSynced() &&
nt.csiNodeInformer.Informer().HasSynced() {
// Now that both informers are up-to-date, use that
// information to update our own view of the world.
nt.sync(context.Background())
if nt.hasSynced.Load() {
return true
}
if nt.upstreamSynced() {
// Now that both informers are up-to-date,
// trigger a sync to update the list of topology segments.
nt.queue.Add("")
}
return false
}

Expand All @@ -231,6 +239,11 @@ func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool {
}

func (nt *nodeTopology) sync(_ context.Context) {
if !nt.hasSynced.Load() && nt.upstreamSynced() {
// We are not yet synced, but the upstream informers are.
// we will become synced when this function returns
defer nt.hasSynced.Store(true)
}
// For all nodes on which the driver is registered, collect the topology key/value pairs
// and sort them by key name to make the result deterministic. Skip all segments that have
// been seen before.
Expand Down
35 changes: 35 additions & 0 deletions pkg/capacity/topology/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,41 @@ func TestNodeTopology(t *testing.T) {
}
}

func TestHasSynced(t *testing.T) {
client := fakeclientset.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0*time.Second /* no resync */)
nodeInformer := informerFactory.Core().V1().Nodes()
csiNodeInformer := informerFactory.Storage().V1().CSINodes()
rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 2*time.Second)
queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "items"})

nt := NewNodeTopology(
driverName,
client,
nodeInformer,
csiNodeInformer,
queue,
).(*nodeTopology)

ctx := t.Context()
nt.sync(ctx)
if nt.HasSynced() {
t.Fatalf("upstream informer not started yet, expected HasSynced to return false")
}

informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
if nt.HasSynced() { // should enqueue a work item
t.Fatalf("nt not started, expected HasSynced to return false")
}

// consume the work item
nt.processNextWorkItem(ctx)
if !nt.HasSynced() {
t.Fatalf("nt should be synced now")
}
}

type segmentsFound map[*Segment]bool

func (sf segmentsFound) Found() []*Segment {
Expand Down