diff --git a/go.mod b/go.mod
index f3e09a7..a7b9952 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,6 @@ require (
 	k8s.io/apimachinery v0.29.1
 	k8s.io/client-go v0.29.1
 	k8s.io/klog/v2 v2.110.1
-	k8s.io/kubectl v0.29.1
 	sigs.k8s.io/controller-runtime v0.17.0
 )
 
@@ -67,6 +66,7 @@ require (
 	k8s.io/apiextensions-apiserver v0.29.0 // indirect
 	k8s.io/component-base v0.29.1 // indirect
 	k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
+	k8s.io/kubectl v0.29.1 // indirect
 	k8s.io/metrics v0.29.1 // indirect
 	k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
 	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
diff --git a/pkg/node-labels-resources/grpc_server.go b/pkg/node-labels-resources/grpc_server.go
index 19ae778..5e07fd3 100644
--- a/pkg/node-labels-resources/grpc_server.go
+++ b/pkg/node-labels-resources/grpc_server.go
@@ -13,7 +13,6 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/selection"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -22,7 +21,6 @@ import (
 	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
-	resourcehelper "k8s.io/kubectl/pkg/util/resource"
 	ctrl "sigs.k8s.io/controller-runtime"
 
 	"github.com/liqotech/liqo/pkg/consts"
@@ -34,7 +32,6 @@ import (
 type NodeDetails struct {
 	Schedulable bool
 	Allocatable corev1.ResourceList
-	Pods        map[string]corev1.ResourceList
 }
 
 type nodeLabelsMonitor struct {
@@ -43,7 +40,6 @@ type nodeLabelsMonitor struct {
 	subscribers   sync.Map
 	nodeLabels    map[string]string
 	k8sNodeClient v1.NodeInterface
-	k8sPodClient  v1.PodInterface
 	allocatable   corev1.ResourceList
 	nodeMutex     sync.RWMutex
 	ctx           context.Context
@@ -74,28 +70,16 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
 		options.LabelSelector = labelSelector.String()
 	}
 
-	// this function is used to filter and ignore shadow pods at informer level.
-	var noShadowPodsFilter = func(options *metav1.ListOptions) {
-		req, err := labels.NewRequirement(consts.LocalPodLabelKey, selection.NotEquals, []string{consts.LocalPodLabelValue})
-		utilruntime.Must(err)
-		options.LabelSelector = labels.NewSelector().Add(*req).String()
-		options.FieldSelector = fields.OneTermEqualSelector("status.phase", string(corev1.PodRunning)).String()
-	}
-
 	nodeFactory := informers.NewSharedInformerFactoryWithOptions(
 		clientset, resyncPeriod, informers.WithTweakListOptions(noVirtualNodesFilter),
 	)
 	nodeInformer := nodeFactory.Core().V1().Nodes().Informer()
-	podFactory := informers.NewSharedInformerFactoryWithOptions(
-		clientset, resyncPeriod, informers.WithTweakListOptions(noShadowPodsFilter),
-	)
-	podInformer := podFactory.Core().V1().Pods().Informer()
+
 	s := nodeLabelsMonitor{
 		Server:        grpc.NewServer(),
 		nodeLabels:    nodeLabels,
 		allocatable:   corev1.ResourceList{},
 		k8sNodeClient: clientset.CoreV1().Nodes(),
-		k8sPodClient:  clientset.CoreV1().Pods(corev1.NamespaceAll),
 		ctx:           ctx,
 		resourceLists: map[string]NodeDetails{},
 	}
@@ -105,16 +89,8 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
 		DeleteFunc: s.onNodeDelete,
 	})
 	utilruntime.Must(err)
-	_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: s.onPodAdd,
-		// We do not care about update events, since resources are immutable.
-		DeleteFunc: s.onPodDelete,
-	})
-	utilruntime.Must(err)
 	nodeFactory.Start(ctx.Done())
 	nodeFactory.WaitForCacheSync(ctx.Done())
-	podFactory.Start(ctx.Done())
-	podFactory.WaitForCacheSync(ctx.Done())
 
 	resourcemonitors.RegisterResourceReaderServer(s.Server, &s)
 	if err := s.Server.Serve(lis); err != nil {
@@ -131,17 +107,8 @@ func (nlm *nodeLabelsMonitor) onNodeAdd(obj interface{}) {
 	klog.V(4).Infof("Adding Node %s", node.Name)
 	nlm.resourceLists[node.Name] = NodeDetails{
 		Allocatable: *toAdd,
-		Pods:        make(map[string]corev1.ResourceList),
 		Schedulable: utils.IsNodeReady(node) && !node.Spec.Unschedulable,
 	}
-	pods, err := nlm.k8sPodClient.List(nlm.ctx, metav1.ListOptions{FieldSelector: "spec.nodeName=" + node.Name})
-	if err != nil {
-		klog.Errorf("Failed to list pods for node %s: %v", node.Name, err)
-		return
-	}
-	for i := range pods.Items {
-		nlm.onPodAdd(&pods.Items[i])
-	}
 	nlm.writeClusterResources()
 }
 
@@ -156,7 +123,6 @@ func (nlm *nodeLabelsMonitor) onNodeUpdate(oldObj, newObj interface{}) {
 	if !ok {
 		nlm.resourceLists[newNode.Name] = NodeDetails{
 			Allocatable: newNodeResources,
-			Pods:        make(map[string]corev1.ResourceList),
 			Schedulable: true,
 		}
 	} else {
@@ -189,60 +155,14 @@ func (nlm *nodeLabelsMonitor) onNodeDelete(obj interface{}) {
 	nlm.writeClusterResources()
 }
 
-func (nlm *nodeLabelsMonitor) onPodAdd(obj interface{}) {
-	// Thanks to the filters at the informer level, add events are received only when pods running on physical nodes turn running.
-	podAdded, ok := obj.(*corev1.Pod)
-	if !ok {
-		klog.Error("OnPodAdd: Failed to cast to *corev1.Pod type")
-		return
-	}
-	podResources := extractPodResources(podAdded)
-	podNodeName := podAdded.Spec.NodeName
-	nodeDetail, ok := nlm.resourceLists[podNodeName]
-	if ok {
-		_, podOk := nodeDetail.Pods[podAdded.Name]
-		if !podOk {
-			nodeDetail.Pods[podAdded.Name] = podResources
-			nlm.resourceLists[podNodeName] = nodeDetail
-		}
-	} else {
-		klog.V(4).Infof("OnPodAdd: Failed to find node %s in resourceLists", podNodeName)
-	}
-	nlm.writeClusterResources()
-}
-
-func (nlm *nodeLabelsMonitor) onPodDelete(obj interface{}) {
-	// Thanks to the filters at the informer level, delete events are received only when
-	// pods previously running on a physical node are no longer running.
-	podDeleted, ok := obj.(*corev1.Pod)
-	if !ok {
-		klog.Errorf("OnPodDelete: Failed to cast to *corev1.Pod type")
-		return
-	}
-	podNodeName := podDeleted.Spec.NodeName
-	nodeDetail, ok := nlm.resourceLists[podNodeName]
-	if ok {
-		delete(nodeDetail.Pods, podDeleted.Name)
-		nlm.resourceLists[podNodeName] = nodeDetail
-	} else {
-		klog.V(4).Infof("OnPodDelete: Failed to find node %s in resourceLists", podNodeName)
-	}
-	nlm.writeClusterResources()
-}
-
 func (nlm *nodeLabelsMonitor) writeClusterResources() {
-	podResourceUsage := corev1.ResourceList{}
 	nodeAllocatable := corev1.ResourceList{}
 	for _, nodeDetail := range nlm.resourceLists {
 		if !nodeDetail.Schedulable {
 			continue
 		}
 		addResources(nodeAllocatable, nodeDetail.Allocatable)
-		for _, podResource := range nodeDetail.Pods {
-			addResources(podResourceUsage, podResource)
-		}
 	}
-	subResources(nodeAllocatable, podResourceUsage)
 	nlm.nodeMutex.Lock()
 	nlm.allocatable = nodeAllocatable.DeepCopy()
 	klog.V(4).Infof("Cluster resources: %v", nlm.allocatable)
@@ -253,11 +173,6 @@
 	}
 }
 
-func extractPodResources(podToExtract *corev1.Pod) corev1.ResourceList {
-	resourcesToExtract, _ := resourcehelper.PodRequestsAndLimits(podToExtract)
-	return resourcesToExtract
-}
-
 // ReadResources receives a clusterID and returns the resources for that specific clusterID. In this version of the resource plugin
 // the clusterID is ignored and the same resources are returned for every clusterID received. Since this method could be called multiple
 // times it has to be idempotent.
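Note on the resulting behavior: with the pod informer and `extractPodResources` removed, `writeClusterResources` now simply sums `Allocatable` across schedulable nodes and advertises that total, without subtracting the requests of running pods. Below is a minimal, self-contained sketch of that aggregation; the `addResources` helper and the sample node values are stand-ins for illustration only (the real helper lives elsewhere in the package and is not part of this diff):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// nodeDetails mirrors the trimmed-down NodeDetails from this diff:
// per-node pod tracking is gone, only allocatable and schedulability remain.
type nodeDetails struct {
	Schedulable bool
	Allocatable corev1.ResourceList
}

// addResources accumulates every quantity of src into dst.
// Hypothetical stand-in for the package's real helper.
func addResources(dst, src corev1.ResourceList) {
	for name, qty := range src {
		if cur, ok := dst[name]; ok {
			cur.Add(qty)
			dst[name] = cur
		} else {
			dst[name] = qty.DeepCopy()
		}
	}
}

func main() {
	// Two hypothetical nodes: one ready, one cordoned.
	nodes := map[string]nodeDetails{
		"worker-1": {Schedulable: true, Allocatable: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("4"),
			corev1.ResourceMemory: resource.MustParse("8Gi"),
		}},
		"worker-2": {Schedulable: false, Allocatable: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("2"),
			corev1.ResourceMemory: resource.MustParse("4Gi"),
		}},
	}

	// Same loop shape as the new writeClusterResources: sum allocatable
	// over schedulable nodes; no pod usage is subtracted any more.
	total := corev1.ResourceList{}
	for _, n := range nodes {
		if !n.Schedulable {
			continue // cordoned or not-ready nodes do not contribute
		}
		addResources(total, n.Allocatable)
	}
	fmt.Printf("advertised resources: cpu=%s memory=%s\n", total.Cpu(), total.Memory())
}
```

The trade-off is that the advertised figure now reflects nominal allocatable capacity rather than capacity net of current pod usage.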