From 2dbedcb2842ecbd49b3206e4334fff6627f56fff Mon Sep 17 00:00:00 2001 From: "rybas.dmitriy" Date: Tue, 27 Jan 2026 19:12:06 +0300 Subject: [PATCH] Added support for the VolumeAttributesClass Signed-off-by: rybas.dmitriy --- cmd/main.go | 6 + .../charts/crds/templates/lvmvolume.yaml | 19 + deploy/helm/charts/templates/lvm-node.yaml | 3 + deploy/helm/charts/templates/rbac.yaml | 2 +- deploy/helm/charts/values.yaml | 4 +- deploy/lvm-operator.yaml | 19 + deploy/yamls/lvmvolume-crd.yaml | 19 + pkg/apis/openebs.io/lvm/v1alpha1/lvmvolume.go | 13 + .../lvm/v1alpha1/zz_generated.deepcopy.go | 4 + pkg/builder/volbuilder/volume.go | 12 + pkg/driver/config/config.go | 3 + pkg/driver/controller.go | 54 ++- pkg/driver/params.go | 269 +++++++++++++++ pkg/driver/params_test.go | 324 ++++++++++++++++++ pkg/lvm/iolimiter_cgroup.go | 287 ++++++++++++++++ pkg/lvm/iolimiter_cgroup_test.go | 158 +++++++++ pkg/lvm/mount.go | 43 ++- pkg/mgmt/volume/volume.go | 16 + 18 files changed, 1239 insertions(+), 16 deletions(-) create mode 100644 pkg/driver/params_test.go create mode 100644 pkg/lvm/iolimiter_cgroup.go create mode 100644 pkg/lvm/iolimiter_cgroup_test.go diff --git a/cmd/main.go b/cmd/main.go index 0a026156..3407267d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -134,6 +134,10 @@ func main() { &config.NodeControllerPollingInterval, "node-polling-interval", 60, "The interval, in seconds, between node polling.", ) + cmd.PersistentFlags().StringVar( + &config.KubeletDir, "kubelet-dir", "/var/lib/kubelet/", "Kubelet directory provides the operation of volumeAttributesClassName", + ) + err := cmd.Execute() if err != nil { _, _ = fmt.Fprintf(os.Stderr, "%s", err.Error()) @@ -165,6 +169,8 @@ func run(config *config.Config) { lvm.SetIORateLimits(config) } + lvm.SetQoSValuesConf(config) + err := driver.New(config).Run() if err != nil { log.Fatalln(err) diff --git a/deploy/helm/charts/charts/crds/templates/lvmvolume.yaml b/deploy/helm/charts/charts/crds/templates/lvmvolume.yaml index 307791d8..3dc32e83 100644 --- a/deploy/helm/charts/charts/crds/templates/lvmvolume.yaml +++ b/deploy/helm/charts/charts/crds/templates/lvmvolume.yaml @@ -80,6 +80,25 @@ spec: can not be edited after the volume has been provisioned. minLength: 1 type: string + qos: + description: |- + QoS contains VAC realization based on cgroup v2 io controller. + Nil means "no explicit QoS settings applied". + properties: + readBPS: + type: string + readIOPS: + type: string + writeBPS: + type: string + writeIOPS: + type: string + required: + - readBPS + - readIOPS + - writeBPS + - writeIOPS + type: object shared: description: Shared specifies whether the volume can be shared among multiple pods. If it is not set to "yes", then the LVM LocalPV Driver diff --git a/deploy/helm/charts/templates/lvm-node.yaml b/deploy/helm/charts/templates/lvm-node.yaml index 662b2a5f..91df852a 100644 --- a/deploy/helm/charts/templates/lvm-node.yaml +++ b/deploy/helm/charts/templates/lvm-node.yaml @@ -85,6 +85,9 @@ spec: {{- if .Values.lvmPlugin.metricsPort }} - "--listen-address=$(METRICS_LISTEN_ADDRESS)" {{- end }} + {{- if .Values.lvmNode.kubeletDir }} + - "--kubelet-dir={{ include "lvmlocalpv.lvmNode.kubeletDir" . }}" + {{- end }} env: - name: OPENEBS_NODE_ID valueFrom: diff --git a/deploy/helm/charts/templates/rbac.yaml b/deploy/helm/charts/templates/rbac.yaml index edf3a349..977f92f7 100644 --- a/deploy/helm/charts/templates/rbac.yaml +++ b/deploy/helm/charts/templates/rbac.yaml @@ -27,7 +27,7 @@ rules: resources: ["persistentvolumeclaims/status"] verbs: ["update", "patch"] - apiGroups: ["storage.k8s.io"] - resources: ["storageclasses", "csinodes"] + resources: ["storageclasses", "csinodes","volumeattributesclasses"] verbs: ["get", "list", "watch"] - apiGroups: [ "storage.k8s.io" ] resources: [ "csistoragecapacities"] diff --git a/deploy/helm/charts/values.yaml b/deploy/helm/charts/values.yaml index 961a2aa6..51d8119a 100644 --- a/deploy/helm/charts/values.yaml +++ b/deploy/helm/charts/values.yaml @@ -85,7 +85,7 @@ lvmController: repository: sig-storage/csi-resizer pullPolicy: IfNotPresent # Overrides the image tag whose default is the chart appVersion. - tag: v1.11.2 + tag: v2.0.0 snapshotter: name: "csi-snapshotter" image: @@ -115,7 +115,7 @@ lvmController: repository: sig-storage/csi-provisioner pullPolicy: IfNotPresent # Overrides the image tag whose default is the chart appVersion. - tag: v5.2.0 + tag: v6.1.0 updateStrategy: type: RollingUpdate annotations: {} diff --git a/deploy/lvm-operator.yaml b/deploy/lvm-operator.yaml index 20ac0c0b..6e01f0e7 100644 --- a/deploy/lvm-operator.yaml +++ b/deploy/lvm-operator.yaml @@ -105,6 +105,25 @@ spec: OwnerNodeID can not be edited after the volume has been provisioned. minLength: 1 type: string + qos: + description: |- + QoS contains VAC realization based on cgroup v2 io controller. + Nil means "no explicit QoS settings applied". + properties: + readBPS: + type: string + readIOPS: + type: string + writeBPS: + type: string + writeIOPS: + type: string + required: + - readBPS + - readIOPS + - writeBPS + - writeIOPS + type: object shared: description: |- Shared specifies whether the volume can be shared among multiple pods. diff --git a/deploy/yamls/lvmvolume-crd.yaml b/deploy/yamls/lvmvolume-crd.yaml index d2e50ec1..ac4679c2 100644 --- a/deploy/yamls/lvmvolume-crd.yaml +++ b/deploy/yamls/lvmvolume-crd.yaml @@ -84,6 +84,25 @@ spec: OwnerNodeID can not be edited after the volume has been provisioned. minLength: 1 type: string + qos: + description: |- + QoS contains VAC realization based on cgroup v2 io controller. + Nil means "no explicit QoS settings applied". + properties: + readBPS: + type: string + readIOPS: + type: string + writeBPS: + type: string + writeIOPS: + type: string + required: + - readBPS + - readIOPS + - writeBPS + - writeIOPS + type: object shared: description: |- Shared specifies whether the volume can be shared among multiple pods. diff --git a/pkg/apis/openebs.io/lvm/v1alpha1/lvmvolume.go b/pkg/apis/openebs.io/lvm/v1alpha1/lvmvolume.go index b2fd3bbf..7e25e92f 100644 --- a/pkg/apis/openebs.io/lvm/v1alpha1/lvmvolume.go +++ b/pkg/apis/openebs.io/lvm/v1alpha1/lvmvolume.go @@ -74,6 +74,10 @@ type VolumeInfo struct { // +kubebuilder:validation:MinLength=1 Capacity string `json:"capacity"` + // QoS contains VAC realization based on cgroup v2 io controller. + // Nil means "no explicit QoS settings applied". + QoS *VolumeQoS `json:"qos,omitempty"` + // Shared specifies whether the volume can be shared among multiple pods. // If it is not set to "yes", then the LVM LocalPV Driver will not allow // the volumes to be mounted by more than one pods. @@ -93,6 +97,15 @@ type VolumeInfo struct { Source string `json:"source,omitempty"` } +// VolumeQoS now represents per-volume IO management based on cgroup v2 io.max. +// Each field be either "max" or a positive integer string (base-10). +type VolumeQoS struct { + ReadBPS string `json:"readBPS"` + ReadIOPS string `json:"readIOPS"` + WriteBPS string `json:"writeBPS"` + WriteIOPS string `json:"writeIOPS"` +} + // VolStatus string that specifies the current state of the volume provisioning request. type VolStatus struct { // State specifies the current state of the volume provisioning request. diff --git a/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go index dcfecf44..2a45b2bf 100644 --- a/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go @@ -305,6 +305,10 @@ func (in *VolumeGroup) DeepCopy() *VolumeGroup { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VolumeInfo) DeepCopyInto(out *VolumeInfo) { *out = *in + if in.QoS != nil { + out.QoS = new(VolumeQoS) + *out.QoS = *in.QoS + } return } diff --git a/pkg/builder/volbuilder/volume.go b/pkg/builder/volbuilder/volume.go index e41535ed..b00c8bd6 100644 --- a/pkg/builder/volbuilder/volume.go +++ b/pkg/builder/volbuilder/volume.go @@ -213,6 +213,18 @@ func (b *Builder) WithSource(source string) *Builder { return b } +// WithQoS sets optional per-volume distribution of IO resources +func (b *Builder) WithQoS(qos *apis.VolumeQoS) *Builder { + if qos == nil { + b.volume.Object.Spec.QoS = nil + return b + } + cp := new(apis.VolumeQoS) + *cp = *qos + b.volume.Object.Spec.QoS = cp + return b +} + // Build returns LVMVolume API object func (b *Builder) Build() (*apis.LVMVolume, error) { if len(b.errs) > 0 { diff --git a/pkg/driver/config/config.go b/pkg/driver/config/config.go index b8eb2995..d0783fc1 100644 --- a/pkg/driver/config/config.go +++ b/pkg/driver/config/config.go @@ -88,6 +88,9 @@ type Config struct { // NodeControllerPollingInterval is the interval, in seconds, between node polling. NodeControllerPollingInterval int + + // Kubelet directory provides the operation of volumeAttributesClassName. + KubeletDir string } // Default returns a new instance of config diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 99bfc9c6..63926c32 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -366,6 +366,12 @@ func CreateLVMVolume(ctx context.Context, req *csi.CreateVolumeRequest, } klog.Infof("scheduling the volume %s/%s on node %s", params.VgPattern.String(), volName, owner) + + qos, err := NewQoSParams(req.GetMutableParameters()) + if err != nil { + return nil, err + } + volObj, err := volbuilder.NewBuilder(). WithName(volName). WithCapacity(capacity). @@ -373,7 +379,9 @@ func CreateLVMVolume(ctx context.Context, req *csi.CreateVolumeRequest, WithOwnerNode(owner). WithVolumeStatus(lvm.LVMStatusPending). WithShared(params.Shared). - WithThinProvision(params.ThinProvision).Build() + WithThinProvision(params.ThinProvision). + WithQoS(QoSParamsCreateVolume(qos)). + Build() if err != nil { return nil, status.Errorf(codes.Internal, @@ -1103,6 +1111,7 @@ func newControllerCapabilities() []*csi.ControllerServiceCapability { csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, csi.ControllerServiceCapability_RPC_GET_CAPACITY, + csi.ControllerServiceCapability_RPC_MODIFY_VOLUME, } { capabilities = append(capabilities, fromType(cap)) } @@ -1255,9 +1264,50 @@ func (cs *controller) ControllerGetVolume( return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not implemented") } +// ControllerModifyVolume update previously provisioned VAC parametrs +// +// This implements csi.ControllerServer func (cs *controller) ControllerModifyVolume( ctx context.Context, req *csi.ControllerModifyVolumeRequest, ) (*csi.ControllerModifyVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "ControllerModifyVolume is not implemented") + if req == nil { + return nil, status.Error(codes.InvalidArgument, "request is nil") + } + volID := strings.ToLower(strings.TrimSpace(req.GetVolumeId())) + if volID == "" { + return nil, status.Error(codes.InvalidArgument, "volume_id is empty") + } + + vol, err := volbuilder.NewKubeclient(). + WithNamespace(lvm.LvmNamespace). + Get(volID, metav1.GetOptions{}) + if err != nil { + if k8serror.IsNotFound(err) { + return nil, status.Errorf(codes.NotFound, "volume %s not found", volID) + } + return nil, status.Errorf(codes.Internal, "failed to get lvmvolume %s: %v", volID, err) + } + + // get the VAC values and prepare lvmvolume spec io format + ioPatch, err := NewQoSParams(req.GetMutableParameters()) + if err != nil { + return nil, err + } + + // apply new io values + modify, changed := QoSParamsModifyVolume(ioPatch, vol.Spec.QoS) + if !changed { + klog.Infof("ModifyVolume: QoS unchanged for volume %v, skipping update", vol) + return &csi.ControllerModifyVolumeResponse{}, nil + } + vol.Spec.QoS = modify + + if _, err := volbuilder.NewKubeclient(). + WithNamespace(lvm.LvmNamespace). + Update(vol); err != nil { + return nil, status.Errorf(codes.Internal, "failed to update lvmvolume qos %s: %v", volID, err) + } + + return &csi.ControllerModifyVolumeResponse{}, nil } diff --git a/pkg/driver/params.go b/pkg/driver/params.go index a229d521..cb174236 100644 --- a/pkg/driver/params.go +++ b/pkg/driver/params.go @@ -22,10 +22,16 @@ import ( "strconv" "strings" + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/openebs/lib-csi/pkg/common/helpers" "k8s.io/apimachinery/pkg/api/resource" ) +const qosParamMax = "max" + // VolumeParams holds collection of supported settings that can // be configured in storage class. type VolumeParams struct { @@ -52,6 +58,38 @@ type SnapshotParams struct { AbsSnapSize bool } +// qosParamState describes "what to do" with one parameter. +// ParamsPresent=false => the key did not arrive from VAC => do not change anything +// ParamsPresent=true => the key is present => set ParamsValue ("max" or >0) +type qosParamState struct { + ParamPresent bool + ParamValue string +} + +// qosParams holds collection of supported settings that can be configured +// in VolumeAttributesClass. +type qosParams struct { + UnifiedIOPS qosParamState + ReadIOPS qosParamState + WriteIOPS qosParamState + + UnifiedBPS qosParamState + ReadBPS qosParamState + WriteBPS qosParamState +} + +// qosParamsPresent returns true if VAC request contains at least one supported QoS key. +// Helps determine the state of the VAC. +func (p *qosParams) qosParamsPresent() bool { + if p == nil { + return false + } + return p.UnifiedIOPS.ParamPresent || + p.ReadIOPS.ParamPresent || p.WriteIOPS.ParamPresent || + p.UnifiedBPS.ParamPresent || + p.ReadBPS.ParamPresent || p.WriteBPS.ParamPresent +} + // NewVolumeParams parses the input params and instantiates new VolumeParams. func NewVolumeParams(m map[string]string) (*VolumeParams, error) { params := &VolumeParams{ // set up defaults, if any. @@ -140,3 +178,234 @@ func NewSnapshotParams(m map[string]string) (*SnapshotParams, error) { return params, nil } + +// NewQoSParams parses and validates VAC mutable parameters into qosParams: +// keys are case-sensitive. +// recognizes only keys for QoS. +// validates and determines allowed parameters. +// rejects conflicting unified vs per-direction values. +func NewQoSParams(m map[string]string) (*qosParams, error) { + var qosKeys = struct { + IOPS, IOPSRead, IOPSWrite string + BPS, BPSRead, BPSWrite string + }{ + IOPS: "qosIopsLimit", + IOPSRead: "qosIopsReadLimit", + IOPSWrite: "qosIopsWriteLimit", + BPS: "qosBandwithPerSec", + BPSRead: "qosBandwithReadPerSec", + BPSWrite: "qosBandwithWritePerSec", + } + + // VAC didn't provide QoS keys at all. + if len(m) == 0 { + return nil, nil + } + + p := &qosParams{} + var errs []string + + iopsTargets := map[string]*qosParamState{ + qosKeys.IOPS: &p.UnifiedIOPS, + qosKeys.IOPSRead: &p.ReadIOPS, + qosKeys.IOPSWrite: &p.WriteIOPS, + } + bpsTargets := map[string]*qosParamState{ + qosKeys.BPS: &p.UnifiedBPS, + qosKeys.BPSRead: &p.ReadBPS, + qosKeys.BPSWrite: &p.WriteBPS, + } + + for k, raw := range m { + if raw != strings.TrimSpace(raw) { + errs = append(errs, fmt.Sprintf("%s: value must not contain leading/trailing whitespace", k)) + continue + } + + if dst, ok := iopsTargets[k]; ok { + *dst = qosParamsParseIOPS(&errs, k, raw) + continue + } + if dst, ok := bpsTargets[k]; ok { + *dst = qosParamsParseBPS(&errs, k, raw) + continue + } + errs = append(errs, fmt.Sprintf("%s: unsupported VAC key", k)) + } + + // check unified parameter and a separate parameter corresponding to it and find conflict + errs = qosParamsConflict(errs, qosKeys.IOPS, p.UnifiedIOPS, qosKeys.IOPSRead, p.ReadIOPS) + errs = qosParamsConflict(errs, qosKeys.IOPS, p.UnifiedIOPS, qosKeys.IOPSWrite, p.WriteIOPS) + errs = qosParamsConflict(errs, qosKeys.BPS, p.UnifiedBPS, qosKeys.BPSRead, p.ReadBPS) + errs = qosParamsConflict(errs, qosKeys.BPS, p.UnifiedBPS, qosKeys.BPSWrite, p.WriteBPS) + + // apply unified parameter when separate parameter not provided + if p.UnifiedIOPS.ParamPresent { + if _, ok := m[qosKeys.IOPSRead]; !ok { + p.ReadIOPS = p.UnifiedIOPS + } + if _, ok := m[qosKeys.IOPSWrite]; !ok { + p.WriteIOPS = p.UnifiedIOPS + } + } + if p.UnifiedBPS.ParamPresent { + if _, ok := m[qosKeys.BPSRead]; !ok { + p.ReadBPS = p.UnifiedBPS + } + if _, ok := m[qosKeys.BPSWrite]; !ok { + p.WriteBPS = p.UnifiedBPS + } + } + + if len(errs) > 0 { + return nil, status.Error(codes.InvalidArgument, "invalid VAC mutableParameters: "+strings.Join(errs, "; ")) + } + return p, nil +} + +// qosParamsConflict detects a conflict between a unified key (e.g. qosIopsLimit) +// and a per-direction key (e.g. qosIopsReadLimit). +// This prevents configurations such as qosIopsLimit="100" and qosIopsReadLimit="200". +func qosParamsConflict(errs []string, uKey string, uParam qosParamState, sKey string, sParam qosParamState) []string { + if !uParam.ParamPresent || !sParam.ParamPresent { + return errs + } + + // In case of an error during parsing, additional checking + if uParam.ParamValue == "" || sParam.ParamValue == "" { + errs = append(errs, fmt.Sprintf("invalid value when checking parameters for conflict: %s vs %s", sKey, uKey)) + } + if uParam.ParamValue != sParam.ParamValue { + errs = append(errs, fmt.Sprintf("%s: conflicts with %s (%s vs %s)", sKey, uKey, sParam.ParamValue, uParam.ParamValue)) + } + return errs +} + +// qosParamsParseIOPS parses an IOPS parameter value. +// "max" => remove limit +// uint64 string (>0) => ParamValue set +// empty/invalid/zero => validation error is appended. +func qosParamsParseIOPS(errs *[]string, key, raw string) qosParamState { + ps := qosParamState{ParamPresent: true} + + if raw == "" { + *errs = append(*errs, fmt.Sprintf("%s: empty value", key)) + return ps + } + if raw == qosParamMax { + ps.ParamValue = qosParamMax + return ps + } + + v, err := strconv.ParseUint(raw, 10, 64) + if err != nil { + *errs = append(*errs, fmt.Sprintf("%s: invalid uint64 %q", key, raw)) + return ps + } + if v == 0 { + *errs = append(*errs, fmt.Sprintf("%s: must be > 0 or 'max'", key)) + return ps + } + ps.ParamValue = strconv.FormatUint(v, 10) + return ps +} + +// qosParamsParseBPS parses a throughput parameter value using Kubernetes Quantity. +// "max" => remove limit +// Quantity that converts to a positive int64 => ParamValue set +// empty/invalid/non-integer/<=0 => validation error is appended. +func qosParamsParseBPS(errs *[]string, key, raw string) qosParamState { + ps := qosParamState{ParamPresent: true} + + if raw == "" { + *errs = append(*errs, fmt.Sprintf("%s: empty value", key)) + return ps + } + if raw == qosParamMax { + ps.ParamValue = qosParamMax + return ps + } + + v, err := resource.ParseQuantity(raw) + if err != nil { + *errs = append(*errs, fmt.Sprintf("%s: invalid quantity %q", key, raw)) + return ps + } + n, ok := v.AsInt64() + if !ok || n <= 0 { + *errs = append(*errs, fmt.Sprintf("%s: must be > 0 or 'max'", key)) + return ps + } + + ps.ParamValue = strconv.FormatInt(n, 10) + return ps +} + +// QoSParamsCreateVolume builds the initial VolumeQoS spec for volume creation. +// Only parameters that were explicitly provided AND contain a numeric value +// are written. Returns nil if VAC didn't provide any QoS keys. +func QoSParamsCreateVolume(in *qosParams) *apis.VolumeQoS { + if in == nil || !in.qosParamsPresent() { + return nil + } + + out := qosParamsDefaultMax() + apply := func(ps qosParamState, dst *string) { + if !ps.ParamPresent { + return + } + *dst = ps.ParamValue // "max" or number + } + + apply(in.ReadIOPS, &out.ReadIOPS) + apply(in.WriteIOPS, &out.WriteIOPS) + apply(in.ReadBPS, &out.ReadBPS) + apply(in.WriteBPS, &out.WriteBPS) + + return &out +} + +// QoSParamsModifyVolume applies mutable parameters to an existing VolumeQoS spec and +// reports whether it changed. No-op if VAC request contains no QoS keys. +func QoSParamsModifyVolume(in *qosParams, old *apis.VolumeQoS) (*apis.VolumeQoS, bool) { + if in == nil || !in.qosParamsPresent() { + return old, false + } + + var out apis.VolumeQoS + if old != nil { + out = *old + } else { + out = qosParamsDefaultMax() + } + + before := out + + apply := func(ps qosParamState, dst *string) { + // CSI spec on ControllerModifyVolumeRequest says: SPs MUST NOT + // modify volumes based on the absence of keys, only keys + // that are specified should result in modifications to the volume. + if !ps.ParamPresent { + return + } + *dst = ps.ParamValue + } + + apply(in.ReadIOPS, &out.ReadIOPS) + apply(in.WriteIOPS, &out.WriteIOPS) + apply(in.ReadBPS, &out.ReadBPS) + apply(in.WriteBPS, &out.WriteBPS) + + changed := (out != before) + return &out, changed +} + +// qosParamsDefaultMax prepares basic VAC structure to which restrictions will be applied. +func qosParamsDefaultMax() apis.VolumeQoS { + return apis.VolumeQoS{ + ReadBPS: qosParamMax, + WriteBPS: qosParamMax, + ReadIOPS: qosParamMax, + WriteIOPS: qosParamMax, + } +} diff --git a/pkg/driver/params_test.go b/pkg/driver/params_test.go new file mode 100644 index 00000000..76635539 --- /dev/null +++ b/pkg/driver/params_test.go @@ -0,0 +1,324 @@ +package driver + +import ( + "reflect" + "strings" + "testing" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestNewQoSParams_EmptyMapReturnsNil(t *testing.T) { + p, err := NewQoSParams(map[string]string{}) + if err != nil { + t.Fatalf("expected nil error, got %v", err) + } + if p != nil { + t.Fatalf("expected nil params, got %#v", p) + } +} + +func TestNewQoSParams_NilMapReturnsNil(t *testing.T) { + var m map[string]string + p, err := NewQoSParams(m) + if err != nil { + t.Fatalf("expected nil error, got %v", err) + } + if p != nil { + t.Fatalf("expected nil params, got %#v", p) + } +} + +func TestNewQoSParams_Cases(t *testing.T) { + type want struct { + params *qosParams + code codes.Code + errHas []string + } + + tests := []struct { + name string + in map[string]string + want want + }{ + { + name: "low: unified iops + unified bps", + in: map[string]string{ + "qosIopsLimit": "20", + "qosBandwithPerSec": "20000", + }, + want: want{ + code: codes.OK, + params: &qosParams{ + UnifiedIOPS: qosParamState{ParamPresent: true, ParamValue: "20"}, + ReadIOPS: qosParamState{ParamPresent: true, ParamValue: "20"}, // applied from unified + WriteIOPS: qosParamState{ParamPresent: true, ParamValue: "20"}, // applied from unified + + UnifiedBPS: qosParamState{ParamPresent: true, ParamValue: "20000"}, + ReadBPS: qosParamState{ParamPresent: true, ParamValue: "20000"}, // applied from unified + WriteBPS: qosParamState{ParamPresent: true, ParamValue: "20000"}, // applied from unified + }, + }, + }, + { + name: "iops-max: iops=max, bps numeric", + in: map[string]string{ + "qosIopsLimit": "max", + "qosBandwithPerSec": "7000000", + }, + want: want{ + code: codes.OK, + params: &qosParams{ + UnifiedIOPS: qosParamState{ParamPresent: true, ParamValue: "max"}, + ReadIOPS: qosParamState{ParamPresent: true, ParamValue: "max"}, + WriteIOPS: qosParamState{ParamPresent: true, ParamValue: "max"}, + + UnifiedBPS: qosParamState{ParamPresent: true, ParamValue: "7000000"}, + ReadBPS: qosParamState{ParamPresent: true, ParamValue: "7000000"}, + WriteBPS: qosParamState{ParamPresent: true, ParamValue: "7000000"}, + }, + }, + }, + { + name: "mi quantity bps valid", + in: map[string]string{ + "qosIopsLimit": "10", + "qosBandwithPerSec": "7000Mi", + }, + want: want{ + code: codes.OK, + params: &qosParams{ + UnifiedIOPS: qosParamState{ParamPresent: true, ParamValue: "10"}, + ReadIOPS: qosParamState{ParamPresent: true, ParamValue: "10"}, + WriteIOPS: qosParamState{ParamPresent: true, ParamValue: "10"}, + + UnifiedBPS: qosParamState{ParamPresent: true, ParamValue: "7340032000"}, + ReadBPS: qosParamState{ParamPresent: true, ParamValue: "7340032000"}, + WriteBPS: qosParamState{ParamPresent: true, ParamValue: "7340032000"}, + }, + }, + }, + { + name: "directional limits valid", + in: map[string]string{ + "qosIopsReadLimit": "100", + "qosIopsWriteLimit": "200", + "qosBandwithReadPerSec": "7000Mi", + "qosBandwithWritePerSec": "8000Mi", + }, + want: want{ + code: codes.OK, + params: &qosParams{ + UnifiedIOPS: qosParamState{ParamPresent: false, ParamValue: ""}, + ReadIOPS: qosParamState{ParamPresent: true, ParamValue: "100"}, + WriteIOPS: qosParamState{ParamPresent: true, ParamValue: "200"}, + + UnifiedBPS: qosParamState{ParamPresent: false, ParamValue: ""}, + ReadBPS: qosParamState{ParamPresent: true, ParamValue: "7340032000"}, + WriteBPS: qosParamState{ParamPresent: true, ParamValue: "8388608000"}, + }, + }, + }, + { + name: "unlim: max/max", + in: map[string]string{ + "qosIopsLimit": "max", + "qosBandwithPerSec": "max", + }, + want: want{ + code: codes.OK, + params: &qosParams{ + UnifiedIOPS: qosParamState{ParamPresent: true, ParamValue: "max"}, + ReadIOPS: qosParamState{ParamPresent: true, ParamValue: "max"}, + WriteIOPS: qosParamState{ParamPresent: true, ParamValue: "max"}, + + UnifiedBPS: qosParamState{ParamPresent: true, ParamValue: "max"}, + ReadBPS: qosParamState{ParamPresent: true, ParamValue: "max"}, + WriteBPS: qosParamState{ParamPresent: true, ParamValue: "max"}, + }, + }, + }, + { + name: "zero iops invalid", + in: map[string]string{ + "qosIopsLimit": "0", + "qosBandwithPerSec": "7000000", + }, + want: want{ + code: codes.InvalidArgument, + errHas: []string{"qosIopsLimit: must be > 0 or 'max'"}, + }, + }, + { + name: "duplicate key different case is unsupported (case-sensitive)", + in: map[string]string{ + "qosIopsLimit": "10", + "qosIopslimit": "20", + "qosBandwithPerSec": "7000Mi", + }, + want: want{ + code: codes.InvalidArgument, + errHas: []string{"qosIopslimit: unsupported VAC key"}, + }, + }, + { + name: "conflict: unified iops=max with read=1000", + in: map[string]string{ + "qosIopsLimit": "max", + "qosIopsReadLimit": "1000", + }, + want: want{ + code: codes.InvalidArgument, + errHas: []string{"qosIopsReadLimit: conflicts with qosIopsLimit (1000 vs max)"}, + }, + }, + { + name: "wrong key unsupported", + in: map[string]string{ + "wrong": "1", + }, + want: want{ + code: codes.InvalidArgument, + errHas: []string{"wrong: unsupported VAC key"}, + }, + }, + { + name: "whitespace in value rejected", + in: map[string]string{ + "qosIopsLimit": " 20", + "qosBandwithPerSec": "20000", + }, + want: want{ + code: codes.InvalidArgument, + errHas: []string{"qosIopsLimit: value must not contain leading/trailing whitespace"}, + }, + }, + { + name: "empty string in value rejected", + in: map[string]string{ + "qosBandwithPerSec": "", + }, + want: want{ + code: codes.InvalidArgument, + errHas: []string{"qosBandwithPerSec: empty value"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewQoSParams(tt.in) + + if tt.want.code == codes.OK { + if err != nil { + t.Fatalf("expected nil error, got %v", err) + } + if got == nil { + t.Fatalf("expected non-nil params") + } + if !reflect.DeepEqual(got, tt.want.params) { + t.Fatalf("params mismatch\n got: %#v\n want: %#v", got, tt.want.params) + } + if !got.qosParamsPresent() { + t.Fatalf("expected qosParamsPresent()=true, got false") + } + return + } + + if err == nil { + t.Fatalf("expected error with code %v, got nil", tt.want.code) + } + st, ok := status.FromError(err) + if !ok { + t.Fatalf("expected grpc status error, got %T: %v", err, err) + } + if st.Code() != tt.want.code { + t.Fatalf("expected code %v, got %v (msg=%q)", tt.want.code, st.Code(), st.Message()) + } + msg := st.Message() + for _, sub := range tt.want.errHas { + if !strings.Contains(msg, sub) { + t.Fatalf("expected error message to contain %q, got %q", sub, msg) + } + } + }) + } +} + +func TestQosParamsConflict(t *testing.T) { + tests := []struct { + name string + uKey string + u qosParamState + sKey string + s qosParamState + wantAdded []string + }{ + { + name: "no conflict if unified not present", + uKey: "qosIopsLimit", + u: qosParamState{ParamPresent: false, ParamValue: "10"}, + sKey: "qosIopsReadLimit", + s: qosParamState{ParamPresent: true, ParamValue: "20"}, + wantAdded: nil, + }, + { + name: "no conflict if separate not present", + uKey: "qosIopsLimit", + u: qosParamState{ParamPresent: true, ParamValue: "10"}, + sKey: "qosIopsReadLimit", + s: qosParamState{ParamPresent: false, ParamValue: "20"}, + wantAdded: nil, + }, + { + name: "no conflict if equal values", + uKey: "qosIopsLimit", + u: qosParamState{ParamPresent: true, ParamValue: "100"}, + sKey: "qosIopsReadLimit", + s: qosParamState{ParamPresent: true, ParamValue: "100"}, + wantAdded: nil, + }, + { + name: "conflict if different values", + uKey: "qosIopsLimit", + u: qosParamState{ParamPresent: true, ParamValue: "max"}, + sKey: "qosIopsReadLimit", + s: qosParamState{ParamPresent: true, ParamValue: "1000"}, + wantAdded: []string{ + "qosIopsReadLimit: conflicts with qosIopsLimit (1000 vs max)", + }, + }, + { + name: "invalid value check triggers when empty param values", + uKey: "qosIopsLimit", + u: qosParamState{ParamPresent: true, ParamValue: ""}, + sKey: "qosIopsReadLimit", + s: qosParamState{ParamPresent: true, ParamValue: "1000"}, + wantAdded: []string{ + "invalid value when checking parameters for conflict", + "qosIopsReadLimit: conflicts with qosIopsLimit (1000 vs )", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var errs []string + errs = qosParamsConflict(errs, tt.uKey, tt.u, tt.sKey, tt.s) + + if len(tt.wantAdded) == 0 { + if len(errs) != 0 { + t.Fatalf("expected no errors, got %#v", errs) + } + return + } + joined := strings.Join(errs, "; ") + for _, sub := range tt.wantAdded { + if !strings.Contains(joined, sub) { + t.Fatalf("expected errs to contain %q, got %q", sub, joined) + } + } + }) + } +} diff --git a/pkg/lvm/iolimiter_cgroup.go b/pkg/lvm/iolimiter_cgroup.go new file mode 100644 index 00000000..3c76403a --- /dev/null +++ b/pkg/lvm/iolimiter_cgroup.go @@ -0,0 +1,287 @@ +package lvm + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + + mnt "github.com/openebs/lib-csi/pkg/mount" + "golang.org/x/sys/unix" + klog "k8s.io/klog/v2" + + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" + "github.com/openebs/lvm-localpv/pkg/driver/config" +) + +const qosValueMax = "max" + +var ( + kDir string + qosValuesMu sync.Mutex +) + +// SetQoSValuesConf sets config parametrs +func SetQoSValuesConf(config *config.Config) { + kDir = config.KubeletDir +} + +// ReconcileSetQoSValues update vol.Spec.QoS(VAC) on all pods currently using this volume on +// this node. Logic of the operation is based on use of information about mounting points. +// Get the path to the device, all mount points for this device and set VAC parametrs. +func ReconcileSetQoSValues(vol *apis.LVMVolume) error { + if vol == nil || vol.Spec.QoS == nil { + return nil + } + + devPath := GetVolumeDevPath(vol) + + mounts, err := mnt.GetMounts(devPath) + if err != nil { + return fmt.Errorf("get mounts for %s: %w", devPath, err) + } + + podUIDs := make(map[string]struct{}) + + // when pvc volumeMode: filesystem + for _, mp := range mounts { + if uid, ok := getPodUIDFromPath(mp); ok { + podUIDs[uid] = struct{}{} + } + } + // when pvc volumeMode: Block + if len(podUIDs) == 0 { + uids, err := getPodUIDFromVolDevice(vol.Name) + if err != nil { + return err + } + for _, uid := range uids { + podUIDs[uid] = struct{}{} + } + } + + if len(podUIDs) == 0 { + klog.Infof("Not found pod uuid from mount on node") + return nil + } + + for uid := range podUIDs { + if err := SetQoSValues(vol, uid); err != nil { + klog.Warningf("io.max apply failed vol=%s podUID=%s: %v", vol.Name, uid, err) + } + } + return nil +} + +// getPodUIDFromVolDevice helper function tries to discover podUIDs for CSI block volumes. +// Entries are typically named by UUID. +func getPodUIDFromVolDevice(volumeID string) ([]string, error) { + publishDir := filepath.Join( + kDir, + "plugins/kubernetes.io/csi/volumeDevices/publish", + volumeID, + ) + + ents, err := os.ReadDir(publishDir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, fmt.Errorf("readdir %s: %w", publishDir, err) + } + + out := make([]string, 0, len(ents)) + seen := make(map[string]struct{}, len(ents)) + + for _, e := range ents { + name := e.Name() + if len(name) != 36 || strings.Count(name, "-") != 4 { + continue + } + if _, ok := seen[name]; ok { + continue + } + seen[name] = struct{}{} + out = append(out, name) + } + + return out, nil +} + +// getPodUIDFromPath helper function to calculate the from the kubelet mount points: +// /var/lib/kubelet/pods//volumes/.../mount +func getPodUIDFromPath(mountPath string) (string, bool) { + prefix := filepath.Join(kDir, "pods") + + rest, found := strings.CutPrefix(mountPath, prefix) + if !found { + return "", false + } + + // cut first slash + rest = strings.TrimPrefix(rest, string(filepath.Separator)) + + // get uid + uid, _, ok := strings.Cut(rest, string(filepath.Separator)) + if !ok || uid == "" { + return "", false + } + return uid, true +} + +// SetQoSValues applies vol.Spec.QoS for the given podUID. After getting the path to the +// device we can find out its id. With the path to the device, we will find out its id in +// order to form the parameters for writing to io.max and apply the limits. +func SetQoSValues(vol *apis.LVMVolume, podUID string) error { + if vol == nil || vol.Spec.QoS == nil { + return nil + } + + devPath := GetVolumeDevPath(vol) + + maj, min, err := getDevMajorMinor(devPath) + if err != nil { + return fmt.Errorf("stat device %s: %w", devPath, err) + } + + line, err := setQoSValuesLine(maj, min, vol.Spec.QoS) + if err != nil { + return fmt.Errorf("build io.max line: %w", err) + } + + cgDirs, err := getPodCgroupPath(podUID) + if err != nil { + return err + } + if len(cgDirs) == 0 { + return nil + } + + qosValuesMu.Lock() + defer qosValuesMu.Unlock() + + for _, dir := range cgDirs { + p := filepath.Join(dir, "io.max") + if _, err := os.Stat(p); err != nil { + continue + } + if err := QoSValuesWriteFile(p, line); err != nil { + klog.Warningf("failed writing %s: %v", p, err) + continue + } + klog.Infof("Applied io.max vol=%s podUID=%s file=%s line=%q", vol.Name, podUID, p, line) + } + + return nil +} + +// getDevMajorMinor helper function to calculate device id number, for example: +// major minor => 253 0 +func getDevMajorMinor(devPath string) (uint32, uint32, error) { + var st unix.Stat_t + if err := unix.Stat(devPath, &st); err != nil { + return 0, 0, err + } + return unix.Major(uint64(st.Rdev)), unix.Minor(uint64(st.Rdev)), nil +} + +// setQoSValuesLine helper function which transforms all keys to desired state. +// cgroup v2 io.max format => ": rbps= wbps= riops= wiops=" +func setQoSValuesLine(maj, min uint32, qos *apis.VolumeQoS) (string, error) { + rbps := qosValueMax + wbps := qosValueMax + riops := qosValueMax + wiops := qosValueMax + + if qos != nil { + var err error + + rbps, err = qosValuesConvert(qos.ReadBPS) + if err != nil { + return "", fmt.Errorf("readBPS: %w", err) + } + wbps, err = qosValuesConvert(qos.WriteBPS) + if err != nil { + return "", fmt.Errorf("writeBPS: %w", err) + } + riops, err = qosValuesConvert(qos.ReadIOPS) + if err != nil { + return "", fmt.Errorf("readIOPS: %w", err) + } + wiops, err = qosValuesConvert(qos.WriteIOPS) + if err != nil { + return "", fmt.Errorf("writeIOPS: %w", err) + } + } + + return fmt.Sprintf("%d:%d rbps=%s wbps=%s riops=%s wiops=%s", maj, min, rbps, wbps, riops, wiops), nil +} + +// getPodCgroupPath helper function which find pod cgroup directory and supports both +// systemd and cgroupfs paths. +func getPodCgroupPath(podUID string) ([]string, error) { + kDriver := "/sys/fs/cgroup/kubepods.slice" + cDriver := "/sys/fs/cgroup/kubepods" + + uidDash := podUID + uidUnderscore := strings.ReplaceAll(podUID, "-", "_") + + candidates := []string{ + // systemd + filepath.Join(kDriver, "kubepods-burstable.slice", fmt.Sprintf("kubepods-burstable-pod%s.slice", uidUnderscore)), + filepath.Join(kDriver, "kubepods-besteffort.slice", fmt.Sprintf("kubepods-besteffort-pod%s.slice", uidUnderscore)), + filepath.Join(kDriver, "kubepods-guaranteed.slice", fmt.Sprintf("kubepods-guaranteed-pod%s.slice", uidUnderscore)), + filepath.Join(kDriver, fmt.Sprintf("kubepods-pod%s.slice", uidUnderscore)), + + // cgroupfs + filepath.Join(cDriver, "burstable", fmt.Sprintf("pod%s", uidDash)), + filepath.Join(cDriver, "besteffort", fmt.Sprintf("pod%s", uidDash)), + filepath.Join(cDriver, fmt.Sprintf("pod%s", uidDash)), + } + + out := make([]string, 0, 2) + for _, dir := range candidates { + if st, err := os.Stat(dir); err == nil && st.IsDir() { + if _, err := os.Stat(filepath.Join(dir, "io.max")); err == nil { + out = append(out, dir) + } + } + } + if len(out) == 0 { + return nil, fmt.Errorf("pod cgroup dir not found for podUID=%s", podUID) + } + return out, nil +} + +// QoSValuesWriteFile helper function which write a VAC value to the file. +func QoSValuesWriteFile(path, line string) error { + f, err := os.OpenFile(path, os.O_WRONLY, 0) + if err != nil { + return err + } + defer f.Close() + _, err = f.WriteString(line + "\n") + return err +} + +// QoSValuesConvert helper function which converts a VAC value to io.max value +// with checking for invalid values ​​in case of parsing error. +func qosValuesConvert(v string) (string, error) { + if v == qosValueMax { + return qosValueMax, nil + } + if v == "" { + return "", fmt.Errorf("qos value is empty") + } + u, err := strconv.ParseUint(v, 10, 64) + if err != nil { + return "", fmt.Errorf("qos value %q is not a valid uint64", v) + } + if u == 0 { + return "", fmt.Errorf("qos value must be > 0 or 'max', got %q", v) + } + + return strconv.FormatUint(u, 10), nil +} diff --git a/pkg/lvm/iolimiter_cgroup_test.go b/pkg/lvm/iolimiter_cgroup_test.go new file mode 100644 index 00000000..b995826e --- /dev/null +++ b/pkg/lvm/iolimiter_cgroup_test.go @@ -0,0 +1,158 @@ +package lvm + +import ( + "os" + "path/filepath" + "testing" + + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" +) + +func TestGetPodUIDFromPath(t *testing.T) { + kDir = "/var/lib/kubelet/" + tests := []struct { + name string + in string + uid string + ok bool + }{ + { + name: "valid kubelet csi mount path", + in: "/var/lib/kubelet/pods/eb9d54fe-07d5-4428-9c46-f937f4e12043/volumes/kubernetes.io~csi/pvc-aaa/mount", + uid: "eb9d54fe-07d5-4428-9c46-f937f4e12043", + ok: true, + }, + { + name: "path without kubelet prefix", + in: "/something/else/pods/eb9d54fe-07d5-4428-9c46-f937f4e12043/mount", + uid: "", + ok: false, + }, + { + name: "prefix but no slash after uid", + in: "/var/lib/kubelet/pods/eb9d54fe-07d5-4428-9c46-f937f4e12043", + uid: "", + ok: false, + }, + { + name: "prefix but empty uid", + in: "/var/lib/kubelet/pods//volumes/kubernetes.io~csi/x/mount", + uid: "", + ok: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + uid, ok := getPodUIDFromPath(tt.in) + if ok != tt.ok { + t.Fatalf("ok mismatch: got %v want %v (uid=%q)", ok, tt.ok, uid) + } + if uid != tt.uid { + t.Fatalf("uid mismatch: got %q want %q", uid, tt.uid) + } + }) + } +} + +func TestQoSValuesConvert(t *testing.T) { + tests := []struct { + in string + want string + wantErr bool + }{ + {in: "max", want: "max", wantErr: false}, + {in: "1", want: "1", wantErr: false}, + {in: "123456", want: "123456", wantErr: false}, + {in: "", wantErr: true}, + {in: "0", wantErr: true}, + {in: "nope", wantErr: true}, + {in: " 1", wantErr: true}, + } + + for _, tt := range tests { + got, err := qosValuesConvert(tt.in) + if tt.wantErr { + if err == nil { + t.Fatalf("QoSValuesConvert(%q) expected error, got %q", tt.in, got) + } + continue + } + if err != nil { + t.Fatalf("QoSValuesConvert(%q) unexpected error: %v", tt.in, err) + } + if got != tt.want { + t.Fatalf("QoSValuesConvert(%q)=%q want %q", tt.in, got, tt.want) + } + } +} + +func TestSetQoSValuesLine(t *testing.T) { + maj := uint32(253) + min := uint32(7) + + t.Run("nil qos means all max", func(t *testing.T) { + got, err := setQoSValuesLine(maj, min, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + want := "253:7 rbps=max wbps=max riops=max wiops=max" + if got != want { + t.Fatalf("got %q want %q", got, want) + } + }) + + t.Run("mixed values", func(t *testing.T) { + qos := &apis.VolumeQoS{ + ReadBPS: "100", + WriteBPS: "max", + ReadIOPS: "5", + WriteIOPS: "max", + } + got, err := setQoSValuesLine(maj, min, qos) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + want := "253:7 rbps=100 wbps=max riops=5 wiops=max" + if got != want { + t.Fatalf("got %q want %q", got, want) + } + }) + + t.Run("invalid value triggers error", func(t *testing.T) { + qos := &apis.VolumeQoS{ + ReadBPS: "0", + WriteBPS: "max", + ReadIOPS: "5", + WriteIOPS: "max", + } + _, err := setQoSValuesLine(maj, min, qos) + if err == nil { + t.Fatalf("expected error, got nil") + } + }) +} + +func TestQoSValuesWriteFile(t *testing.T) { + dir := t.TempDir() + p := filepath.Join(dir, "io.max") + + if err := os.WriteFile(p, []byte(""), 0o644); err != nil { + t.Fatalf("prepare file: %v", err) + } + + line := "253:7 rbps=100 wbps=max riops=5 wiops=max" + if err := QoSValuesWriteFile(p, line); err != nil { + t.Fatalf("QoSValuesWriteFile error: %v", err) + } + + b, err := os.ReadFile(p) + if err != nil { + t.Fatalf("read back: %v", err) + } + + want := line + "\n" + if string(b) != want { + t.Fatalf("file content mismatch:\n got: %q\nwant: %q", string(b), want) + } +} diff --git a/pkg/lvm/mount.go b/pkg/lvm/mount.go index 39cd3713..8d99276b 100644 --- a/pkg/lvm/mount.go +++ b/pkg/lvm/mount.go @@ -23,7 +23,6 @@ import ( "strconv" "github.com/openebs/lib-csi/pkg/device/iolimit" - mnt "github.com/openebs/lib-csi/pkg/mount" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -214,11 +213,22 @@ func MountVolume(vol *apis.LVMVolume, mount *MountInfo, podLVInfo *PodLVInfo) er klog.Infof("lvm: volume %v mounted %v fs %v", volume, mount.MountPath, mount.FSType) - if ioLimitsEnabled && podLVInfo != nil { - if err := setIOLimits(vol, podLVInfo, devicePath); err != nil { - klog.Warningf("lvm: error setting io limits: podUid %s, device %s, err=%v", podLVInfo.UID, devicePath, err) - } else { - klog.Infof("lvm: io limits set for podUid %v, device %s", podLVInfo.UID, devicePath) + if podLVInfo != nil { + if ioLimitsEnabled { + if err := setIOLimits(vol, podLVInfo, devicePath); err != nil { + klog.Warningf("lvm: error setting io limits: podUid %s, device %s, err=%v", + podLVInfo.UID, devicePath, err) + } else { + klog.Infof("lvm: io limits set for podUid %s, device %s", + podLVInfo.UID, devicePath) + } + } + + if vol.Spec.QoS != nil { + if err := SetQoSValues(vol, podLVInfo.UID); err != nil { + klog.Warningf("lvm: error applying io.max: podUid %s, device %s, err=%v", + podLVInfo.UID, devicePath, err) + } } } @@ -259,11 +269,22 @@ func MountBlock(vol *apis.LVMVolume, mountinfo *MountInfo, podLVInfo *PodLVInfo) klog.Infof("NodePublishVolume mounted block device %s at %s", devicePath, target) - if ioLimitsEnabled && podLVInfo != nil { - if err := setIOLimits(vol, podLVInfo, devicePath); err != nil { - klog.Warningf(": error setting io limits for podUid %s, device %s, err=%v", podLVInfo.UID, devicePath, err) - } else { - klog.Infof("lvm: io limits set for podUid %s, device %s", podLVInfo.UID, devicePath) + if podLVInfo != nil { + if ioLimitsEnabled { + if err := setIOLimits(vol, podLVInfo, devicePath); err != nil { + klog.Warningf("lvm: error setting io limits: podUid %s, device %s, err=%v", + podLVInfo.UID, devicePath, err) + } else { + klog.Infof("lvm: io limits set for podUid %s, device %s", + podLVInfo.UID, devicePath) + } + } + + if vol.Spec.QoS != nil { + if err := SetQoSValues(vol, podLVInfo.UID); err != nil { + klog.Warningf("lvm: error applying io.max: podUid %s, device %s, err=%v", + podLVInfo.UID, devicePath, err) + } } } return nil diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index 90eea4ae..80b578ee 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -18,6 +18,7 @@ package volume import ( "fmt" + "reflect" "regexp" "sort" "strconv" @@ -89,6 +90,11 @@ func (c *VolController) addVol(obj interface{}) { // updateVol is the update event handler for LVMVolume func (c *VolController) updateVol(oldObj, newObj interface{}) { + oldVol, ok := c.getStructuredObject(oldObj) + if !ok { + runtime.HandleError(fmt.Errorf("couldn't get old Vol object %#v", oldObj)) + return + } newVol, ok := c.getStructuredObject(newObj) if !ok { runtime.HandleError(fmt.Errorf("couldn't get Vol object %#v", newVol)) @@ -103,6 +109,11 @@ func (c *VolController) updateVol(oldObj, newObj interface{}) { klog.Infof("Got update event for deleted Vol %s, Deletion timestamp %s", newVol.Name, newVol.DeletionTimestamp) c.enqueueVol(newVol) } + if !reflect.DeepEqual(oldVol.Spec.QoS, newVol.Spec.QoS) { + klog.Infof("Got update event for Vol %s: spec.qos changed old=%+v new=%+v", newVol.Name, oldVol.Spec.QoS, newVol.Spec.QoS) + c.enqueueVol(newVol) + return + } } // deleteVol is the delete event handler for LVMVolume @@ -183,6 +194,11 @@ func (c *VolController) syncVol(vol *apis.LVMVolume) error { klog.Warningf("Skipping retrying lvm volume provisioning as its already in failed state: %+v", vol.Status.Error) return nil case lvm.LVMStatusReady: + if vol.Spec.QoS != nil { + if err := lvm.ReconcileSetQoSValues(vol); err != nil { + klog.Warningf("failed to reconcile io.max for vol %s: %v", vol.Name, err) + } + } klog.Info("lvm volume already provisioned") return nil }