Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 160 additions & 107 deletions src/services/gcp/pkg/stub/cleaner/k8sCleaner.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package cleaner

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
appV1 "k8s.io/api/apps/v1"
batchV1 "k8s.io/api/batch/v1"
apiCoreV1 "k8s.io/api/core/v1"
apiExtV1Beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
typedAppV1 "k8s.io/client-go/kubernetes/typed/apps/v1"
typedBatchV1 "k8s.io/client-go/kubernetes/typed/batch/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/kubernetes/typed/extensions/v1beta1"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"fmt"
"os/exec"
"time"

"github.com/sirupsen/logrus"
appV1 "k8s.io/api/apps/v1"
batchV1 "k8s.io/api/batch/v1"
apiCoreV1 "k8s.io/api/core/v1"
apiExtV1Beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
typedAppV1 "k8s.io/client-go/kubernetes/typed/apps/v1"
typedBatchV1 "k8s.io/client-go/kubernetes/typed/batch/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/kubernetes/typed/extensions/v1beta1"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

// Attempts to remove all objects in a kubernetes cluster connected to persistent resources.
Expand Down Expand Up @@ -44,97 +45,129 @@ func NewK8sCleaner(cs kubernetes.Interface, log *logrus.Entry) *clusterCleaner {
func (cc *clusterCleaner) Cleanup() (bool, error) {
cc.log.Debug("Attempt to clean up cluster")

if isClean, err := cc.cleanAllNamespaces(cc.clientSet); err != nil {
cc.log.Error("couldn't clean all namespaces: ", err.Error())
return false, err
if isClean, err := cc.cleanAllIngressInNamespace(cc.clientSet); err != nil {
cc.log.Error("couldn't clean ingress: ", err.Error())
//return false, err
} else if !isClean { // only cleanup pods, pvc, and pv if all stateful sets, deployments, ... are gone
return false, nil
//return false, nil
}

if isClean, err := cc.cleanPodsInAllNamespaces(cc.clientSet); err != nil {
cc.log.Error("couldn't remove all pods: ", err.Error())
return false, nil
//return false, nil
} else if !isClean { // only cleanup pvc after all pods are gone
return false, nil
//return false, nil
}

if isClean, err := cc.cleanPvcsInAllNamespaces(cc.clientSet); err != nil {
cc.log.Error("couldn't remove all persistent volume claims: ", err.Error())
return false, nil
//return false, nil
} else if !isClean { // only cleanup pv after all claims are gone
return false, nil
//return false, nil
}

isClean, err := cc.deletePersistentVolumes(cc.pvIf)
if err != nil {
cc.log.Error("couldn't remove all persistent volumes: ", err.Error())
return false, err
//return false, err
}

return isClean, err
}

func (cc *clusterCleaner) cleanPvcsInAllNamespaces(clientSet kubernetes.Interface) (bool, error) {
namespaces, err := cc.nsIf.List(v1.ListOptions{})
if err != nil {
cc.log.Error("couldn't enlist all namespaces. err: ", err)
return false, err
}

outChan := make(chan *helperResultStruct, len(namespaces.Items))
for _, ns := range namespaces.Items {
if ns.GetName() == v1.NamespaceSystem {
continue
}
go func(nsName string, pvcIf corev1.PersistentVolumeClaimInterface, out chan *helperResultStruct) {
isClean, err := cc.cleanAllPvcInNamespace(nsName, pvcIf)
out <- &helperResultStruct{isClean, err}
}(ns.GetName(), clientSet.CoreV1().PersistentVolumeClaims(ns.GetName()), outChan)
}
func (cc *clusterCleaner) cleanServiceInAllNamespaces(clientSet kubernetes.Interface) (bool, error) {
cmd := exec.Command("bash", "-c", "kubectl delete service"+"--all")

numExpectedResults := len(namespaces.Items) - 1
allClean, err := cc.collectResults(time.Minute, numExpectedResults, outChan)
out, err := cmd.Output()

if err != nil {
cc.log.Errorf("Could not delete service in all namespace: %v, %v", err, out)
return false, err
}

return allClean, nil
return true, nil
}

type helperResultStruct struct {
isClean bool
err error
}

func (cc *clusterCleaner) cleanPodsInAllNamespaces(clientSet kubernetes.Interface) (bool, error) {
namespaces, err := cc.nsIf.List(v1.ListOptions{})
if err != nil {
cc.log.Error("couldn't enlist all namespaces. err: ", err)
return false, err
}
func (cc *clusterCleaner) cleanPvcsInAllNamespaces(clientSet kubernetes.Interface) (bool, error) {
/* namespaces, err := cc.nsIf.List(v1.ListOptions{})
if err != nil {
cc.log.Error("couldn't enlist all namespaces. err: ", err)
return false, err
}

outChan := make(chan *helperResultStruct, len(namespaces.Items))
for _, ns := range namespaces.Items {
if ns.GetName() == v1.NamespaceSystem {
continue
outChan := make(chan *helperResultStruct, len(namespaces.Items))
for _, ns := range namespaces.Items {
if ns.GetName() == v1.NamespaceSystem {
continue
}
go func(nsName string, pvcIf corev1.PersistentVolumeClaimInterface, out chan *helperResultStruct) {
isClean, err := cc.cleanAllPvcInNamespace(nsName, pvcIf)
out <- &helperResultStruct{isClean, err}
}(ns.GetName(), clientSet.CoreV1().PersistentVolumeClaims(ns.GetName()), outChan)
}
go func(nsName string, podIf corev1.PodInterface, out chan *helperResultStruct) {
isClean, err := cc.cleanAllPodsInNamespace(nsName, podIf)
out <- &helperResultStruct{isClean, err}
}(ns.GetName(), clientSet.CoreV1().Pods(ns.GetName()), outChan)
}

numExpectedResults := len(namespaces.Items) - 1
allClean, err := cc.collectResults(time.Minute, numExpectedResults, outChan)
numExpectedResults := len(namespaces.Items) - 1
allClean, err := cc.collectResults(time.Minute, numExpectedResults, outChan)

if err != nil {
return false, err
}

return allClean, nil*/
cmd := exec.Command("bash", "-c", "kubectl delete pvc"+"--all")

out, err := cmd.Output()

if err != nil {
cc.log.Errorf("Could not delete pvc in all namespace: %v, %v", err, out)
return false, err
}

return allClean, nil
return true, nil

}

type helperResultStruct struct {
isClean bool
err error
func (cc *clusterCleaner) cleanPodsInAllNamespaces(clientSet kubernetes.Interface) (bool, error) {
/*namespaces, err := cc.nsIf.List(v1.ListOptions{})
if err != nil {
cc.log.Error("couldn't enlist all namespaces. err: ", err)
return false, err
}

outChan := make(chan *helperResultStruct, len(namespaces.Items))
for _, ns := range namespaces.Items {
if ns.GetName() == v1.NamespaceSystem {
continue
}
go func(nsName string, podIf corev1.PodInterface, out chan *helperResultStruct) {
isClean, err := cc.cleanAllPodsInNamespace(nsName, podIf)
out <- &helperResultStruct{isClean, err}
}(ns.GetName(), clientSet.CoreV1().Pods(ns.GetName()), outChan)
}

numExpectedResults := len(namespaces.Items) - 1
allClean, err := cc.collectResults(time.Minute, numExpectedResults, outChan)

if err != nil {
return false, err
}

return allClean, nil*/
cmd := exec.Command("bash", "-c", "kubectl delete pod"+"--all")

out, err := cmd.Output()

if err != nil {
cc.log.Errorf("Could not delete pod in all namespace: %v, %v", err, out)
return false, err
}

return true, nil
}

func (cc *clusterCleaner) cleanAllNamespaces(clientSet kubernetes.Interface) (bool, error) {
Expand Down Expand Up @@ -209,10 +242,10 @@ func (cc *clusterCleaner) cleanupNamespace(ns string,

results := make(chan *helperResultStruct, 8)

go func() {
/*go func() {
ingressClean, ingErr := cc.cleanAllIngressInNamespace(ns, ingIf)
results <- &helperResultStruct{ingressClean, ingErr}
}()
}()*/

go func() {
deploymentClean, podErr := cc.cleanAllStatefulSetInNamespace(ns, statefulSetIf)
Expand Down Expand Up @@ -353,30 +386,40 @@ func (cc *clusterCleaner) enableStatefulSetForceDeleteIfNecessary(statefulSet *a
return nil
}

func (cc *clusterCleaner) cleanAllIngressInNamespace(ns string, ingIf v1beta1.IngressInterface) (bool, error) {
list, err := ingIf.List(v1.ListOptions{IncludeUninitialized: true})
if err != nil {
cc.log.Errorf("couldn't list all ingresses in the namespace %s. err: %s", ns, err.Error())
return false, err
}
if len(list.Items) == 0 {
return true, nil
}
func (cc *clusterCleaner) cleanAllIngressInNamespace(clientSet kubernetes.Interface) (bool, error) {
/* list, err := ingIf.List(v1.ListOptions{IncludeUninitialized: true})
if err != nil {
cc.log.Errorf("couldn't list all ingresses in the namespace %s. err: %s", ns, err.Error())
return false, err
}
if len(list.Items) == 0 {
return true, nil
}

now := time.Now()
for i := range list.Items {
if err := cc.enableIngressForceDeleteIfNecessary(&list.Items[i], now, ns, ingIf); err != nil {
now := time.Now()
for i := range list.Items {
if err := cc.enableIngressForceDeleteIfNecessary(&list.Items[i], now, ns, ingIf); err != nil {
return false, err
}
}

delPol := v1.DeletePropagationForeground
if err := ingIf.DeleteCollection(&v1.DeleteOptions{PropagationPolicy: &delPol}, v1.ListOptions{IncludeUninitialized: true}); err != nil {
cc.log.Error("couldn't delete all ingresses. err: ", err)
return false, err
}
}

delPol := v1.DeletePropagationForeground
if err := ingIf.DeleteCollection(&v1.DeleteOptions{PropagationPolicy: &delPol}, v1.ListOptions{IncludeUninitialized: true}); err != nil {
cc.log.Error("couldn't delete all ingresses. err: ", err)
return false, err
return false, nil*/
cmd := exec.Command("bash", "-c", "kubectl delete ingress"+"--all")

out, err := cmd.Output()

if err != nil {
cc.log.Errorf("Could not clean ingress in all-namespaces: %v, %v", err, out)
//return false, err
}

return false, nil
return true, err
}

func (cc *clusterCleaner) enableIngressForceDeleteIfNecessary(ingress *apiExtV1Beta1.Ingress, now time.Time, ns string, ingIf v1beta1.IngressInterface) error {
Expand Down Expand Up @@ -594,36 +637,46 @@ func (cc *clusterCleaner) enablePodForceDeleteIfNecessary(pod *apiCoreV1.Pod, no
}

func (cc *clusterCleaner) deletePersistentVolumes(pvIf corev1.PersistentVolumeInterface) (bool, error) {
cc.log.Debug("clean up all pv")
/*cc.log.Debug("clean up all pv")

list, err := pvIf.List(v1.ListOptions{IncludeUninitialized: true})
if err != nil {
cc.log.Error("couldn't list all persistent volume claims. err: ", err)
return false, err
}
list, err := pvIf.List(v1.ListOptions{IncludeUninitialized: true})
if err != nil {
cc.log.Error("couldn't list all persistent volume claims. err: ", err)
return false, err
}

if len(list.Items) == 0 {
cc.log.Debugf("no pvs are left")
return true, nil
}
if len(list.Items) == 0 {
cc.log.Debugf("no pvs are left")
return true, nil
}

cc.log.Debugf("remove %d pvs", len(list.Items))
cc.log.Debugf("remove %d pvs", len(list.Items))

// we try a force-delete -> remove finalizers if existent
now := time.Now()
for i := range list.Items {
if err := cc.enablePVForceDeleteIfNecessary(&list.Items[i], now, pvIf); err != nil {
return false, err
}
}
// we try a force-delete -> remove finalizers if existent
now := time.Now()
for i := range list.Items {
if err := cc.enablePVForceDeleteIfNecessary(&list.Items[i], now, pvIf); err != nil {
return false, err
}
}

delPol := v1.DeletePropagationForeground
if err := pvIf.DeleteCollection(&v1.DeleteOptions{PropagationPolicy: &delPol}, v1.ListOptions{IncludeUninitialized: true}); err != nil {
cc.log.Error("couldn't delete all persistent volumes. err: ", err)
delPol := v1.DeletePropagationForeground
if err := pvIf.DeleteCollection(&v1.DeleteOptions{PropagationPolicy: &delPol}, v1.ListOptions{IncludeUninitialized: true}); err != nil {
cc.log.Error("couldn't delete all persistent volumes. err: ", err)
return false, err
}

return false, nil*/
cmd := exec.Command("bash", "-c", "kubectl delete pv"+"--all")

out, err := cmd.Output()

if err != nil {
cc.log.Error("Could not delete pv in all namespace: %v, %v", err, out)
return false, err
}

return false, nil
return true, nil
}

func (cc *clusterCleaner) enablePVForceDeleteIfNecessary(pv *apiCoreV1.PersistentVolume, now time.Time, pvIf corev1.PersistentVolumeInterface) error {
Expand Down