Skip to content

modify: support rollback #513

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (ctrl *resizeController) Run(workers int, ctx context.Context) {
go ctrl.slowSet.Run(stopCh)
}

for i := 0; i < workers; i++ {
for range workers {
go wait.Until(ctrl.syncPVCs, 0, stopCh)
}

Expand Down
8 changes: 3 additions & 5 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestController(t *testing.T) {
disableVolumeInUseErrorHandler: true,
},
} {
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true, false)
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true)
driverName, _ := client.GetDriverName(context.TODO())

var expectedCap resource.Quantity
Expand Down Expand Up @@ -261,9 +261,7 @@ func TestController(t *testing.T) {
stopCh := make(chan struct{})
informerFactory.Start(stopCh)

ctx := context.TODO()
defer ctx.Done()
go controller.Run(1, ctx)
go controller.Run(1, t.Context())

for _, obj := range initialObjects {
switch obj.(type) {
Expand Down Expand Up @@ -380,7 +378,7 @@ func TestResizePVC(t *testing.T) {
},
} {
t.Run(test.Name, func(t *testing.T) {
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true, false)
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true)
if test.expansionError != nil {
client.SetExpansionError(test.expansionError)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/expand_and_recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestExpandAndRecover(t *testing.T) {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true)
client := csi.NewMockClient("foo", !test.disableNodeExpansion, !test.disableControllerExpansion, false, true, true, false)
client := csi.NewMockClient("foo", !test.disableNodeExpansion, !test.disableControllerExpansion, false, true, true)
driverName, _ := client.GetDriverName(context.TODO())
if test.expansionError != nil {
client.SetExpansionError(test.expansionError)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/resize_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestResizeFunctions(t *testing.T) {
tc := test
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true)
client := csi.NewMockClient("foo", true, true, false, true, true, false)
client := csi.NewMockClient("foo", true, true, false, true, true)
driverName, _ := client.GetDriverName(context.TODO())

pvc := test.pvc
Expand Down
17 changes: 14 additions & 3 deletions pkg/csi/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package csi
import (
"context"
"fmt"
"maps"
"sync"
"sync/atomic"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand All @@ -16,7 +18,6 @@ func NewMockClient(
supportsControllerModify bool,
supportsPluginControllerService bool,
supportsControllerSingleNodeMultiWriter bool,
supportsExtraModifyMetada bool,
) *MockClient {
return &MockClient{
name: name,
Expand All @@ -25,7 +26,7 @@ func NewMockClient(
supportsControllerModify: supportsControllerModify,
supportsPluginControllerService: supportsPluginControllerService,
supportsControllerSingleNodeMultiWriter: supportsControllerSingleNodeMultiWriter,
extraModifyMetadata: supportsExtraModifyMetada,
modifiedParameters: make(map[string]string),
}
}

Expand All @@ -43,7 +44,8 @@ type MockClient struct {
checkMigratedLabel bool
usedSecrets atomic.Pointer[map[string]string]
usedCapability atomic.Pointer[csi.VolumeCapability]
extraModifyMetadata bool
modifyMu sync.Mutex
modifiedParameters map[string]string
}

func (c *MockClient) GetDriverName(context.Context) (string, error) {
Expand Down Expand Up @@ -116,6 +118,12 @@ func (c *MockClient) GetModifyCount() int {
return int(c.modifyCalled.Load())
}

func (c *MockClient) GetModifiedParameters() map[string]string {
c.modifyMu.Lock()
defer c.modifyMu.Unlock()
return maps.Clone(c.modifiedParameters)
}

func (c *MockClient) GetCapability() *csi.VolumeCapability {
return c.usedCapability.Load()
}
Expand All @@ -138,5 +146,8 @@ func (c *MockClient) Modify(
if c.modifyError != nil {
return c.modifyError
}
c.modifyMu.Lock()
defer c.modifyMu.Unlock()
maps.Copy(c.modifiedParameters, mutableParameters)
return nil
}
2 changes: 1 addition & 1 deletion pkg/modifier/csi_modifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestNewModifier(t *testing.T) {
SupportsControllerModify: false,
},
} {
client := csi.NewMockClient("mock", false, false, c.SupportsControllerModify, false, false, false)
client := csi.NewMockClient("mock", false, false, c.SupportsControllerModify, false, false)
driverName := "mock-driver"
k8sClient, informerFactory := fakeK8s()
_, err := NewModifierFromClient(client, 0, k8sClient, informerFactory, false, driverName)
Expand Down
28 changes: 12 additions & 16 deletions pkg/modifycontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package modifycontroller
import (
"context"
"fmt"
"sync"
"time"

"github.com/kubernetes-csi/external-resizer/pkg/util"
Expand All @@ -38,6 +39,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)

// ModifyController watches PVCs and checks if they are requesting an modify operation.
Expand All @@ -61,7 +63,7 @@ type modifyController struct {
vacListerSynced cache.InformerSynced
extraModifyMetadata bool
// the key of the map is {PVC_NAMESPACE}/{PVC_NAME}
uncertainPVCs map[string]v1.PersistentVolumeClaim
uncertainPVCs sync.Map
// slowSet tracks PVCs for which modification failed with infeasible error and should be retried at slower rate.
slowSet *slowset.SlowSet
}
Expand Down Expand Up @@ -121,7 +123,6 @@ func NewModifyController(
}

func (ctrl *modifyController) initUncertainPVCs() error {
ctrl.uncertainPVCs = make(map[string]v1.PersistentVolumeClaim)
allPVCs, err := ctrl.pvcLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list pvcs when init uncertain pvcs: %v", err)
Expand All @@ -133,7 +134,7 @@ func (ctrl *modifyController) initUncertainPVCs() error {
if err != nil {
return err
}
ctrl.uncertainPVCs[pvcKey] = *pvc.DeepCopy()
ctrl.uncertainPVCs.Store(pvcKey, pvc)
}
}

Expand All @@ -160,12 +161,11 @@ func (ctrl *modifyController) updatePVC(oldObj, newObj interface{}) {
}

// Only trigger modify volume if the following conditions are met
// 1. Non empty vac name
// 2. oldVacName != newVacName
// 3. PVC is in Bound state
oldVacName := oldPVC.Spec.VolumeAttributesClassName
newVacName := newPVC.Spec.VolumeAttributesClassName
if newVacName != nil && *newVacName != "" && (oldVacName == nil || *newVacName != *oldVacName) && oldPVC.Status.Phase == v1.ClaimBound {
// 1. oldVacName != newVacName
// 2. PVC is in Bound state
oldVacName := ptr.Deref(oldPVC.Spec.VolumeAttributesClassName, "")
newVacName := ptr.Deref(newPVC.Spec.VolumeAttributesClassName, "")
if newVacName != oldVacName && oldPVC.Status.Phase == v1.ClaimBound {
_, err := ctrl.pvLister.Get(oldPVC.Spec.VolumeName)
if err != nil {
klog.Errorf("Get PV %q of pvc %q in PVInformer cache failed: %v", oldPVC.Spec.VolumeName, klog.KObj(oldPVC), err)
Expand All @@ -187,10 +187,7 @@ func (ctrl *modifyController) deletePVC(obj interface{}) {
}

func (ctrl *modifyController) init(ctx context.Context) bool {
informersSyncd := []cache.InformerSynced{ctrl.pvListerSynced, ctrl.pvcListerSynced}
informersSyncd = append(informersSyncd, ctrl.vacListerSynced)

if !cache.WaitForCacheSync(ctx.Done(), informersSyncd...) {
if !cache.WaitForCacheSync(ctx.Done(), ctrl.pvListerSynced, ctrl.pvcListerSynced, ctrl.vacListerSynced) {
klog.ErrorS(nil, "Cannot sync pod, pv, pvc or vac caches")
return false
}
Expand Down Expand Up @@ -220,7 +217,7 @@ func (ctrl *modifyController) Run(
// Starts go-routine that deletes expired slowSet entries.
go ctrl.slowSet.Run(stopCh)

for i := 0; i < workers; i++ {
for range workers {
go wait.Until(ctrl.sync, 0, stopCh)
}

Expand Down Expand Up @@ -277,8 +274,7 @@ func (ctrl *modifyController) syncPVC(key string) error {
return nil
}

vacName := pvc.Spec.VolumeAttributesClassName
if vacName != nil && *vacName != "" && pvc.Status.Phase == v1.ClaimBound {
if pvc.Status.Phase == v1.ClaimBound {
_, _, err, _ := ctrl.modify(pvc, pv)
if err != nil {
return err
Expand Down
Loading