Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controller/datavolume/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/monitoring/metrics/cdi-cloner:go_default_library",
"//pkg/monitoring/metrics/cdi-controller:go_default_library",
"//pkg/monitoring/metrics/cdi-importer:go_default_library",
"//pkg/storagecapabilities:go_default_library",
"//pkg/token:go_default_library",
"//pkg/util:go_default_library",
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library",
Expand Down
16 changes: 16 additions & 0 deletions pkg/controller/datavolume/clone-controller-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,22 @@ func (r *CloneReconcilerBase) updateStatusPhase(pvc *corev1.PersistentVolumeClai
return r.updateStatusPhaseForPopulator(pvc, dataVolumeCopy, event)
}

// Non-populator CSI clone: if PVC was created with DataSource/Ref pointing to a PVC or VolumeSnapshot, reflect CSI progress
if (pvc.Spec.DataSource != nil && (pvc.Spec.DataSource.Kind == "PersistentVolumeClaim" || pvc.Spec.DataSource.Kind == "VolumeSnapshot")) ||
(pvc.Spec.DataSourceRef != nil && (pvc.Spec.DataSourceRef.Kind == "PersistentVolumeClaim" || pvc.Spec.DataSourceRef.Kind == "VolumeSnapshot")) {
// When PVC is pending/binding under CSI, report CSICloneInProgress
if pvc.Status.Phase == corev1.ClaimPending || pvc.Status.Phase == "" {
dataVolumeCopy.Status.Phase = cdiv1.CSICloneInProgress
if dataVolumeCopy.Status.Progress == "" {
dataVolumeCopy.Status.Progress = "N/A"
}
event.eventType = corev1.EventTypeNormal
event.reason = CSICloneInProgress
event.message = fmt.Sprintf(MessageCsiCloneInProgress, sourceNamespace, sourceName)
return nil
}
}

phase, ok := pvc.Annotations[cc.AnnPodPhase]
if phase != string(corev1.PodSucceeded) {
_, ok = pvc.Annotations[cc.AnnCloneRequest]
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/datavolume/controller-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ func (r *ReconcilerBase) updateStatus(req reconcile.Request, phaseSync *statusPh
}
} else {
dataVolumeCopy.Status.Phase = cdiv1.Succeeded
dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(cc.ProgressDone)
}
case corev1.ClaimBound:
switch dataVolumeCopy.Status.Phase {
Expand All @@ -954,6 +955,7 @@ func (r *ReconcilerBase) updateStatus(req reconcile.Request, phaseSync *statusPh
}
} else {
dataVolumeCopy.Status.Phase = cdiv1.Succeeded
dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(cc.ProgressDone)
}

case corev1.ClaimLost:
Expand Down
117 changes: 112 additions & 5 deletions pkg/controller/datavolume/pvc-clone-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ import (

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
clone "kubevirt.io/containerized-data-importer/pkg/controller/clone"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
"kubevirt.io/containerized-data-importer/pkg/storagecapabilities"
)

const (
Expand Down Expand Up @@ -325,7 +327,9 @@ func (r *PvcCloneReconciler) syncClone(log logr.Logger, req reconcile.Request) (
}
}

finalPvcSpec := pvcSpec
pvcModifier := r.updateAnnotations

