From b80ae72fa50e6627baef7511c2df9cf474ac68a1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?=
Date: Thu, 30 Oct 2025 22:26:33 +0800
Subject: [PATCH] capacity: fix duplicate topology

When the controller starts, two sync() calls can run simultaneously:
one from HasSynced(), the other from processNextWorkItem(). Each call
produces its own instance for the same topology segment and passes it
to the callbacks. This leads to duplicate entries in the capacities
map, which results in either:

- two CSIStorageCapacity objects being created for the same topology, or
- the same CSIStorageCapacity object being assigned to two keys in the
  capacities map. When one of them is updated, the other holds an
  outdated object and all subsequent updates fail with a conflict.
---
 pkg/capacity/topology/nodes.go      | 23 ++++++++++++++-----
 pkg/capacity/topology/nodes_test.go | 35 +++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/pkg/capacity/topology/nodes.go b/pkg/capacity/topology/nodes.go
index d19dbef6b0..77cb55b5cd 100644
--- a/pkg/capacity/topology/nodes.go
+++ b/pkg/capacity/topology/nodes.go
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"sort"
 	"sync"
+	"sync/atomic"
 
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
@@ -159,6 +160,7 @@ type nodeTopology struct {
 	nodeInformer    coreinformersv1.NodeInformer
 	csiNodeInformer storageinformersv1.CSINodeInformer
 	queue           workqueue.TypedRateLimitingInterface[string]
+	hasSynced       atomic.Bool
 
 	mutex sync.Mutex
 	// segments hold a list of all currently known topology segments.
@@ -209,14 +211,20 @@ func (nt *nodeTopology) RunWorker(ctx context.Context) {
 	}
 }
 
+func (nt *nodeTopology) upstreamSynced() bool {
+	return nt.nodeInformer.Informer().HasSynced() &&
+		nt.csiNodeInformer.Informer().HasSynced()
+}
+
 func (nt *nodeTopology) HasSynced() bool {
-	if nt.nodeInformer.Informer().HasSynced() &&
-		nt.csiNodeInformer.Informer().HasSynced() {
-		// Now that both informers are up-to-date, use that
-		// information to update our own view of the world.
-		nt.sync(context.Background())
+	if nt.hasSynced.Load() {
 		return true
 	}
+	if nt.upstreamSynced() {
+		// Now that both informers are up-to-date,
+		// trigger a sync to update the list of topology segments.
+		nt.queue.Add("")
+	}
 	return false
 }
 
@@ -231,6 +239,11 @@ func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool {
 }
 
 func (nt *nodeTopology) sync(_ context.Context) {
+	if !nt.hasSynced.Load() && nt.upstreamSynced() {
+		// We are not yet synced, but the upstream informers are.
+		// We will become synced when this function returns.
+		defer nt.hasSynced.Store(true)
+	}
 	// For all nodes on which the driver is registered, collect the topology key/value pairs
 	// and sort them by key name to make the result deterministic. Skip all segments that have
 	// been seen before.
diff --git a/pkg/capacity/topology/nodes_test.go b/pkg/capacity/topology/nodes_test.go
index 4ea2f39367..6410832d09 100644
--- a/pkg/capacity/topology/nodes_test.go
+++ b/pkg/capacity/topology/nodes_test.go
@@ -566,6 +566,41 @@ func TestNodeTopology(t *testing.T) {
 	}
 }
 
+func TestHasSynced(t *testing.T) {
+	client := fakeclientset.NewSimpleClientset()
+	informerFactory := informers.NewSharedInformerFactory(client, 0*time.Second /* no resync */)
+	nodeInformer := informerFactory.Core().V1().Nodes()
+	csiNodeInformer := informerFactory.Storage().V1().CSINodes()
+	rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 2*time.Second)
+	queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "items"})
+
+	nt := NewNodeTopology(
+		driverName,
+		client,
+		nodeInformer,
+		csiNodeInformer,
+		queue,
+	).(*nodeTopology)
+
+	ctx := t.Context()
+	nt.sync(ctx)
+	if nt.HasSynced() {
+		t.Fatalf("upstream informer not started yet, expected HasSynced to return false")
+	}
+
+	informerFactory.Start(ctx.Done())
+	informerFactory.WaitForCacheSync(ctx.Done())
+	if nt.HasSynced() { // should enqueue a work item
+		t.Fatalf("nt not started, expected HasSynced to return false")
+	}
+
+	// consume the work item
+	nt.processNextWorkItem(ctx)
+	if !nt.HasSynced() {
+		t.Fatalf("nt should be synced now")
+	}
+}
+
 type segmentsFound map[*Segment]bool
 
 func (sf segmentsFound) Found() []*Segment {