diff --git a/pkg/capacity/topology/nodes.go b/pkg/capacity/topology/nodes.go index d19dbef6b0..77cb55b5cd 100644 --- a/pkg/capacity/topology/nodes.go +++ b/pkg/capacity/topology/nodes.go @@ -21,6 +21,7 @@ import ( "reflect" "sort" "sync" + "sync/atomic" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" @@ -159,6 +160,7 @@ type nodeTopology struct { nodeInformer coreinformersv1.NodeInformer csiNodeInformer storageinformersv1.CSINodeInformer queue workqueue.TypedRateLimitingInterface[string] + hasSynced atomic.Bool mutex sync.Mutex // segments hold a list of all currently known topology segments. @@ -209,14 +211,20 @@ func (nt *nodeTopology) RunWorker(ctx context.Context) { } } +func (nt *nodeTopology) upstreamSynced() bool { + return nt.nodeInformer.Informer().HasSynced() && + nt.csiNodeInformer.Informer().HasSynced() +} + func (nt *nodeTopology) HasSynced() bool { - if nt.nodeInformer.Informer().HasSynced() && - nt.csiNodeInformer.Informer().HasSynced() { - // Now that both informers are up-to-date, use that - // information to update our own view of the world. - nt.sync(context.Background()) + if nt.hasSynced.Load() { return true } + if nt.upstreamSynced() { + // Now that both informers are up-to-date, + // trigger a sync to update the list of topology segments. + nt.queue.Add("") + } return false } @@ -231,6 +239,11 @@ func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool { } func (nt *nodeTopology) sync(_ context.Context) { + if !nt.hasSynced.Load() && nt.upstreamSynced() { + // We are not yet synced, but the upstream informers are. + // we will become synced when this function returns + defer nt.hasSynced.Store(true) + } // For all nodes on which the driver is registered, collect the topology key/value pairs // and sort them by key name to make the result deterministic. Skip all segments that have // been seen before. diff --git a/pkg/capacity/topology/nodes_test.go b/pkg/capacity/topology/nodes_test.go index 4ea2f39367..6410832d09 100644 --- a/pkg/capacity/topology/nodes_test.go +++ b/pkg/capacity/topology/nodes_test.go @@ -566,6 +566,41 @@ func TestNodeTopology(t *testing.T) { } } +func TestHasSynced(t *testing.T) { + client := fakeclientset.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0*time.Second /* no resync */) + nodeInformer := informerFactory.Core().V1().Nodes() + csiNodeInformer := informerFactory.Storage().V1().CSINodes() + rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 2*time.Second) + queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "items"}) + + nt := NewNodeTopology( + driverName, + client, + nodeInformer, + csiNodeInformer, + queue, + ).(*nodeTopology) + + ctx := t.Context() + nt.sync(ctx) + if nt.HasSynced() { + t.Fatalf("upstream informer not started yet, expected HasSynced to return false") + } + + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + if nt.HasSynced() { // should enqueue a work item + t.Fatalf("nt not started, expected HasSynced to return false") + } + + // consume the work item + nt.processNextWorkItem(ctx) + if !nt.HasSynced() { + t.Fatalf("nt should be synced now") + } +} + type segmentsFound map[*Segment]bool func (sf segmentsFound) Found() []*Segment {