diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index ca2a2ea0..ff20fb5d 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -95,3 +95,12 @@ jobs: - name: Test Max Checkpoints Set to 1 run: sudo -E bats -f "test_max_checkpoints_set_to_1" ./test/run_tests.bats + + - name: Test Max Total Checkpoint Size + run: sudo -E bats -f "test_max_total_checkpoint_size" ./test/run_tests.bats + + - name: Test Max Checkpoint Size + run: sudo -E bats -f "test_max_checkpoint_size" ./test/run_tests.bats + + - name: Test orphan retention + run: sudo -E bats -f "test_orphan_retention_policy" ./test/run_tests.bats diff --git a/api/v1/checkpointrestoreoperator_types.go b/api/v1/checkpointrestoreoperator_types.go index aaa44688..c88cc988 100644 --- a/api/v1/checkpointrestoreoperator_types.go +++ b/api/v1/checkpointrestoreoperator_types.go @@ -34,27 +34,41 @@ type CheckpointRestoreOperatorSpec struct { } type GlobalPolicySpec struct { - MaxCheckpointsPerNamespaces *int `json:"maxCheckpointsPerNamespace,omitempty"` - MaxCheckpointsPerPod *int `json:"maxCheckpointsPerPod,omitempty"` - MaxCheckpointsPerContainer *int `json:"maxCheckpointsPerContainer,omitempty"` + RetainOrphan *bool `json:"retainOrphan,omitempty"` + MaxCheckpointsPerNamespaces *int `json:"maxCheckpointsPerNamespace,omitempty"` + MaxCheckpointsPerPod *int `json:"maxCheckpointsPerPod,omitempty"` + MaxCheckpointsPerContainer *int `json:"maxCheckpointsPerContainer,omitempty"` + MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` + MaxTotalSizePerNamespace *int `json:"maxTotalSizePerNamespace,omitempty"` + MaxTotalSizePerPod *int `json:"maxTotalSizePerPod,omitempty"` + MaxTotalSizePerContainer *int `json:"maxTotalSizePerContainer,omitempty"` } type ContainerPolicySpec struct { - Namespace string `json:"namespace,omitempty"` - Pod string `json:"pod,omitempty"` - Container string `json:"container,omitempty"` - MaxCheckpoints *int64 `json:"maxCheckpoints,omitempty"` + Namespace string `json:"namespace,omitempty"` + Pod string `json:"pod,omitempty"` + Container string `json:"container,omitempty"` + RetainOrphan *bool `json:"retainOrphan,omitempty"` + MaxCheckpoints *int `json:"maxCheckpoints,omitempty"` + MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` + MaxTotalSize *int `json:"maxTotalSize,omitempty"` } type PodPolicySpec struct { - Namespace string `json:"namespace,omitempty"` - Pod string `json:"pod,omitempty"` - MaxCheckpoints *int64 `json:"maxCheckpoints,omitempty"` + Namespace string `json:"namespace,omitempty"` + Pod string `json:"pod,omitempty"` + RetainOrphan *bool `json:"retainOrphan,omitempty"` + MaxCheckpoints *int `json:"maxCheckpoints,omitempty"` + MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` + MaxTotalSize *int `json:"maxTotalSize,omitempty"` } type NamespacePolicySpec struct { - Namespace string `json:"namespace,omitempty"` - MaxCheckpoints *int64 `json:"maxCheckpoints,omitempty"` + Namespace string `json:"namespace,omitempty"` + RetainOrphan *bool `json:"retainOrphan,omitempty"` + MaxCheckpoints *int `json:"maxCheckpoints,omitempty"` + MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` + MaxTotalSize *int `json:"maxTotalSize,omitempty"` } // CheckpointRestoreOperatorStatus defines the observed state of CheckpointRestoreOperator @@ -64,6 +78,8 @@ type CheckpointRestoreOperatorStatus struct { //+kubebuilder:object:root=true //+kubebuilder:subresource:status +//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch 
+//+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch // CheckpointRestoreOperator is the Schema for the checkpointrestoreoperators API type CheckpointRestoreOperator struct { diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 0b9d714d..ac633fa1 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -138,9 +138,24 @@ func (in *CheckpointRestoreOperatorStatus) DeepCopy() *CheckpointRestoreOperator // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ContainerPolicySpec) DeepCopyInto(out *ContainerPolicySpec) { *out = *in + if in.RetainOrphan != nil { + in, out := &in.RetainOrphan, &out.RetainOrphan + *out = new(bool) + **out = **in + } if in.MaxCheckpoints != nil { in, out := &in.MaxCheckpoints, &out.MaxCheckpoints - *out = new(int64) + *out = new(int) + **out = **in + } + if in.MaxCheckpointSize != nil { + in, out := &in.MaxCheckpointSize, &out.MaxCheckpointSize + *out = new(int) + **out = **in + } + if in.MaxTotalSize != nil { + in, out := &in.MaxTotalSize, &out.MaxTotalSize + *out = new(int) **out = **in } } @@ -158,6 +173,11 @@ func (in *ContainerPolicySpec) DeepCopy() *ContainerPolicySpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GlobalPolicySpec) DeepCopyInto(out *GlobalPolicySpec) { *out = *in + if in.RetainOrphan != nil { + in, out := &in.RetainOrphan, &out.RetainOrphan + *out = new(bool) + **out = **in + } if in.MaxCheckpointsPerNamespaces != nil { in, out := &in.MaxCheckpointsPerNamespaces, &out.MaxCheckpointsPerNamespaces *out = new(int) @@ -173,6 +193,26 @@ func (in *GlobalPolicySpec) DeepCopyInto(out *GlobalPolicySpec) { *out = new(int) **out = **in } + if in.MaxCheckpointSize != nil { + in, out := &in.MaxCheckpointSize, &out.MaxCheckpointSize + *out = new(int) + **out = **in + } + if in.MaxTotalSizePerNamespace != nil { + in, out := &in.MaxTotalSizePerNamespace, &out.MaxTotalSizePerNamespace + *out = new(int) + **out = **in + } + if in.MaxTotalSizePerPod != nil { + in, out := &in.MaxTotalSizePerPod, &out.MaxTotalSizePerPod + *out = new(int) + **out = **in + } + if in.MaxTotalSizePerContainer != nil { + in, out := &in.MaxTotalSizePerContainer, &out.MaxTotalSizePerContainer + *out = new(int) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GlobalPolicySpec. @@ -188,9 +228,24 @@ func (in *GlobalPolicySpec) DeepCopy() *GlobalPolicySpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NamespacePolicySpec) DeepCopyInto(out *NamespacePolicySpec) { *out = *in + if in.RetainOrphan != nil { + in, out := &in.RetainOrphan, &out.RetainOrphan + *out = new(bool) + **out = **in + } if in.MaxCheckpoints != nil { in, out := &in.MaxCheckpoints, &out.MaxCheckpoints - *out = new(int64) + *out = new(int) + **out = **in + } + if in.MaxCheckpointSize != nil { + in, out := &in.MaxCheckpointSize, &out.MaxCheckpointSize + *out = new(int) + **out = **in + } + if in.MaxTotalSize != nil { + in, out := &in.MaxTotalSize, &out.MaxTotalSize + *out = new(int) **out = **in } } @@ -208,9 +263,24 @@ func (in *NamespacePolicySpec) DeepCopy() *NamespacePolicySpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *PodPolicySpec) DeepCopyInto(out *PodPolicySpec) { *out = *in + if in.RetainOrphan != nil { + in, out := &in.RetainOrphan, &out.RetainOrphan + *out = new(bool) + **out = **in + } if in.MaxCheckpoints != nil { in, out := &in.MaxCheckpoints, &out.MaxCheckpoints - *out = new(int64) + *out = new(int) + **out = **in + } + if in.MaxCheckpointSize != nil { + in, out := &in.MaxCheckpointSize, &out.MaxCheckpointSize + *out = new(int) + **out = **in + } + if in.MaxTotalSize != nil { + in, out := &in.MaxTotalSize, &out.MaxTotalSize + *out = new(int) **out = **in } } diff --git a/config/crd/bases/criu.org_checkpointrestoreoperators.yaml b/config/crd/bases/criu.org_checkpointrestoreoperators.yaml index f56382ce..a235af18 100644 --- a/config/crd/bases/criu.org_checkpointrestoreoperators.yaml +++ b/config/crd/bases/criu.org_checkpointrestoreoperators.yaml @@ -52,44 +52,69 @@ spec: properties: container: type: string + maxCheckpointSize: + type: integer maxCheckpoints: - format: int64 + type: integer + maxTotalSize: type: integer namespace: type: string pod: type: string + retainOrphan: + type: boolean type: object type: array globalPolicy: properties: + maxCheckpointSize: + type: integer maxCheckpointsPerContainer: type: integer maxCheckpointsPerNamespace: type: integer maxCheckpointsPerPod: type: integer + maxTotalSizePerContainer: + type: integer + maxTotalSizePerNamespace: + type: integer + maxTotalSizePerPod: + type: integer + retainOrphan: + type: boolean type: object namespacePolicies: items: properties: + maxCheckpointSize: + type: integer maxCheckpoints: - format: int64 + type: integer + maxTotalSize: type: integer namespace: type: string + retainOrphan: + type: boolean type: object type: array podPolicies: items: properties: + maxCheckpointSize: + type: integer maxCheckpoints: - format: int64 + type: integer + maxTotalSize: type: integer namespace: type: string pod: type: string + retainOrphan: + type: boolean type: object type: array type: object diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index ad8c043c..833ece98 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -4,6 +4,22 @@ kind: ClusterRole metadata: name: manager-role rules: +- apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - watch - apiGroups: - criu.org resources: diff --git a/config/samples/_v1_checkpointrestoreoperator.yaml b/config/samples/_v1_checkpointrestoreoperator.yaml index f9a6c2ae..ac9fa006 100644 --- a/config/samples/_v1_checkpointrestoreoperator.yaml +++ b/config/samples/_v1_checkpointrestoreoperator.yaml @@ -12,18 +12,30 @@ spec: checkpointDirectory: /var/lib/kubelet/checkpoints applyPoliciesImmediately: false globalPolicy: + retainOrphan: true maxCheckpointsPerNamespace: 50 maxCheckpointsPerPod: 30 maxCheckpointsPerContainer: 10 + maxCheckpointSize: 10 + maxTotalSizePerNamespace: 1000 + maxTotalSizePerPod: 500 + maxTotalSizePerContainer: 100 # containerPolicies: # - namespace: # pod: # container: + # retainOrphan: false # maxCheckpoints: 5 + # maxCheckpointSize: 10 + # maxTotalSize: 100 # podPolicies: # - namespace: # pod: # maxCheckpoints: 10 + # maxCheckpointSize: 10 + # maxTotalSize: 500 # namespacePolicies: # - namespace: # maxCheckpoints: 15 + # maxCheckpointSize: 10 + # maxTotalSize: 1000 diff --git a/docs/retention_policy.md b/docs/retention_policy.md index b69422e8..0dca871e 100644 --- a/docs/retention_policy.md +++ b/docs/retention_policy.md @@ -27,6 +27,7 @@ spec: 
checkpointDirectory: /var/lib/kubelet/checkpoints applyPoliciesImmediately: false globalPolicy: + retainOrphan: true maxCheckpointsPerNamespace: 50 maxCheckpointsPerPod: 30 maxCheckpointsPerContainer: 10 @@ -35,14 +36,19 @@ spec: # - namespace: # pod: # container: + # retainOrphan: false # Setting this to false deletes all orphan checkpoints # maxCheckpoints: 5 + # maxCheckpointSize: 6 # Maximum size of a single checkpoint in MB + # maxTotalSize: 20 # Maximum total size of checkpoints for the container in MB # podPolicies: # - namespace: # pod: # maxCheckpoints: 10 + # maxCheckpointSize: 8 # Maximum size of a single checkpoint in MB + # maxTotalSize: 50 # Maximum total size of checkpoints for the pod in MB # namespacePolicies: # - namespace: - # maxCheckpoints: 15` + # maxCheckpoints: 15 ``` A sample configuration file is available [here](/config/samples/_v1_checkpointrestoreoperator.yaml). @@ -51,21 +57,35 @@ A sample configuration file is available [here](/config/samples/_v1_checkpointre - `checkpointDirectory`: Specifies the directory where checkpoints are stored. - `applyPoliciesImmediately`: If set to `true`, the policies are applied immediately. If `false` (default value), they are applied after new checkpoint creation. - `globalPolicy`: Defines global checkpoint retention limits. + - `retainOrphan`: If set to `true` (default), orphan checkpoints (checkpoints whose associated resources have been deleted) will be retained. If set to `false`, orphan checkpoints will be automatically deleted. This is particularly useful for transient checkpoints used to recover from errors by replacing 'container restart' with 'container restore'. - `maxCheckpointsPerNamespace`: Maximum number of checkpoints per namespace. - `maxCheckpointsPerPod`: Maximum number of checkpoints per pod. - `maxCheckpointsPerContainer`: Maximum number of checkpoints per container. + - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. + - `maxTotalSizePerNamespace`: Maximum total size of checkpoints per namespace in MB. + - `maxTotalSizePerPod`: Maximum total size of checkpoints per pod in MB. + - `maxTotalSizePerContainer`: Maximum total size of checkpoints per container in MB. - `containerPolicies` (optional): Specific retention policies for containers. - `namespace`: Namespace of the container. - `pod`: Pod name of the container. - `container`: Container name. + - `retainOrphan`: If set to `true` (default), orphan checkpoints for this container will be retained. If set to `false`, orphan checkpoints will be deleted. - `maxCheckpoints`: Maximum number of checkpoints for the container. + - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. + - `maxTotalSize`: Maximum total size of checkpoints for the container in MB. - `podPolicies` (optional): Specific retention policies for pods. - `namespace`: Namespace of the pod. - `pod`: Pod name. + - `retainOrphan`: If set to `true` (default), orphan checkpoints for this pod will be retained. If set to `false`, orphan checkpoints will be deleted. - `maxCheckpoints`: Maximum number of checkpoints for the pod. + - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. + - `maxTotalSize`: Maximum total size of checkpoints for the pod in MB. - `namespacePolicies` (optional): Specific retention policies for namespaces. - `namespace`: Namespace name. + - `retainOrphan`: If set to `true` (default), orphan checkpoints for this namespace will be retained. If set to `false`, orphan checkpoints will be deleted.
- `maxCheckpoints`: Maximum number of checkpoints for the namespace. + - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. + - `maxTotalSize`: Maximum total size of checkpoints for the namespace in MB. ## Policy Hierarchy and Application diff --git a/go.mod b/go.mod index 021aa0f4..391b8af4 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/go-logr/logr v1.4.2 github.com/onsi/ginkgo/v2 v2.21.0 github.com/onsi/gomega v1.35.1 + k8s.io/api v0.31.2 k8s.io/apimachinery v0.31.2 k8s.io/client-go v0.31.2 k8s.io/kubelet v0.31.2 @@ -75,7 +76,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.31.2 // indirect k8s.io/apiextensions-apiserver v0.31.2 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20241009091222-67ed5848f094 // indirect diff --git a/internal/controller/checkpointrestoreoperator_controller.go b/internal/controller/checkpointrestoreoperator_controller.go index 22e4392b..d8054f75 100644 --- a/internal/controller/checkpointrestoreoperator_controller.go +++ b/internal/controller/checkpointrestoreoperator_controller.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -36,7 +36,14 @@ import ( metadata "github.com/checkpoint-restore/checkpointctl/lib" "github.com/containers/storage/pkg/archive" "github.com/go-logr/logr" + v1 "k8s.io/api/core/v1" + k8err "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" kubelettypes "k8s.io/kubelet/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -47,8 +54,17 @@ import ( type RetentionPolicy int +// These constants specify the policy criteria +// for selecting which checkpoint archives to delete.
const ( ByCount RetentionPolicy = iota + BySize +) + +// Constants for unit conversion +const ( + KB = 1024 + MB = 1024 * KB ) var ( @@ -59,6 +75,11 @@ var ( maxCheckpointsPerContainer int = 10 maxCheckpointsPerPod int = 20 maxCheckpointsPerNamespace int = 30 + retainOrphan *bool + maxCheckpointSize int = math.MaxInt32 + maxTotalSizePerPod int = math.MaxInt32 + maxTotalSizePerContainer int = math.MaxInt32 + maxTotalSizePerNamespace int = math.MaxInt32 containerPolicies []criuorgv1.ContainerPolicySpec podPolicies []criuorgv1.PodPolicySpec namespacePolicies []criuorgv1.NamespacePolicySpec @@ -91,6 +112,7 @@ func (r *CheckpointRestoreOperatorReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, client.IgnoreNotFound(err) } + resetAllPoliciesToDefault(log) r.handleGlobalPolicies(log, &input.Spec.GlobalPolicies) r.handleSpecificPolicies(log, &input.Spec) @@ -106,10 +128,39 @@ func (r *CheckpointRestoreOperatorReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, nil } +func resetAllPoliciesToDefault(log logr.Logger) { + retainOrphan = nil + maxCheckpointsPerContainer = 10 + maxCheckpointsPerPod = 20 + maxCheckpointsPerNamespace = 30 + maxCheckpointSize = math.MaxInt32 + maxTotalSizePerContainer = math.MaxInt32 + maxTotalSizePerPod = math.MaxInt32 + maxTotalSizePerNamespace = math.MaxInt32 + + containerPolicies = nil + podPolicies = nil + namespacePolicies = nil + + log.Info("Policies have been reset to their default values") +} + +func ptr(b bool) *bool { + return &b +} + func (r *CheckpointRestoreOperatorReconciler) handleGlobalPolicies(log logr.Logger, globalPolicies *criuorgv1.GlobalPolicySpec) { policyMutex.Lock() defer policyMutex.Unlock() + if globalPolicies.RetainOrphan == nil { + retainOrphan = ptr(true) + log.Info("RetainOrphan policy not set, using default", "retainOrphan", retainOrphan) + } else { + retainOrphan = globalPolicies.RetainOrphan + log.Info("Changed RetainOrphan policy", "retainOrphan", retainOrphan) + } + if globalPolicies.MaxCheckpointsPerContainer != nil && *globalPolicies.MaxCheckpointsPerContainer >= 0 { maxCheckpointsPerContainer = *globalPolicies.MaxCheckpointsPerContainer log.Info("Changed MaxCheckpointsPerContainer", "maxCheckpointsPerContainer", maxCheckpointsPerContainer) @@ -124,6 +175,26 @@ func (r *CheckpointRestoreOperatorReconciler) handleGlobalPolicies(log logr.Logg maxCheckpointsPerNamespace = *globalPolicies.MaxCheckpointsPerNamespaces log.Info("Changed MaxCheckpointsPerNamespace", "maxCheckpointsPerNamespace", maxCheckpointsPerNamespace) } + + if globalPolicies.MaxCheckpointSize != nil && *globalPolicies.MaxCheckpointSize >= 0 { + maxCheckpointSize = *globalPolicies.MaxCheckpointSize * MB + log.Info("Changed MaxCheckpointSize", "maxCheckpointSize", maxCheckpointSize) + } + + if globalPolicies.MaxTotalSizePerNamespace != nil && *globalPolicies.MaxTotalSizePerNamespace >= 0 { + maxTotalSizePerNamespace = *globalPolicies.MaxTotalSizePerNamespace * MB + log.Info("Changed MaxTotalSizePerNamespace", "maxTotalSizePerNamespace", maxTotalSizePerNamespace) + } + + if globalPolicies.MaxTotalSizePerPod != nil && *globalPolicies.MaxTotalSizePerPod >= 0 { + maxTotalSizePerPod = *globalPolicies.MaxTotalSizePerPod * MB + log.Info("Changed MaxTotalSizePerPod", "maxTotalSizePerPod", maxTotalSizePerPod) + } + + if globalPolicies.MaxTotalSizePerContainer != nil && *globalPolicies.MaxTotalSizePerContainer >= 0 { + maxTotalSizePerContainer = *globalPolicies.MaxTotalSizePerContainer * MB + log.Info("Changed MaxTotalSizePerContainer", 
"maxTotalSizePerContainer", maxTotalSizePerContainer) + } } func (r *CheckpointRestoreOperatorReconciler) handleSpecificPolicies(log logr.Logger, spec *criuorgv1.CheckpointRestoreOperatorSpec) { @@ -280,21 +351,72 @@ func getCheckpointArchiveInformation(log logr.Logger, checkpointPath string) (*c return details, nil } +type Policy struct { + RetainOrphan bool + MaxCheckpoints int + MaxCheckpointSize int + MaxTotalSize int +} + func applyPolicies(log logr.Logger, details *checkpointDetails) { policyMutex.Lock() defer policyMutex.Unlock() + toInfinity := func(value *int) int { + if value == nil { + return math.MaxInt32 + } + return *value + } + + ifNil := func(value *bool) bool { + if value == nil { + return true + } + return *value + } + if policy := findContainerPolicy(details); policy != nil { - handleCheckpointsForLevel(log, details, "container", int(*policy.MaxCheckpoints)) + handleCheckpointsForLevel(log, details, "container", Policy{ + RetainOrphan: ifNil(policy.RetainOrphan), + MaxCheckpoints: toInfinity(policy.MaxCheckpoints), + MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB, + MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB, + }) } else if policy := findPodPolicy(details); policy != nil { - handleCheckpointsForLevel(log, details, "pod", int(*policy.MaxCheckpoints)) + handleCheckpointsForLevel(log, details, "pod", Policy{ + RetainOrphan: ifNil(policy.RetainOrphan), + MaxCheckpoints: toInfinity(policy.MaxCheckpoints), + MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB, + MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB, + }) } else if policy := findNamespacePolicy(details); policy != nil { - handleCheckpointsForLevel(log, details, "namespace", int(*policy.MaxCheckpoints)) + handleCheckpointsForLevel(log, details, "namespace", Policy{ + RetainOrphan: ifNil(policy.RetainOrphan), + MaxCheckpoints: toInfinity(policy.MaxCheckpoints), + MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB, + MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB, + }) } else { // Apply global policies if no specific policy found - handleCheckpointsForLevel(log, details, "container", maxCheckpointsPerContainer) - handleCheckpointsForLevel(log, details, "pod", maxCheckpointsPerPod) - handleCheckpointsForLevel(log, details, "namespace", maxCheckpointsPerNamespace) + handleCheckpointsForLevel(log, details, "container", Policy{ + RetainOrphan: *retainOrphan, + MaxCheckpoints: maxCheckpointsPerContainer, + MaxCheckpointSize: maxCheckpointSize, + MaxTotalSize: maxTotalSizePerContainer, + }) + handleCheckpointsForLevel(log, details, "pod", Policy{ + RetainOrphan: *retainOrphan, + MaxCheckpoints: maxCheckpointsPerPod, + MaxCheckpointSize: maxCheckpointSize, + MaxTotalSize: maxTotalSizePerPod, + }) + handleCheckpointsForLevel(log, details, "namespace", Policy{ + RetainOrphan: *retainOrphan, + MaxCheckpoints: maxCheckpointsPerNamespace, + MaxCheckpointSize: maxCheckpointSize, + MaxTotalSize: maxTotalSizePerNamespace, + }) } } @@ -374,8 +496,17 @@ func handleWriteFinished(ctx context.Context, event fsnotify.Event) { applyPolicies(log, details) } -func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, level string, maxCheckpoints int) { - if maxCheckpoints <= 0 { +func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, level string, policy Policy) { + if policy.MaxCheckpoints <= 0 { + log.Info("MaxCheckpoints is less than or equal to 0, skipping checkpoint handling", "level", level, "policy.MaxCheckpoints", policy.MaxCheckpoints) + return + } + 
if policy.MaxCheckpointSize <= 0 { + log.Info("MaxCheckpointSize is less than or equal to 0, skipping checkpoint handling", "level", level, "policy.MaxCheckpointSize", policy.MaxCheckpointSize) + return + } + if policy.MaxTotalSize <= 0 { + log.Info("MaxTotalSize is less than or equal to 0, skipping checkpoint handling", "level", level, "policy.MaxTotalSize", policy.MaxTotalSize) + return + } return } @@ -432,28 +563,58 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve filteredArchives = append(filteredArchives, archive) } + if !policy.RetainOrphan { + exist, err := resourceExistsInCluster(level, details) + if err != nil { + log.Error(err, "failed to check if resource exists in cluster", "level", level) + exist = true // fail safe: a failed lookup must not be treated as a deleted resource + } + + if !exist { + log.Info("RetainOrphan is set to false, deleting all checkpoints", "level", level) + + for _, archive := range filteredArchives { + log.Info("Deleting checkpoint archive due to retainOrphan=false", "archive", archive) + err := os.Remove(archive) + if err != nil { + log.Error(err, "failed to remove checkpoint archive", "archive", archive) + } + } + + return + } + } + checkpointArchivesCounter := len(filteredArchives) + totalSize := int64(0) + archiveSizes := make(map[string]int64) archivesToDelete := make(map[int64]string) for _, c := range filteredArchives { fi, err := os.Stat(c) if err != nil { - log.Error( - err, - "failed to stat", - "file", - c, - ) + log.Error(err, "failed to stat", "file", c) + continue + } + + log.Info("Checkpoint archive details", "archive", c, "size", fi.Size(), "maxCheckpointSize", policy.MaxCheckpointSize) + if policy.MaxCheckpointSize > 0 && fi.Size() > int64(policy.MaxCheckpointSize) { + log.Info("Deleting checkpoint archive due to exceeding MaxCheckpointSize", "archive", c, "size", fi.Size(), "maxCheckpointSize", policy.MaxCheckpointSize) + err := os.Remove(c) + if err != nil { + log.Error(err, "failed to remove checkpoint archive", "archive", c) + } continue } + totalSize += fi.Size() + archiveSizes[c] = fi.Size() archivesToDelete[fi.ModTime().UnixMicro()] = c } // Handle excess checkpoints by count - if maxCheckpoints > 0 && checkpointArchivesCounter > maxCheckpoints { - excessCount := int64(checkpointArchivesCounter - maxCheckpoints) - log.Info("Checkpoint count exceeds limit", "checkpointArchivesCounter", checkpointArchivesCounter, "maxCheckpoints", maxCheckpoints, "excessCount", excessCount) - toDelete := selectArchivesToDelete(log, checkpointArchives, excessCount, ByCount) + if policy.MaxCheckpoints > 0 && checkpointArchivesCounter > policy.MaxCheckpoints { + excessCount := int64(checkpointArchivesCounter - policy.MaxCheckpoints) + log.Info("Checkpoint count exceeds limit", "checkpointArchivesCounter", checkpointArchivesCounter, "maxCheckpoints", policy.MaxCheckpoints, "excessCount", excessCount) + toDelete := selectArchivesToDelete(log, checkpointArchives, archiveSizes, excessCount, ByCount) for _, archive := range toDelete { log.Info("Deleting checkpoint archive due to excess count", "archive", archive) err := os.Remove(archive) @@ -461,14 +622,33 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve log.Error(err, "removal of checkpoint archive failed", "archive", archive) } checkpointArchivesCounter-- - if checkpointArchivesCounter <= maxCheckpoints { + if checkpointArchivesCounter <= policy.MaxCheckpoints { + break + } + } + } + + // Handle total size against maxTotalSize + if policy.MaxTotalSize > 0 && totalSize > int64(policy.MaxTotalSize) { + excessSize := totalSize -
int64(policy.MaxTotalSize) + log.Info("Total size of checkpoint archives exceeds limit", "totalSize", totalSize, "maxTotalSize", policy.MaxTotalSize, "excessSize", excessSize) + toDelete := selectArchivesToDelete(log, filteredArchives, archiveSizes, excessSize, BySize) + for _, archive := range toDelete { + log.Info("Deleting checkpoint archive due to excess size", "archive", archive) + err := os.Remove(archive) + if err != nil { + log.Error(err, "Removal of checkpoint archive failed", "archive", archive) + } + totalSize -= archiveSizes[archive] + delete(archiveSizes, archive) + if totalSize <= int64(policy.MaxTotalSize) { break } } } } -func selectArchivesToDelete(log logr.Logger, archives []string, excess int64, policy RetentionPolicy) []string { +func selectArchivesToDelete(log logr.Logger, archives []string, archiveSizes map[string]int64, excess int64, policy RetentionPolicy) []string { toDelete := make([]string, 0) switch policy { @@ -493,11 +673,178 @@ func selectArchivesToDelete(log logr.Logger, archives []string, excess int64, po for i := 0; i < int(excess); i++ { toDelete = append(toDelete, archives[i]) } + + case BySize: + // Sort by modification time (oldest first) + sort.Slice(archives, func(i, j int) bool { + fileInfo1, err1 := os.Stat(archives[i]) + if err1 != nil { + log.Error(err1, "Error stating file", "file", archives[i]) + return false + } + + fileInfo2, err2 := os.Stat(archives[j]) + if err2 != nil { + log.Error(err2, "Error stating file", "file", archives[j]) + return false + } + + return fileInfo1.ModTime().Before(fileInfo2.ModTime()) + }) + + for _, archive := range archives { + toDelete = append(toDelete, archive) + excess -= archiveSizes[archive] + if excess <= 0 { + break + } + } } return toDelete } +func resourceExistsInCluster(level string, details *checkpointDetails) (bool, error) { + switch level { + case "container": + pod, err := getPodFromNamespace(details.namespace, details.pod) + if err != nil { + if isNotFoundError(err) { + return false, nil + } + return false, err + } + for _, container := range pod.Spec.Containers { + if container.Name == details.container { + return true, nil + } + } + return false, nil + + case "pod": + _, err := getPodFromNamespace(details.namespace, details.pod) + if err != nil { + if isNotFoundError(err) { + return false, nil + } + return false, err + } + return true, nil + + case "namespace": + _, err := getNamespace(details.namespace) + if err != nil { + if isNotFoundError(err) { + return false, nil + } + return false, err + } + return true, nil + + default: + return false, fmt.Errorf("invalid level: %s", level) + } +} + +func isNotFoundError(err error) bool { + return k8err.IsNotFound(err) +} + +func getKubernetesClient() (*kubernetes.Clientset, error) { + // Use in-cluster configuration + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + return kubernetes.NewForConfig(config) +} + +func getPodFromNamespace(namespace, podName string) (*v1.Pod, error) { + clientset, err := getKubernetesClient() + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client: %v", err) + } + + pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + return pod, nil +} + +func getNamespace(namespace string) (*v1.Namespace, error) { + clientset, err := getKubernetesClient() + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client: %v", err) + } + + ns, err := clientset.CoreV1().Namespaces().Get(context.TODO(),
namespace, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + return ns, nil +} + +func (gc *garbageCollector) PodWatcher(log logr.Logger, stopCh <-chan struct{}) { + clientset, err := getKubernetesClient() + if err != nil { + log.Error(err, "Failed to create Kubernetes client") + return + } + + watchlist := cache.NewListWatchFromClient( + clientset.CoreV1().RESTClient(), + "pods", + metav1.NamespaceAll, + fields.Everything(), + ) + + _, controller := cache.NewInformerWithOptions(cache.InformerOptions{ + ListerWatcher: watchlist, + ObjectType: &v1.Pod{}, + Handler: cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + log.Info("Could not convert object to Pod") + return + } + handlePodDeleted(log, pod) + }, + }, + }) + + go controller.Run(stopCh) + + <-stopCh + log.Info("PodWatcher has been stopped") +} + +func handlePodDeleted(log logr.Logger, pod *v1.Pod) { + log.Info("Pod deleted", "pod", pod.Name) + + containerNames := getContainerNames(pod) + + for _, containerName := range containerNames { + details := &checkpointDetails{ + namespace: pod.Namespace, + pod: pod.Name, + container: containerName, + } + applyPolicies(log, details) + } +} + +func getContainerNames(pod *v1.Pod) []string { + var containerNames []string + for _, container := range pod.Spec.Containers { + containerNames = append(containerNames, container.Name) + } + return containerNames +} + func (gc *garbageCollector) runGarbageCollector() { // This function tries to detect newly created checkpoint archives with the help // of inotify/fsnotify. If a new checkpoint archive is created we get a @@ -530,6 +877,9 @@ func (gc *garbageCollector) runGarbageCollector() { c := make(chan struct{}) + // Start pod watcher in a separate goroutine + go gc.PodWatcher(log, c) + go func() { for { select { @@ -577,6 +927,10 @@ func (gc *garbageCollector) runGarbageCollector() { log.Info("MaxCheckpointsPerContainer", "maxCheckpointsPerContainer", maxCheckpointsPerContainer) log.Info("MaxCheckpointsPerPod", "maxCheckpointsPerPod", maxCheckpointsPerPod) log.Info("MaxCheckpointsPerNamespace", "maxCheckpointsPerNamespace", maxCheckpointsPerNamespace) + log.Info("MaxCheckpointSize", "maxCheckpointSize", maxCheckpointSize) + log.Info("MaxTotalSizePerNamespace", "maxTotalSizePerNamespace", maxTotalSizePerNamespace) + log.Info("MaxTotalSizePerPod", "maxTotalSizePerPod", maxTotalSizePerPod) + log.Info("MaxTotalSizePerContainer", "maxTotalSizePerContainer", maxTotalSizePerContainer) err = watcher.Add(checkpointDirectory) if err != nil { diff --git a/test/generate_checkpoint_tar.sh b/test/generate_checkpoint_tar.sh index 6938d501..27832a44 100755 --- a/test/generate_checkpoint_tar.sh +++ b/test/generate_checkpoint_tar.sh @@ -30,12 +30,31 @@ increment_timestamp() { date -d "$1 + 1 second" +%Y-%m-%dT%H:%M:%S } +generate_large_file() { + local file_path=$1 + local size_mb=$2 + dd if=/dev/urandom of="$file_path" bs=1M count="$size_mb" status=none +} + +CREATE_LARGE=false +if [ "$1" == "large" ]; then + CREATE_LARGE=true + echo "Creating large files: $CREATE_LARGE" +fi + TIMESTAMP=$(date +%Y-%m-%dT%H:%M:%S) for _ in {1..5}; do TAR_NAME="checkpoint.tar" ORIGINAL_NAME="checkpoint-podname_namespace-containername-$TIMESTAMP.tar" + + if [ "$CREATE_LARGE" = true ]; then + LARGE_FILE="$TEMP_DIR/large_file" + generate_large_file "$LARGE_FILE" 5 + fi + create_checkpoint_tar "$TAR_NAME" "$ORIGINAL_NAME" + TIMESTAMP=$(increment_timestamp "$TIMESTAMP") done diff --git a/test/run_tests.bats 
b/test/run_tests.bats index 7e47e376..389f0ae2 100755 --- a/test/run_tests.bats +++ b/test/run_tests.bats @@ -58,3 +58,38 @@ function teardown() { log_and_run ls -la "$CHECKPOINT_DIR" [ "$status" -eq 0 ] } + +@test "test_max_total_checkpoint_size" { + log_and_run kubectl apply -f ./test/test_bySize_checkpointrestoreoperator.yaml + [ "$status" -eq 0 ] + log_and_run ./test/generate_checkpoint_tar.sh large + [ "$status" -eq 0 ] + log_and_run ./test/wait_for_checkpoint_reduction.sh 2 + [ "$status" -eq 0 ] + log_and_run ls -la "$CHECKPOINT_DIR" + [ "$status" -eq 0 ] +} + +@test "test_max_checkpoint_size" { + log_and_run sed -i '/^ containerPolicies:/,/maxTotalSize: [0-9]*/ s/^/#/' ./test/test_bySize_checkpointrestoreoperator.yaml + [ "$status" -eq 0 ] + log_and_run kubectl apply -f ./test/test_bySize_checkpointrestoreoperator.yaml + [ "$status" -eq 0 ] + log_and_run ./test/generate_checkpoint_tar.sh large + [ "$status" -eq 0 ] + log_and_run ./test/wait_for_checkpoint_reduction.sh 0 + [ "$status" -eq 0 ] + log_and_run ls -la "$CHECKPOINT_DIR" + [ "$status" -eq 0 ] +} + +@test "test_orphan_retention_policy" { + log_and_run kubectl apply -f ./test/test_orphan_checkpointrestoreoperator.yaml + [ "$status" -eq 0 ] + log_and_run ./test/generate_checkpoint_tar.sh + [ "$status" -eq 0 ] + log_and_run ./test/wait_for_checkpoint_reduction.sh 0 + [ "$status" -eq 0 ] + log_and_run ls -la "$CHECKPOINT_DIR" + [ "$status" -eq 0 ] +} diff --git a/test/test_bySize_checkpointrestoreoperator.yaml b/test/test_bySize_checkpointrestoreoperator.yaml new file mode 100644 index 00000000..2641b9ad --- /dev/null +++ b/test/test_bySize_checkpointrestoreoperator.yaml @@ -0,0 +1,24 @@ +apiVersion: criu.org/v1 +kind: CheckpointRestoreOperator +metadata: + labels: + app.kubernetes.io/name: checkpointrestoreoperator + app.kubernetes.io/instance: checkpointrestoreoperator-sample + app.kubernetes.io/part-of: checkpoint-restore-operator + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: checkpoint-restore-operator + name: checkpointrestoreoperator-sample +spec: + checkpointDirectory: /var/lib/kubelet/checkpoints + applyPoliciesImmediately: true + globalPolicy: + maxCheckpointSize: 4 + maxTotalSizePerNamespace: 1000 + maxTotalSizePerPod: 500 + maxTotalSizePerContainer: 100 + containerPolicies: + - namespace: namespace + pod: podname + container: containername + maxCheckpointSize: 6 + maxTotalSize: 12 diff --git a/test/test_orphan_checkpointrestoreoperator.yaml b/test/test_orphan_checkpointrestoreoperator.yaml new file mode 100644 index 00000000..ac868389 --- /dev/null +++ b/test/test_orphan_checkpointrestoreoperator.yaml @@ -0,0 +1,20 @@ +apiVersion: criu.org/v1 +kind: CheckpointRestoreOperator +metadata: + labels: + app.kubernetes.io/name: checkpointrestoreoperator + app.kubernetes.io/instance: checkpointrestoreoperator-sample + app.kubernetes.io/part-of: checkpoint-restore-operator + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: checkpoint-restore-operator + name: checkpointrestoreoperator-sample +spec: + checkpointDirectory: /var/lib/kubelet/checkpoints + applyPoliciesImmediately: true + globalPolicy: + retainOrphan: true + containerPolicies: + - namespace: namespace + pod: podname + container: containername + retainOrphan: false
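
The `applyPolicies` change reduces each level (container, pod, namespace) to a single resolved `Policy` value before `handleCheckpointsForLevel` runs. Below is a minimal, self-contained sketch of that resolution step, mirroring the diff's `toInfinity`/`ifNil` helpers; the sample input (a container policy with only `maxTotalSize` set) is hypothetical:

```go
package main

import (
	"fmt"
	"math"
)

const (
	KB = 1024
	MB = 1024 * KB
)

// Policy mirrors the resolved per-level limits consumed by handleCheckpointsForLevel.
type Policy struct {
	RetainOrphan      bool
	MaxCheckpoints    int
	MaxCheckpointSize int
	MaxTotalSize      int
}

// toInfinity treats an unset (nil) limit as "no limit", as in the diff.
func toInfinity(value *int) int {
	if value == nil {
		return math.MaxInt32
	}
	return *value
}

// ifNil defaults an unset retainOrphan to true (keep orphan checkpoints).
func ifNil(value *bool) bool {
	if value == nil {
		return true
	}
	return *value
}

func main() {
	// Hypothetical container policy: only maxTotalSize (12 MB) is set.
	maxTotal := 12
	resolved := Policy{
		RetainOrphan:      ifNil(nil),                 // unset -> true
		MaxCheckpoints:    toInfinity(nil),            // unset -> math.MaxInt32
		MaxCheckpointSize: toInfinity(nil) * MB,       // unset -> effectively unlimited
		MaxTotalSize:      toInfinity(&maxTotal) * MB, // 12 MB in bytes
	}
	fmt.Printf("%+v\n", resolved)
}
```

Unset limits resolve to `math.MaxInt32` rather than zero, which keeps values of zero or below free to mean "skip handling entirely" in the guards at the top of `handleCheckpointsForLevel`.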
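
The per-archive check in `handleCheckpointsForLevel` deletes any archive whose on-disk size exceeds the resolved `maxCheckpointSize` (policy values are converted from MB to bytes via the `MB` constant). A small sketch of just that comparison, using the roughly 5 MB fixtures produced by `generate_checkpoint_tar.sh large` as a worked example:

```go
package main

import "fmt"

const MB = 1024 * 1024

// exceedsLimit mirrors the per-archive check in handleCheckpointsForLevel:
// an archive larger than the resolved maxCheckpointSize is deleted outright.
func exceedsLimit(sizeBytes int64, maxCheckpointSizeBytes int) bool {
	return maxCheckpointSizeBytes > 0 && sizeBytes > int64(maxCheckpointSizeBytes)
}

func main() {
	archive := int64(5 * MB)                 // ~5 MB fixture from generate_checkpoint_tar.sh large
	fmt.Println(exceedsLimit(archive, 4*MB)) // true: global maxCheckpointSize: 4 deletes it
	fmt.Println(exceedsLimit(archive, 6*MB)) // false: container maxCheckpointSize: 6 keeps it
}
```

This is why `test_max_checkpoint_size` expects zero archives to survive once the container policy is commented out and the global `maxCheckpointSize: 4` applies, while the container policy's `maxCheckpointSize: 6` keeps the same archives.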
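
When the surviving archives still exceed `maxTotalSize`, the `BySize` branch of `selectArchivesToDelete` evicts oldest-first until the excess is covered. A standalone sketch of that selection using in-memory metadata instead of `os.Stat` calls; the file names and timestamps are invented:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type archiveInfo struct {
	path    string
	size    int64
	modTime time.Time
}

// selectBySize picks the oldest archives whose cumulative size covers the
// excess, mirroring the BySize branch of selectArchivesToDelete.
func selectBySize(archives []archiveInfo, excess int64) []string {
	// Oldest first, matching the modification-time sort in the diff.
	sort.Slice(archives, func(i, j int) bool {
		return archives[i].modTime.Before(archives[j].modTime)
	})
	var toDelete []string
	for _, a := range archives {
		if excess <= 0 {
			break
		}
		toDelete = append(toDelete, a.path)
		excess -= a.size
	}
	return toDelete
}

func main() {
	now := time.Now()
	size := int64(5 * 1024 * 1024) // ~5 MB archives, as in the test fixtures
	archives := []archiveInfo{
		{"c1.tar", size, now.Add(-4 * time.Minute)},
		{"c2.tar", size, now.Add(-3 * time.Minute)},
		{"c3.tar", size, now.Add(-2 * time.Minute)},
		{"c4.tar", size, now.Add(-1 * time.Minute)},
		{"c5.tar", size, now},
	}
	total := int64(len(archives)) * size                // 25 MB
	maxTotal := int64(12 * 1024 * 1024)                 // container policy: maxTotalSize: 12
	fmt.Println(selectBySize(archives, total-maxTotal)) // [c1.tar c2.tar c3.tar]
}
```

Against the test policy's `maxTotalSize: 12`, five 5 MB archives carry a 13 MB excess, so the three oldest are evicted and two remain, which is the count `test_max_total_checkpoint_size` waits for.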
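
Orphan cleanup hinges on distinguishing "the resource is gone" from "the lookup failed": `resourceExistsInCluster` reports non-existence only when the API returns a Kubernetes NotFound error, which `isNotFoundError` checks via `k8err.IsNotFound`. A hedged sketch of that distinction using `k8s.io/apimachinery` directly; the `orphaned` helper is an invented name for illustration:

```go
package main

import (
	"fmt"

	k8err "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// orphaned reports whether a lookup error means the resource is truly gone
// (so its checkpoints are orphans) rather than a transient API failure.
func orphaned(err error) (bool, error) {
	if err == nil {
		return false, nil // resource still exists
	}
	if k8err.IsNotFound(err) {
		return true, nil // resource deleted -> checkpoints are orphans
	}
	return false, err // transient failure -> do not delete anything
}

func main() {
	notFound := k8err.NewNotFound(schema.GroupResource{Resource: "pods"}, "podname")
	isOrphan, err := orphaned(notFound)
	fmt.Println(isOrphan, err) // true <nil>
}
```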
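
One caveat for `PodWatcher`: a client-go `DeleteFunc` can receive a `cache.DeletedFinalStateUnknown` tombstone instead of a `*v1.Pod` when the informer misses the final delete event, and the plain type assertion in the diff skips cleanup in that case. A sketch of a delete handler that unwraps tombstones first; `onDelete` and its callback are illustrative names, not part of the change:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// onDelete unwraps cache.DeletedFinalStateUnknown tombstones before the
// *v1.Pod assertion, so pods whose delete event was missed still get cleaned up.
func onDelete(obj interface{}, handlePodDeleted func(*v1.Pod)) {
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tombstone.Obj
	}
	pod, ok := obj.(*v1.Pod)
	if !ok {
		fmt.Println("could not convert object to Pod")
		return
	}
	handlePodDeleted(pod)
}

func main() {
	pod := &v1.Pod{}
	pod.Name = "podname"
	// Simulate a tombstone delivery, as happens after a missed watch event.
	onDelete(cache.DeletedFinalStateUnknown{Key: "namespace/podname", Obj: pod},
		func(p *v1.Pod) { fmt.Println("re-applying policies for", p.Name) })
}
```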