
Commit de81be0

capacity: fix duplicate topology
When the controller starts, two sync() calls run simultaneously, one from HasSynced() and one from processNextWorkItem(). Each produces its own instance for the same topology segment and passes it to the callbacks. This results in duplicated entries in the capacities map, with one of two outcomes:

- two CSIStorageCapacity objects get created for the same topology, or
- the same CSIStorageCapacity object gets assigned to two keys in the capacities map; when one of them is updated, the other holds an outdated object and all subsequent updates fail with a conflict.
1 parent: 1b71152
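The fix funnels every sync trigger through the controller's work queue instead of calling sync() directly from HasSynced(). This works because a client-go workqueue deduplicates identical items: however many goroutines ask for a sync, only one queued item results, and therefore only one sync() run by the single worker. A minimal standalone sketch of that property, not part of this commit, assuming only the typed workqueue API already used by this package:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 2*time.Second)
	queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "items"})
	defer queue.ShutDown()

	// Two triggers for the same key (e.g. one from HasSynced() and one from
	// an informer event handler) collapse into a single queued item, so a
	// single worker runs sync() exactly once for both.
	queue.Add("")
	queue.Add("")
	fmt.Println("queued items:", queue.Len()) // prints 1, not 2
}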

File tree: 2 files changed, +53 −5 lines

pkg/capacity/topology/nodes.go

Lines changed: 18 additions & 5 deletions

@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"sort"
 	"sync"
+	"sync/atomic"

 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
@@ -159,6 +160,7 @@ type nodeTopology struct {
 	nodeInformer    coreinformersv1.NodeInformer
 	csiNodeInformer storageinformersv1.CSINodeInformer
 	queue           workqueue.TypedRateLimitingInterface[string]
+	hasSynced       atomic.Bool

 	mutex sync.Mutex
 	// segments hold a list of all currently known topology segments.
@@ -209,14 +211,20 @@ func (nt *nodeTopology) RunWorker(ctx context.Context) {
 	}
 }

+func (nt *nodeTopology) upstreamSynced() bool {
+	return nt.nodeInformer.Informer().HasSynced() &&
+		nt.csiNodeInformer.Informer().HasSynced()
+}
+
 func (nt *nodeTopology) HasSynced() bool {
-	if nt.nodeInformer.Informer().HasSynced() &&
-		nt.csiNodeInformer.Informer().HasSynced() {
-		// Now that both informers are up-to-date, use that
-		// information to update our own view of the world.
-		nt.sync(context.Background())
+	if nt.hasSynced.Load() {
 		return true
 	}
+	if nt.upstreamSynced() {
+		// Now that both informers are up-to-date,
+		// trigger a sync to update the list of topology segments.
+		nt.queue.Add("")
+	}
 	return false
 }

@@ -231,6 +239,11 @@ func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool {
 }

 func (nt *nodeTopology) sync(_ context.Context) {
+	if !nt.hasSynced.Load() && nt.upstreamSynced() {
+		// We are not yet synced, but the upstream informers are.
+		// We will become synced when this function returns.
+		defer nt.hasSynced.Store(true)
+	}
 	// For all nodes on which the driver is registered, collect the topology key/value pairs
 	// and sort them by key name to make the result deterministic. Skip all segments that have
 	// been seen before.
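A side effect of this change: HasSynced() now returns true only after the first sync() has actually completed, rather than as soon as the upstream informers are populated. Anything gating on it, for example via client-go's cache.WaitForCacheSync, therefore waits for the topology segment list itself. A hedged sketch under that assumption (waitForTopology is a hypothetical in-package helper, not part of this commit):

import (
	"context"

	"k8s.io/client-go/tools/cache"
)

// waitForTopology blocks until nt has completed its first sync, or until
// ctx is cancelled; it returns false on cancellation.
func waitForTopology(ctx context.Context, nt *nodeTopology) bool {
	return cache.WaitForCacheSync(ctx.Done(), nt.HasSynced)
}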

pkg/capacity/topology/nodes_test.go

Lines changed: 35 additions & 0 deletions

@@ -566,6 +566,41 @@ func TestNodeTopology(t *testing.T) {
 	}
 }

+func TestHasSynced(t *testing.T) {
+	client := fakeclientset.NewSimpleClientset()
+	informerFactory := informers.NewSharedInformerFactory(client, 0*time.Second /* no resync */)
+	nodeInformer := informerFactory.Core().V1().Nodes()
+	csiNodeInformer := informerFactory.Storage().V1().CSINodes()
+	rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 2*time.Second)
+	queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "items"})
+
+	nt := NewNodeTopology(
+		driverName,
+		client,
+		nodeInformer,
+		csiNodeInformer,
+		queue,
+	).(*nodeTopology)
+
+	ctx := t.Context()
+	nt.sync(ctx)
+	if nt.HasSynced() {
+		t.Fatalf("upstream informer not started yet, expected HasSynced to return false")
+	}
+
+	informerFactory.Start(ctx.Done())
+	informerFactory.WaitForCacheSync(ctx.Done())
+	if nt.HasSynced() { // should enqueue a work item
+		t.Fatalf("nt not started, expected HasSynced to return false")
+	}
+
+	// consume the work item
+	nt.processNextWorkItem(ctx)
+	if !nt.HasSynced() {
+		t.Fatalf("nt should be synced now")
+	}
+}
+
 type segmentsFound map[*Segment]bool

 func (sf segmentsFound) Found() []*Segment {
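The test walks HasSynced() through all three states: false before the informers start, false once the informer caches are warm (while enqueuing a work item), and true after the worker drains that item. Note that ctx := t.Context() requires Go 1.24 or newer. To run just this test from a checkout of the repository (a standard go test invocation; -race is worthwhile here because the change is about synchronization):

go test -race -run TestHasSynced ./pkg/capacity/topology/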
