Skip to content

Commit

Permalink
nfd-gc: Remove stale NRT objects
Browse files Browse the repository at this point in the history
Remove stale NRT objects whose creator pod does not exist anymore.

Signed-off-by: Oleg Zhurakivskyy <[email protected]>
  • Loading branch information
ozhuraki committed Aug 26, 2024
1 parent 4db3216 commit 9bc556d
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
command:
- "nfd-topology-updater"
args:
Expand Down
46 changes: 37 additions & 9 deletions pkg/nfd-gc/nfd-gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nfdgarbagecollector
import (
"context"
"fmt"
"strings"
"time"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
Expand All @@ -28,12 +29,15 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
k8sclient "k8s.io/client-go/kubernetes"
metadataclient "k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
"sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)
Expand All @@ -42,6 +46,7 @@ var (
gvrNF = nfdv1alpha1.SchemeGroupVersion.WithResource("nodefeatures")
gvrNRT = topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies")
gvrNode = corev1.SchemeGroupVersion.WithResource("nodes")
gcNRTs = map[string]string{}
)

// Args are the command line arguments
Expand All @@ -57,10 +62,11 @@ type NfdGarbageCollector interface {
}

type nfdGarbageCollector struct {
args *Args
stopChan chan struct{}
client metadataclient.Interface
factory metadatainformer.SharedInformerFactory
args *Args
stopChan chan struct{}
client metadataclient.Interface
factory metadatainformer.SharedInformerFactory
k8sClient k8sclient.Interface
}

func New(args *Args) (NfdGarbageCollector, error) {
Expand All @@ -70,12 +76,14 @@ func New(args *Args) (NfdGarbageCollector, error) {
}

cli := metadataclient.NewForConfigOrDie(kubeconfig)
clientset := kubernetes.NewForConfigOrDie(kubeconfig)

return &nfdGarbageCollector{
args: args,
stopChan: make(chan struct{}),
client: cli,
factory: metadatainformer.NewSharedInformerFactory(cli, 0),
args: args,
stopChan: make(chan struct{}),
client: cli,
factory: metadatainformer.NewSharedInformerFactory(cli, 0),
k8sClient: clientset,
}, nil
}

Expand Down Expand Up @@ -190,7 +198,27 @@ func (n *nfdGarbageCollector) garbageCollect() {

// Handle NodeResourceTopology objects
listAndHandle(gvrNRT, func(meta metav1.PartialObjectMetadata) {
if !nodeNames.Has(meta.Name) {
deleteNRT := false

if label, ok := meta.Annotations[nfdtopologyupdater.NRTOwnerPodAnnotation]; ok {
if s := strings.Split(label, "_"); len(s) == 2 {
ns := s[0]
pod := s[1]
_, err := n.k8sClient.CoreV1().Pods(ns).Get(context.TODO(), pod, metav1.GetOptions{})
if errors.IsNotFound(err) {
if _, ok := gcNRTs[meta.Name]; !ok {
gcNRTs[meta.Name] = label
} else {
delete(gcNRTs, meta.Name)
deleteNRT = true
}
} else if err != nil {
klog.ErrorS(err, "failed to get Pod object")
}
}
}

if !nodeNames.Has(meta.Name) || deleteNRT {
n.deleteNRT(meta.Name)
}
})
Expand Down
72 changes: 62 additions & 10 deletions pkg/nfd-gc/nfd-gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
fakek8sclientset "k8s.io/client-go/kubernetes/fake"
metadataclient "k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/fake"
fakemetadataclient "k8s.io/client-go/metadata/fake"
Expand All @@ -37,7 +38,7 @@ import (

func TestNRTGC(t *testing.T) {
Convey("When theres is old NRT ", t, func() {
gc := newMockGC(nil, []string{"node1"})
gc := newMockGC(nil, []string{"node1"}, []string{"pod1"})

errChan := make(chan error)
go func() { errChan <- gc.Run() }()
Expand All @@ -48,7 +49,7 @@ func TestNRTGC(t *testing.T) {
So(<-errChan, ShouldBeNil)
})
Convey("When theres is one old NRT and one up to date", t, func() {
gc := newMockGC([]string{"node1"}, []string{"node1", "node2"})
gc := newMockGC([]string{"node1"}, []string{"node1", "node2"}, []string{"pod1", "pod2"})

errChan := make(chan error)
go func() { errChan <- gc.Run() }()
Expand All @@ -59,7 +60,7 @@ func TestNRTGC(t *testing.T) {
So(<-errChan, ShouldBeNil)
})
Convey("Should react to delete event", t, func() {
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"})
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}, []string{"pod1", "pod2"})

errChan := make(chan error)
go func() { errChan <- gc.Run() }()
Expand All @@ -71,7 +72,7 @@ func TestNRTGC(t *testing.T) {
So(waitForNRT(gc.client, "node2"), ShouldBeTrue)
})
Convey("periodic GC should remove obsolete NRT", t, func() {
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"})
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}, []string{"pod1", "pod2"})
// Override period to run fast
gc.args.GCPeriod = 100 * time.Millisecond

Expand All @@ -86,26 +87,46 @@ func TestNRTGC(t *testing.T) {

So(waitForNRT(gc.client, "node1", "node2"), ShouldBeTrue)
})
Convey("periodic GC should remove stale NRT", t, func() {
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}, []string{"pod1", "pod2"})
// Override period to run fast
gc.args.GCPeriod = 100 * time.Millisecond

nrt := createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", "not-existing")
nrt.ObjectMeta.Annotations = map[string]string{"owner-pod": "pod4"}

errChan := make(chan error)
go func() { errChan <- gc.Run() }()

gvr := topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies")
_, err := gc.client.Resource(gvr).(fake.MetadataClient).CreateFake(nrt, metav1.CreateOptions{})
So(err, ShouldBeNil)

So(waitForNrtPodsGC(gc.client, "pod1", "pod2"), ShouldBeTrue)
})
}

func newMockGC(nodes, nrts []string) *mockGC {
func newMockGC(nodes, nrts, pods []string) *mockGC {
// Create fake objects
objs := []runtime.Object{}
for _, name := range nodes {
objs = append(objs, createPartialObjectMetadata("v1", "Node", "", name))
}
for _, name := range nrts {
objs = append(objs, createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", name))
for i, name := range nrts {
nrt := createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", name)
nrt.ObjectMeta.Annotations = map[string]string{"owner-pod": pods[i]}
objs = append(objs, nrt)
}

scheme := fake.NewTestScheme()
_ = metav1.AddMetaToScheme(scheme)
cli := fakemetadataclient.NewSimpleMetadataClient(scheme, objs...)
return &mockGC{
nfdGarbageCollector: nfdGarbageCollector{
factory: metadatainformer.NewSharedInformerFactory(cli, 0),
client: cli,
stopChan: make(chan struct{}),
factory: metadatainformer.NewSharedInformerFactory(cli, 0),
client: cli,
k8sClient: fakek8sclientset.NewSimpleClientset(createFakePods(pods...)...),
stopChan: make(chan struct{}),
args: &Args{
GCPeriod: 10 * time.Minute,
},
Expand All @@ -114,6 +135,17 @@ func newMockGC(nodes, nrts []string) *mockGC {
}
}

func createFakePods(names ...string) []runtime.Object {
pods := make([]runtime.Object, len(names))
for i, n := range names {
pods[i] = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: n,
}}
}
return pods
}

func createPartialObjectMetadata(apiVersion, kind, namespace, name string) *metav1.PartialObjectMetadata {
return &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -152,3 +184,23 @@ func waitForNRT(cli metadataclient.Interface, names ...string) bool {
}
return false
}

func waitForNrtPodsGC(cli metadataclient.Interface, pods ...string) bool {
podsSet := sets.NewString(pods...)
gvr := topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies")
for i := 0; i < 2; i++ {
rsp, err := cli.Resource(gvr).List(context.TODO(), metav1.ListOptions{})
So(err, ShouldBeNil)

nrtPods := sets.NewString()
for _, nrt := range rsp.Items {
nrtPods.Insert(nrt.Annotations["owner-pod"])
}

if nrtPods.Equal(podsSet) {
return true
}
time.Sleep(1 * time.Second)
}
return false
}
3 changes: 3 additions & 0 deletions pkg/nfd-topology-updater/nfd-topology-updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
TopologyManagerPolicyAttributeName = "topologyManagerPolicy"
// TopologyManagerScopeAttributeName represents an attribute which defines Topology Manager Policy Scope
TopologyManagerScopeAttributeName = "topologyManagerScope"
NRTOwnerPodAnnotation = "nfd.node.kubernetes.io/owner-pod"
)

// Args are the command line arguments
Expand Down Expand Up @@ -294,6 +295,7 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi
nrtNew := v1alpha2.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: w.nodeName,
Annotations: map[string]string{NRTOwnerPodAnnotation: fmt.Sprintf("%s_%s", w.kubernetesNamespace, os.Getenv("POD_NAME"))},
OwnerReferences: w.ownerRefs,
},
Zones: zoneInfo,
Expand All @@ -317,6 +319,7 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi
nrtMutated := nrt.DeepCopy()
nrtMutated.Zones = zoneInfo
nrtMutated.OwnerReferences = w.ownerRefs
nrtMutated.Annotations[NRTOwnerPodAnnotation] = fmt.Sprintf("%s_%s", w.kubernetesNamespace, os.Getenv("POD_NAME"))

attributes := scanResponse.Attributes

Expand Down
8 changes: 8 additions & 0 deletions test/e2e/utils/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ func NFDTopologyUpdaterSpec(kc utils.KubeletConfig, opts ...SpecOption) *corev1.
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
},
SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
Expand Down

0 comments on commit 9bc556d

Please sign in to comment.