if syncRes.usePopulator {
if isCrossNamespaceClone(datavolume) {
if !cc.HasFinalizer(datavolume, crossNamespaceFinalizer) {
Expand All @@ -334,9 +338,27 @@ func (r *PvcCloneReconciler) syncClone(log logr.Logger, req reconcile.Request) (
}
}
pvcModifier = r.updatePVCForPopulation
} else {
// Check if we can use CSI clone when populators are disabled
sc, _ := clone.GetStorageClassForClaim(context.TODO(), r.client, &corev1.PersistentVolumeClaim{Spec: *pvcSpec})
if sc != nil {
if advised, ok := storagecapabilities.GetAdvisedCloneStrategy(sc); ok && advised == cdiv1.CloneStrategyCsiClone {
sourcePvc, err := r.findSourcePvc(datavolume)
if err != nil {
return syncRes, err
}
csiCloneSpec, err := r.newVolumeCloneFromPVCSpec(datavolume, sourcePvc, pvcSpec)
if err != nil {
return syncRes, err
}
finalPvcSpec = csiCloneSpec
pvcModifier = r.updateCSICloneFromPVCAnnotations
}
}
}

newPvc, err := r.createPvcForDatavolume(datavolume, pvcSpec, pvcModifier)
// Create PVC with determined spec and modifier
pvc, err = r.createPvcForDatavolume(datavolume, finalPvcSpec, pvcModifier)
if err != nil {
if cc.ErrQuotaExceeded(err) {
syncErr = r.syncDataVolumeStatusPhaseWithEvent(&syncRes, cdiv1.Pending, nil,
Expand All @@ -351,7 +373,6 @@ func (r *PvcCloneReconciler) syncClone(log logr.Logger, req reconcile.Request) (
}
return syncRes, err
}
pvc = newPvc
}

if syncRes.usePopulator {
Expand All @@ -364,19 +385,105 @@ func (r *PvcCloneReconciler) syncClone(log logr.Logger, req reconcile.Request) (
cc.AddAnnotation(datavolume, cc.AnnCloneType, ct)
}
} else {
cc.AddAnnotation(datavolume, cc.AnnCloneType, string(cdiv1.CloneStrategyHostAssisted))
if err := r.fallbackToHostAssisted(pvc); err != nil {
return syncRes, err
// Determine clone strategy and annotate DV accordingly
sc, _ := clone.GetStorageClassForClaim(context.TODO(), r.client, pvc)
var advised *cdiv1.CDICloneStrategy
if sc != nil {
if s, ok := storagecapabilities.GetAdvisedCloneStrategy(sc); ok {
advised = &s
}
}
if advised != nil && *advised == cdiv1.CloneStrategyCsiClone &&
((pvc.Spec.DataSource != nil && pvc.Spec.DataSource.Kind == "PersistentVolumeClaim") ||
(pvc.Spec.DataSourceRef != nil && pvc.Spec.DataSourceRef.Kind == "PersistentVolumeClaim")) {
cc.AddAnnotation(datavolume, cc.AnnCloneType, string(cdiv1.CloneStrategyCsiClone))
if pvc.Status.Phase == corev1.ClaimBound {
if pvc.Annotations == nil || pvc.Annotations[cc.AnnPopulatedFor] != datavolume.Name {
pvcCpy := pvc.DeepCopy()
cc.AddAnnotation(pvcCpy, cc.AnnPopulatedFor, datavolume.Name)
if err := r.updatePVC(pvcCpy); err != nil {
return syncRes, err
}
pvc = pvcCpy
}
}
} else {
cc.AddAnnotation(datavolume, cc.AnnCloneType, string(cdiv1.CloneStrategyHostAssisted))
if err := r.fallbackToHostAssisted(pvc); err != nil {
return syncRes, err
}
}
}

if err := r.ensureExtendedTokenPVC(datavolume, pvc); err != nil {
return syncRes, err
}

if !syncRes.usePopulator {
cloneType, ok := datavolume.Annotations[cc.AnnCloneType]
if ok && cloneType == string(cdiv1.CloneStrategyCsiClone) {
shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
if err != nil {
return syncRes, err
}

switch pvc.Status.Phase {
case corev1.ClaimBound:
return syncRes, nil
case corev1.ClaimPending:
r.log.V(3).Info("ClaimPending CSIClone")
if !shouldBeMarkedWaitForFirstConsumer {
return syncRes, r.syncCloneStatusPhase(&syncRes, cdiv1.CSICloneInProgress, pvc)
}
case corev1.ClaimLost:
return syncRes,
r.syncDataVolumeStatusPhaseWithEvent(&syncRes, cdiv1.Failed, pvc,
Event{
eventType: corev1.EventTypeWarning,
reason: "ClaimLost",
message: fmt.Sprintf("PVC %s lost", pvc.Name),
})
}
}
}

return syncRes, syncErr
}

// newVolumeCloneFromPVCSpec creates a PVC spec for CSI clone from another PVC
func (r *PvcCloneReconciler) newVolumeCloneFromPVCSpec(dv *cdiv1.DataVolume, sourcePvc *corev1.PersistentVolumeClaim, targetPvcSpec *corev1.PersistentVolumeClaimSpec) (*corev1.PersistentVolumeClaimSpec, error) {
csiCloneSpec := targetPvcSpec.DeepCopy()

if csiCloneSpec.Resources.Requests == nil {
csiCloneSpec.Resources.Requests = corev1.ResourceList{}
}
if sourcePvc.Status.Capacity != nil {
if sourceSize := sourcePvc.Status.Capacity.Storage(); sourceSize != nil {
csiCloneSpec.Resources.Requests[corev1.ResourceStorage] = *sourceSize
}
}

// Set DataSource to point to the source PVC
csiCloneSpec.DataSource = &corev1.TypedLocalObjectReference{
Kind: "PersistentVolumeClaim",
Name: dv.Spec.Source.PVC.Name,
}

return csiCloneSpec, nil
}

// updateCSICloneFromPVCAnnotations sets CSI clone from PVC specific annotations
func (r *PvcCloneReconciler) updateCSICloneFromPVCAnnotations(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
if dataVolume.Spec.Source.PVC == nil {
return errors.Errorf("no PVC source set for clone datavolume")
}
if err := addCloneToken(dataVolume, pvc); err != nil {
return err
}
pvc.Annotations[AnnCSICloneRequest] = "true"
return nil
}

// Verify that the source PVC has been completely populated.
func (r *PvcCloneReconciler) isSourcePVCPopulated(dv *cdiv1.DataVolume) (bool, error) {
sourcePvc := &corev1.PersistentVolumeClaim{}
Expand Down
109 changes: 105 additions & 4 deletions pkg/controller/datavolume/snapshot-clone-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ import (

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
clone "kubevirt.io/containerized-data-importer/pkg/controller/clone"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
"kubevirt.io/containerized-data-importer/pkg/storagecapabilities"
"kubevirt.io/containerized-data-importer/pkg/util"
)

Expand Down Expand Up @@ -222,7 +224,9 @@ func (r *SnapshotCloneReconciler) syncSnapshotClone(log logr.Logger, req reconci
}
}

finalPvcSpec := pvcSpec
pvcModifier := r.updateAnnotations

if syncRes.usePopulator {
if isCrossNamespaceClone(datavolume) {
if !cc.HasFinalizer(datavolume, crossNamespaceFinalizer) {
Expand All @@ -232,12 +236,25 @@ func (r *SnapshotCloneReconciler) syncSnapshotClone(log logr.Logger, req reconci
}
pvcModifier = r.updatePVCForPopulation
} else {
// Check if we can use CSI clone when populators are disabled
sc, _ := clone.GetStorageClassForClaim(context.TODO(), r.client, &corev1.PersistentVolumeClaim{Spec: *pvcSpec})
if sc != nil {
if advised, ok := storagecapabilities.GetAdvisedCloneStrategy(sc); ok && advised == cdiv1.CloneStrategyCsiClone {
csiCloneSpec, err := r.newVolumeCloneFromSnapshotSpec(datavolume, syncRes.snapshot, pvcSpec)
if err != nil {
return syncRes, err
}
finalPvcSpec = csiCloneSpec
pvcModifier = r.updateCSICloneFromSnapshotAnnotations
}
}

if err := r.initLegacyClone(&syncRes); err != nil {
return syncRes, err
}
}

targetPvc, err := r.createPvcForDatavolume(datavolume, pvcSpec, pvcModifier)
targetPvc, err := r.createPvcForDatavolume(datavolume, finalPvcSpec, pvcModifier)
if err != nil {
if cc.ErrQuotaExceeded(err) {
syncErr = r.syncDataVolumeStatusPhaseWithEvent(&syncRes, cdiv1.Pending, nil,
Expand Down Expand Up @@ -265,16 +282,68 @@ func (r *SnapshotCloneReconciler) syncSnapshotClone(log logr.Logger, req reconci
cc.AddAnnotation(datavolume, cc.AnnCloneType, ct)
}
} else {
cc.AddAnnotation(datavolume, cc.AnnCloneType, string(cdiv1.CloneStrategyHostAssisted))
if err := r.fallbackToHostAssisted(pvc); err != nil {
return syncRes, err
// When UsePopulator is false, determine clone strategy and annotate DV accordingly
sc, _ := clone.GetStorageClassForClaim(context.TODO(), r.client, pvc)
var advised *cdiv1.CDICloneStrategy
if sc != nil {
if s, ok := storagecapabilities.GetAdvisedCloneStrategy(sc); ok {
advised = &s
}
}
if advised != nil && *advised == cdiv1.CloneStrategyCsiClone &&
pvc.Spec.DataSourceRef != nil && pvc.Spec.DataSourceRef.Kind == "VolumeSnapshot" {
cc.AddAnnotation(datavolume, cc.AnnCloneType, string(cdiv1.CloneStrategyCsiClone))
if pvc.Status.Phase == corev1.ClaimBound {
if pvc.Annotations == nil || pvc.Annotations[cc.AnnPopulatedFor] != datavolume.Name {
pvcCpy := pvc.DeepCopy()
cc.AddAnnotation(pvcCpy, cc.AnnPopulatedFor, datavolume.Name)
if err := r.updatePVC(pvcCpy); err != nil {
return syncRes, err
}
pvc = pvcCpy
}
}
} else {
cc.AddAnnotation(datavolume, cc.AnnCloneType, string(cdiv1.CloneStrategyHostAssisted))
if err := r.fallbackToHostAssisted(pvc); err != nil {
return syncRes, err
}
}
}

if err := r.ensureExtendedTokenPVC(datavolume, pvc); err != nil {
return syncRes, err
}

if !syncRes.usePopulator {
cloneType, ok := datavolume.Annotations[cc.AnnCloneType]
if ok && cloneType == string(cdiv1.CloneStrategyCsiClone) {
shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
if err != nil {
return syncRes, err
}

switch pvc.Status.Phase {
case corev1.ClaimBound:
// PVC is bound, CSI clone completed successfully
return syncRes, nil
case corev1.ClaimPending:
r.log.V(3).Info("ClaimPending CSI clone from snapshot")
if !shouldBeMarkedWaitForFirstConsumer {
return syncRes, r.syncCloneStatusPhase(&syncRes, cdiv1.CSICloneInProgress, pvc)
}
case corev1.ClaimLost:
return syncRes,
r.syncDataVolumeStatusPhaseWithEvent(&syncRes, cdiv1.Failed, pvc,
Event{
eventType: corev1.EventTypeWarning,
reason: "ClaimLost",
message: fmt.Sprintf("PVC %s lost", pvc.Name),
})
}
}
}

return syncRes, syncErr
}

Expand Down Expand Up @@ -642,3 +711,35 @@ func newPvcFromSnapshot(obj metav1.Object, name string, snapshot *snapshotv1.Vol

return target, nil
}

// newVolumeCloneFromSnapshotSpec creates a PVC spec for CSI clone from VolumeSnapshot
func (r *SnapshotCloneReconciler) newVolumeCloneFromSnapshotSpec(dv *cdiv1.DataVolume, snapshot *snapshotv1.VolumeSnapshot, targetPvcSpec *corev1.PersistentVolumeClaimSpec) (*corev1.PersistentVolumeClaimSpec, error) {
csiCloneSpec := targetPvcSpec.DeepCopy()

if csiCloneSpec.Resources.Requests == nil {
csiCloneSpec.Resources.Requests = corev1.ResourceList{}
}
if snapshot.Status != nil && snapshot.Status.RestoreSize != nil {
csiCloneSpec.Resources.Requests[corev1.ResourceStorage] = *snapshot.Status.RestoreSize
}

csiCloneSpec.DataSourceRef = &corev1.TypedObjectReference{
APIGroup: ptr.To[string]("snapshot.storage.k8s.io"),
Kind: "VolumeSnapshot",
Name: dv.Spec.Source.Snapshot.Name,
}

return csiCloneSpec, nil
}

// updateCSICloneFromSnapshotAnnotations sets CSI clone from snapshot specific annotations
func (r *SnapshotCloneReconciler) updateCSICloneFromSnapshotAnnotations(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) error {
if dataVolume.Spec.Source.Snapshot == nil {
return errors.Errorf("no snapshot source set for clone datavolume")
}
if err := addCloneToken(dataVolume, pvc); err != nil {
return err
}
pvc.Annotations[AnnCSICloneRequest] = "true"
return nil
}