
Fix Node Selector Resource Plugin
Sharathmk99 committed Feb 12, 2024
1 parent 1e16d9c commit e4cc070
Showing 1 changed file with 1 addition and 84 deletions.
pkg/node-labels-resources/grpc_server.go (85 changes: 1 addition & 84 deletions)
@@ -34,7 +34,6 @@ import (
type NodeDetails struct {
Schedulable bool
Allocatable corev1.ResourceList
Pods map[string]corev1.ResourceList
}

type nodeLabelsMonitor struct {
@@ -43,7 +42,6 @@ type nodeLabelsMonitor struct {
subscribers sync.Map
nodeLabels map[string]string
k8sNodeClient v1.NodeInterface
k8sPodClient v1.PodInterface
allocatable corev1.ResourceList
nodeMutex sync.RWMutex
ctx context.Context
@@ -74,28 +72,16 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
options.LabelSelector = labelSelector.String()
}

// this function is used to filter and ignore shadow pods at informer level.
var noShadowPodsFilter = func(options *metav1.ListOptions) {
req, err := labels.NewRequirement(consts.LocalPodLabelKey, selection.NotEquals, []string{consts.LocalPodLabelValue})
utilruntime.Must(err)
options.LabelSelector = labels.NewSelector().Add(*req).String()
options.FieldSelector = fields.OneTermEqualSelector("status.phase", string(corev1.PodRunning)).String()
}

nodeFactory := informers.NewSharedInformerFactoryWithOptions(
clientset, resyncPeriod, informers.WithTweakListOptions(noVirtualNodesFilter),
)
nodeInformer := nodeFactory.Core().V1().Nodes().Informer()
podFactory := informers.NewSharedInformerFactoryWithOptions(
clientset, resyncPeriod, informers.WithTweakListOptions(noShadowPodsFilter),
)
podInformer := podFactory.Core().V1().Pods().Informer()

s := nodeLabelsMonitor{
Server: grpc.NewServer(),
nodeLabels: nodeLabels,
allocatable: corev1.ResourceList{},
k8sNodeClient: clientset.CoreV1().Nodes(),
k8sPodClient: clientset.CoreV1().Pods(corev1.NamespaceAll),
ctx: ctx,
resourceLists: map[string]NodeDetails{},
}
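For reference, the removed noShadowPodsFilter (like the noVirtualNodesFilter that remains) follows client-go's tweak-list-options pattern: build a selector once and stamp it onto every List/Watch the informer issues. A minimal standalone sketch of that pattern, with placeholder key/value strings standing in for consts.LocalPodLabelKey and consts.LocalPodLabelValue, whose values are not shown in this diff:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/selection"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

func main() {
    // Exclude objects whose label matches the given value ("example.io/shadow" is a placeholder key).
    req, err := labels.NewRequirement("example.io/shadow", selection.NotEquals, []string{"true"})
    utilruntime.Must(err)

    tweak := func(options *metav1.ListOptions) {
        options.LabelSelector = labels.NewSelector().Add(*req).String()
        // Additionally restrict the listing to running pods.
        options.FieldSelector = fields.OneTermEqualSelector("status.phase", string(corev1.PodRunning)).String()
    }

    opts := metav1.ListOptions{}
    tweak(&opts)
    fmt.Println(opts.LabelSelector, "/", opts.FieldSelector)
}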
@@ -105,16 +91,8 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
DeleteFunc: s.onNodeDelete,
})
utilruntime.Must(err)
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.onPodAdd,
// We do not care about update events, since resources are immutable.
DeleteFunc: s.onPodDelete,
})
utilruntime.Must(err)
nodeFactory.Start(ctx.Done())
nodeFactory.WaitForCacheSync(ctx.Done())
podFactory.Start(ctx.Done())
podFactory.WaitForCacheSync(ctx.Done())

resourcemonitors.RegisterResourceReaderServer(s.Server, &s)
if err := s.Server.Serve(lis); err != nil {
@@ -131,17 +109,8 @@ func (nlm *nodeLabelsMonitor) onNodeAdd(obj interface{}) {
klog.V(4).Infof("Adding Node %s", node.Name)
nlm.resourceLists[node.Name] = NodeDetails{
Allocatable: *toAdd,
Pods: make(map[string]corev1.ResourceList),
Schedulable: utils.IsNodeReady(node) && !node.Spec.Unschedulable,
}
pods, err := nlm.k8sPodClient.List(nlm.ctx, metav1.ListOptions{FieldSelector: "spec.nodeName=" + node.Name})
if err != nil {
klog.Errorf("Failed to list pods for node %s: %v", node.Name, err)
return
}
for i := range pods.Items {
nlm.onPodAdd(&pods.Items[i])
}
nlm.writeClusterResources()
}
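The utils.IsNodeReady helper is not part of this diff; presumably it inspects the NodeReady condition, roughly as in the sketch below (the helper's exact semantics are an assumption), and onNodeAdd combines it with Spec.Unschedulable to decide whether a node should count towards the advertised resources.

package sketch

import corev1 "k8s.io/api/core/v1"

// isNodeReady sketches the check that utils.IsNodeReady presumably performs:
// a node is ready when its NodeReady condition is True.
func isNodeReady(node *corev1.Node) bool {
    for _, cond := range node.Status.Conditions {
        if cond.Type == corev1.NodeReady {
            return cond.Status == corev1.ConditionTrue
        }
    }
    return false
}

// isSchedulable mirrors the expression used in onNodeAdd above: a node counts
// only when it is Ready and has not been cordoned.
func isSchedulable(node *corev1.Node) bool {
    return isNodeReady(node) && !node.Spec.Unschedulable
}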

@@ -156,7 +125,6 @@ func (nlm *nodeLabelsMonitor) onNodeUpdate(oldObj, newObj interface{}) {
if !ok {
nlm.resourceLists[newNode.Name] = NodeDetails{
Allocatable: newNodeResources,
Pods: make(map[string]corev1.ResourceList),
Schedulable: true,
}
} else {
@@ -189,60 +157,14 @@ func (nlm *nodeLabelsMonitor) onNodeDelete(obj interface{}) {
nlm.writeClusterResources()
}

func (nlm *nodeLabelsMonitor) onPodAdd(obj interface{}) {
// Thanks to the filters at the informer level, add events are received only when pods running on physical nodes turn running.
podAdded, ok := obj.(*corev1.Pod)
if !ok {
klog.Error("OnPodAdd: Failed to cast to *corev1.Pod type")
return
}
podResources := extractPodResources(podAdded)
podNodeName := podAdded.Spec.NodeName
nodeDetail, ok := nlm.resourceLists[podNodeName]
if ok {
_, podOk := nodeDetail.Pods[podAdded.Name]
if !podOk {
nodeDetail.Pods[podAdded.Name] = podResources
nlm.resourceLists[podNodeName] = nodeDetail
}
} else {
klog.V(4).Infof("OnPodAdd: Failed to find node %s in resourceLists", podNodeName)
}
nlm.writeClusterResources()
}

func (nlm *nodeLabelsMonitor) onPodDelete(obj interface{}) {
// Thanks to the filters at the informer level, delete events are received only when
// pods previously running on a physical node are no longer running.
podDeleted, ok := obj.(*corev1.Pod)
if !ok {
klog.Errorf("OnPodDelete: Failed to cast to *corev1.Pod type")
return
}
podNodeName := podDeleted.Spec.NodeName
nodeDetail, ok := nlm.resourceLists[podNodeName]
if ok {
delete(nodeDetail.Pods, podDeleted.Name)
nlm.resourceLists[podNodeName] = nodeDetail
} else {
klog.V(4).Infof("OnPodDelete: Failed to find node %s in resourceLists", podNodeName)
}
nlm.writeClusterResources()
}

func (nlm *nodeLabelsMonitor) writeClusterResources() {
podResourceUsage := corev1.ResourceList{}
nodeAllocatable := corev1.ResourceList{}
for _, nodeDetail := range nlm.resourceLists {
if !nodeDetail.Schedulable {
continue
}
addResources(nodeAllocatable, nodeDetail.Allocatable)
for _, podResource := range nodeDetail.Pods {
addResources(podResourceUsage, podResource)
}
}
subResources(nodeAllocatable, podResourceUsage)
nlm.nodeMutex.Lock()
nlm.allocatable = nodeAllocatable.DeepCopy()
klog.V(4).Infof("Cluster resources: %v", nlm.allocatable)
@@ -253,11 +175,6 @@ func (nlm *nodeLabelsMonitor) writeClusterResources() {
}
}

func extractPodResources(podToExtract *corev1.Pod) corev1.ResourceList {
resourcesToExtract, _ := resourcehelper.PodRequestsAndLimits(podToExtract)
return resourcesToExtract
}

// ReadResources receives a clusterID and returns the resources for that specific clusterID. In this version of the resource plugin
// the clusterID is ignored and the same resources are returned for every clusterID received. Since this method could be called multiple
// times it has to be idempotent.
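The ReadResources body itself sits below this hunk; given the fields shown above, an idempotent read presumably just returns a copy of the cached allocatable list under the read lock. A simplified sketch of that shape (the real method implements the resourcemonitors ResourceReader gRPC interface, whose exact signature is not shown here):

// readClusterResources is a hypothetical helper illustrating the idempotent
// read pattern over the cache maintained by writeClusterResources above.
func (nlm *nodeLabelsMonitor) readClusterResources() corev1.ResourceList {
    nlm.nodeMutex.RLock()
    defer nlm.nodeMutex.RUnlock()
    // Hand back a deep copy so callers cannot mutate the shared cache; repeated
    // calls therefore return the same result with no side effects.
    return nlm.allocatable.DeepCopy()
}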