From 2cc0b3564dbcf7512bb522d87eebaeb37db7c8be Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Fri, 21 Jun 2024 23:36:57 +0530 Subject: [PATCH 1/7] feat: Add storage-based checkpoint retention policies - Introduced global, container, pod, and namespace-level policies for checkpoint retention, based on storage/size limits. - Updated CRD definitions to store the storage/size based policies. - Updated the sample configuration of CheckpointRestoreOperator with storage/checkpoint-size based policies Signed-off-by: Parthiba-Hazra --- api/v1/checkpointrestoreoperator_types.go | 28 ++- api/v1/zz_generated.deepcopy.go | 56 ++++- .../criu.org_checkpointrestoreoperators.yaml | 23 +- .../_v1_checkpointrestoreoperator.yaml | 10 + .../checkpointrestoreoperator_controller.go | 196 ++++++++++++++++-- 5 files changed, 277 insertions(+), 36 deletions(-) diff --git a/api/v1/checkpointrestoreoperator_types.go b/api/v1/checkpointrestoreoperator_types.go index aaa44688..af2be526 100644 --- a/api/v1/checkpointrestoreoperator_types.go +++ b/api/v1/checkpointrestoreoperator_types.go @@ -37,24 +37,34 @@ type GlobalPolicySpec struct { MaxCheckpointsPerNamespaces *int `json:"maxCheckpointsPerNamespace,omitempty"` MaxCheckpointsPerPod *int `json:"maxCheckpointsPerPod,omitempty"` MaxCheckpointsPerContainer *int `json:"maxCheckpointsPerContainer,omitempty"` + MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` + MaxTotalSizePerNamespace *int `json:"maxTotalSizePerNamespace,omitempty"` + MaxTotalSizePerPod *int `json:"maxTotalSizePerPod,omitempty"` + MaxTotalSizePerContainer *int `json:"maxTotalSizePerContainer,omitempty"` } type ContainerPolicySpec struct { - Namespace string `json:"namespace,omitempty"` - Pod string `json:"pod,omitempty"` - Container string `json:"container,omitempty"` - MaxCheckpoints *int64 `json:"maxCheckpoints,omitempty"` + Namespace string `json:"namespace,omitempty"` + Pod string `json:"pod,omitempty"` + Container string `json:"container,omitempty"` + 
MaxCheckpoints *int `json:"maxCheckpoints,omitempty"` + MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` + MaxTotalSize *int `json:"maxTotalSize,omitempty"` } type PodPolicySpec struct { - Namespace string `json:"namespace,omitempty"` - Pod string `json:"pod,omitempty"` - MaxCheckpoints *int64 `json:"maxCheckpoints,omitempty"` + Namespace string `json:"namespace,omitempty"` + Pod string `json:"pod,omitempty"` + MaxCheckpoints *int `json:"maxCheckpoints,omitempty"` + MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` + MaxTotalSize *int `json:"maxTotalSize,omitempty"` } type NamespacePolicySpec struct { - Namespace string `json:"namespace,omitempty"` - MaxCheckpoints *int64 `json:"maxCheckpoints,omitempty"` + Namespace string `json:"namespace,omitempty"` + MaxCheckpoints *int `json:"maxCheckpoints,omitempty"` + MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` + MaxTotalSize *int `json:"maxTotalSize,omitempty"` } // CheckpointRestoreOperatorStatus defines the observed state of CheckpointRestoreOperator diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 0b9d714d..24ad2941 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -140,7 +140,17 @@ func (in *ContainerPolicySpec) DeepCopyInto(out *ContainerPolicySpec) { *out = *in if in.MaxCheckpoints != nil { in, out := &in.MaxCheckpoints, &out.MaxCheckpoints - *out = new(int64) + *out = new(int) + **out = **in + } + if in.MaxCheckpointSize != nil { + in, out := &in.MaxCheckpointSize, &out.MaxCheckpointSize + *out = new(int) + **out = **in + } + if in.MaxTotalSize != nil { + in, out := &in.MaxTotalSize, &out.MaxTotalSize + *out = new(int) **out = **in } } @@ -173,6 +183,26 @@ func (in *GlobalPolicySpec) DeepCopyInto(out *GlobalPolicySpec) { *out = new(int) **out = **in } + if in.MaxCheckpointSize != nil { + in, out := &in.MaxCheckpointSize, &out.MaxCheckpointSize + *out = new(int) + **out = **in + } + if 
in.MaxTotalSizePerNamespace != nil { + in, out := &in.MaxTotalSizePerNamespace, &out.MaxTotalSizePerNamespace + *out = new(int) + **out = **in + } + if in.MaxTotalSizePerPod != nil { + in, out := &in.MaxTotalSizePerPod, &out.MaxTotalSizePerPod + *out = new(int) + **out = **in + } + if in.MaxTotalSizePerContainer != nil { + in, out := &in.MaxTotalSizePerContainer, &out.MaxTotalSizePerContainer + *out = new(int) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GlobalPolicySpec. @@ -190,7 +220,17 @@ func (in *NamespacePolicySpec) DeepCopyInto(out *NamespacePolicySpec) { *out = *in if in.MaxCheckpoints != nil { in, out := &in.MaxCheckpoints, &out.MaxCheckpoints - *out = new(int64) + *out = new(int) + **out = **in + } + if in.MaxCheckpointSize != nil { + in, out := &in.MaxCheckpointSize, &out.MaxCheckpointSize + *out = new(int) + **out = **in + } + if in.MaxTotalSize != nil { + in, out := &in.MaxTotalSize, &out.MaxTotalSize + *out = new(int) **out = **in } } @@ -210,7 +250,17 @@ func (in *PodPolicySpec) DeepCopyInto(out *PodPolicySpec) { *out = *in if in.MaxCheckpoints != nil { in, out := &in.MaxCheckpoints, &out.MaxCheckpoints - *out = new(int64) + *out = new(int) + **out = **in + } + if in.MaxCheckpointSize != nil { + in, out := &in.MaxCheckpointSize, &out.MaxCheckpointSize + *out = new(int) + **out = **in + } + if in.MaxTotalSize != nil { + in, out := &in.MaxTotalSize, &out.MaxTotalSize + *out = new(int) **out = **in } } diff --git a/config/crd/bases/criu.org_checkpointrestoreoperators.yaml b/config/crd/bases/criu.org_checkpointrestoreoperators.yaml index f56382ce..86d572c4 100644 --- a/config/crd/bases/criu.org_checkpointrestoreoperators.yaml +++ b/config/crd/bases/criu.org_checkpointrestoreoperators.yaml @@ -52,8 +52,11 @@ spec: properties: container: type: string + maxCheckpointSize: + type: integer maxCheckpoints: - format: int64 + type: integer + maxTotalSize: type: integer namespace: type: string @@ 
-63,18 +66,29 @@ spec: type: array globalPolicy: properties: + maxCheckpointSize: + type: integer maxCheckpointsPerContainer: type: integer maxCheckpointsPerNamespace: type: integer maxCheckpointsPerPod: type: integer + maxTotalSizePerContainer: + type: integer + maxTotalSizePerNamespace: + type: integer + maxTotalSizePerPod: + type: integer type: object namespacePolicies: items: properties: + maxCheckpointSize: + type: integer maxCheckpoints: - format: int64 + type: integer + maxTotalSize: type: integer namespace: type: string @@ -83,8 +97,11 @@ spec: podPolicies: items: properties: + maxCheckpointSize: + type: integer maxCheckpoints: - format: int64 + type: integer + maxTotalSize: type: integer namespace: type: string diff --git a/config/samples/_v1_checkpointrestoreoperator.yaml b/config/samples/_v1_checkpointrestoreoperator.yaml index f9a6c2ae..963ffc2c 100644 --- a/config/samples/_v1_checkpointrestoreoperator.yaml +++ b/config/samples/_v1_checkpointrestoreoperator.yaml @@ -15,15 +15,25 @@ spec: maxCheckpointsPerNamespace: 50 maxCheckpointsPerPod: 30 maxCheckpointsPerContainer: 10 + maxCheckpointSize: 10 + maxTotalSizePerNamespace: 1000 + maxTotalSizePerPod: 500 + maxTotalSizePerContainer: 100 # containerPolicies: # - namespace: # pod: # container: # maxCheckpoints: 5 + # maxCheckpointSize: 10 + # maxTotalSize: 100 # podPolicies: # - namespace: # pod: # maxCheckpoints: 10 + # maxCheckpointSize: 10 + # maxTotalSize: 500 # namespacePolicies: # - namespace: # maxCheckpoints: 15 + # maxCheckpointSize: 10 + # maxTotalSize: 1000 diff --git a/internal/controller/checkpointrestoreoperator_controller.go b/internal/controller/checkpointrestoreoperator_controller.go index 22e4392b..a1b68909 100644 --- a/internal/controller/checkpointrestoreoperator_controller.go +++ b/internal/controller/checkpointrestoreoperator_controller.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -47,8 +47,17 @@ import ( type RetentionPolicy int +// These constants are used to specify the policy criteria +// for selecting which checkpoint archives to delete. const ( ByCount RetentionPolicy = iota + BySize +) + +// Constants for unit conversion +const ( + KB = 1024 + MB = 1024 * KB ) var ( @@ -59,6 +68,10 @@ var ( maxCheckpointsPerContainer int = 10 maxCheckpointsPerPod int = 20 maxCheckpointsPerNamespace int = 30 + maxCheckpointSize int = math.MaxInt32 + maxTotalSizePerPod int = math.MaxInt32 + maxTotalSizePerContainer int = math.MaxInt32 + maxTotalSizePerNamespace int = math.MaxInt32 containerPolicies []criuorgv1.ContainerPolicySpec podPolicies []criuorgv1.PodPolicySpec namespacePolicies []criuorgv1.NamespacePolicySpec @@ -91,6 +104,7 @@ func (r *CheckpointRestoreOperatorReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, client.IgnoreNotFound(err) } + resetAllPoliciesToDefault(log) r.handleGlobalPolicies(log, &input.Spec.GlobalPolicies) r.handleSpecificPolicies(log, &input.Spec) @@ -106,6 +120,22 @@ func (r *CheckpointRestoreOperatorReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, nil } +func resetAllPoliciesToDefault(log logr.Logger) { + maxCheckpointsPerContainer = 10 + maxCheckpointsPerPod = 20 + maxCheckpointsPerNamespace = 30 + maxCheckpointSize = math.MaxInt32 + maxTotalSizePerContainer = math.MaxInt32 + maxTotalSizePerPod = math.MaxInt32 + maxTotalSizePerNamespace = math.MaxInt32 + + containerPolicies = nil + podPolicies = nil + namespacePolicies = nil + + log.Info("Policies have been reset to their default values") +} + +func (r *CheckpointRestoreOperatorReconciler) handleGlobalPolicies(log logr.Logger, globalPolicies *criuorgv1.GlobalPolicySpec) {
policyMutex.Lock() defer policyMutex.Unlock() @@ -124,6 +154,26 @@ func (r *CheckpointRestoreOperatorReconciler) handleGlobalPolicies(log logr.Logg maxCheckpointsPerNamespace = *globalPolicies.MaxCheckpointsPerNamespaces log.Info("Changed MaxCheckpointsPerNamespace", "maxCheckpointsPerNamespace", maxCheckpointsPerNamespace) } + + if globalPolicies.MaxCheckpointSize != nil && *globalPolicies.MaxCheckpointSize >= 0 { + maxCheckpointSize = *globalPolicies.MaxCheckpointSize * MB + log.Info("Changed MaxCheckpointSize", "maxCheckpointSize", maxCheckpointSize) + } + + if globalPolicies.MaxTotalSizePerNamespace != nil && *globalPolicies.MaxTotalSizePerNamespace >= 0 { + maxTotalSizePerNamespace = *globalPolicies.MaxTotalSizePerNamespace * MB + log.Info("Changed MaxTotalSizePerNamespace", "maxTotalSizePerNamespace", maxTotalSizePerNamespace) + } + + if globalPolicies.MaxTotalSizePerPod != nil && *globalPolicies.MaxTotalSizePerPod >= 0 { + maxTotalSizePerPod = *globalPolicies.MaxTotalSizePerPod * MB + log.Info("Changed MaxTotalSizePerPod", "maxTotalSizePerPod", maxTotalSizePerPod) + } + + if globalPolicies.MaxTotalSizePerContainer != nil && *globalPolicies.MaxTotalSizePerContainer >= 0 { + maxTotalSizePerContainer = *globalPolicies.MaxTotalSizePerContainer * MB + log.Info("Changed MaxTotalSizePerContainer", "maxTotalSizePerContainer", maxTotalSizePerContainer) + } } func (r *CheckpointRestoreOperatorReconciler) handleSpecificPolicies(log logr.Logger, spec *criuorgv1.CheckpointRestoreOperatorSpec) { @@ -280,21 +330,58 @@ func getCheckpointArchiveInformation(log logr.Logger, checkpointPath string) (*c return details, nil } +type Policy struct { + MaxCheckpoints int + MaxCheckpointSize int + MaxTotalSize int +} + func applyPolicies(log logr.Logger, details *checkpointDetails) { policyMutex.Lock() defer policyMutex.Unlock() + toInfinity := func(value *int) int { + if value == nil { + return math.MaxInt32 + } + return *value + } + if policy := findContainerPolicy(details); policy 
!= nil { - handleCheckpointsForLevel(log, details, "container", int(*policy.MaxCheckpoints)) + handleCheckpointsForLevel(log, details, "container", Policy{ + MaxCheckpoints: toInfinity(policy.MaxCheckpoints), + MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB, + MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB, + }) } else if policy := findPodPolicy(details); policy != nil { - handleCheckpointsForLevel(log, details, "pod", int(*policy.MaxCheckpoints)) + handleCheckpointsForLevel(log, details, "pod", Policy{ + MaxCheckpoints: toInfinity(policy.MaxCheckpoints), + MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB, + MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB, + }) } else if policy := findNamespacePolicy(details); policy != nil { - handleCheckpointsForLevel(log, details, "namespace", int(*policy.MaxCheckpoints)) + handleCheckpointsForLevel(log, details, "namespace", Policy{ + MaxCheckpoints: toInfinity(policy.MaxCheckpoints), + MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB, + MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB, + }) } else { // Apply global policies if no specific policy found - handleCheckpointsForLevel(log, details, "container", maxCheckpointsPerContainer) - handleCheckpointsForLevel(log, details, "pod", maxCheckpointsPerPod) - handleCheckpointsForLevel(log, details, "namespace", maxCheckpointsPerNamespace) + handleCheckpointsForLevel(log, details, "container", Policy{ + MaxCheckpoints: maxCheckpointsPerContainer, + MaxCheckpointSize: maxCheckpointSize, + MaxTotalSize: maxTotalSizePerContainer, + }) + handleCheckpointsForLevel(log, details, "pod", Policy{ + MaxCheckpoints: maxCheckpointsPerPod, + MaxCheckpointSize: maxCheckpointSize, + MaxTotalSize: maxTotalSizePerPod, + }) + handleCheckpointsForLevel(log, details, "namespace", Policy{ + MaxCheckpoints: maxCheckpointsPerNamespace, + MaxCheckpointSize: maxCheckpointSize, + MaxTotalSize: maxTotalSizePerNamespace, + }) } } @@ -374,8 +461,17 @@ func 
handleWriteFinished(ctx context.Context, event fsnotify.Event) { applyPolicies(log, details) } -func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, level string, maxCheckpoints int) { - if maxCheckpoints <= 0 { +func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, level string, policy Policy) { + if policy.MaxCheckpoints <= 0 { + log.Info("MaxCheckpoints is less than or equal to 0, skipping checkpoint handling", "level", level, "policy.MaxCheckpoints", policy.MaxCheckpoints) + return + } + if policy.MaxCheckpointSize <= 0 { + log.Info("MaxCheckpointSize is less than or equal to 0, skipping checkpoint handling", "level", level, "policy.MaxCheckpointSize", policy.MaxCheckpointSize) + return + } + if policy.MaxTotalSize <= 0 { + log.Info("MaxTotalSize is less than or equal to 0, skipping checkpoint handling", "level", level, "policy.MaxTotalSize", policy.MaxTotalSize) return } @@ -433,27 +529,36 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve } checkpointArchivesCounter := len(filteredArchives) + totalSize := int64(0) + archiveSizes := make(map[string]int64) archivesToDelete := make(map[int64]string) for _, c := range filteredArchives { fi, err := os.Stat(c) if err != nil { - log.Error( - err, - "failed to stat", - "file", - c, - ) + log.Error(err, "failed to stat", "file", c) + continue + } + + log.Info("Checkpoint archive details", "archive", c, "size", fi.Size(), "maxCheckpointSize", policy.MaxCheckpointSize) + if policy.MaxCheckpointSize > 0 && fi.Size() > int64(policy.MaxCheckpointSize) { + log.Info("Deleting checkpoint archive due to exceeding MaxCheckpointSize", "archive", c, "size", fi.Size(), "maxCheckpointSize", policy.MaxCheckpointSize) + err := os.Remove(c) + if err != nil { + log.Error(err, "failed to remove checkpoint archive", "archive", c) + } continue } + totalSize += fi.Size() + archiveSizes[c] = fi.Size() archivesToDelete[fi.ModTime().UnixMicro()] = c } // Handle 
excess checkpoints by count - if maxCheckpoints > 0 && checkpointArchivesCounter > maxCheckpoints { - excessCount := int64(checkpointArchivesCounter - maxCheckpoints) - log.Info("Checkpoint count exceeds limit", "checkpointArchivesCounter", checkpointArchivesCounter, "maxCheckpoints", maxCheckpoints, "excessCount", excessCount) - toDelete := selectArchivesToDelete(log, checkpointArchives, excessCount, ByCount) + if policy.MaxCheckpoints > 0 && checkpointArchivesCounter > policy.MaxCheckpoints { + excessCount := int64(checkpointArchivesCounter - policy.MaxCheckpoints) + log.Info("Checkpoint count exceeds limit", "checkpointArchivesCounter", checkpointArchivesCounter, "maxCheckpoints", policy.MaxCheckpoints, "excessCount", excessCount) + toDelete := selectArchivesToDelete(log, checkpointArchives, archiveSizes, excessCount, ByCount) for _, archive := range toDelete { log.Info("Deleting checkpoint archive due to excess count", "archive", archive) err := os.Remove(archive) @@ -461,14 +566,33 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve log.Error(err, "removal of checkpoint archive failed", "archive", archive) } checkpointArchivesCounter-- - if checkpointArchivesCounter <= maxCheckpoints { + if checkpointArchivesCounter <= policy.MaxCheckpoints { + break + } + } + } + + // Handle total size against maxTotalSize + if policy.MaxTotalSize > 0 && totalSize > int64(policy.MaxTotalSize) { + excessSize := totalSize - int64(policy.MaxTotalSize) + log.Info("Total size of checkpoint archives exceeds limit", "totalSize", totalSize, "maxTotalSize", policy.MaxTotalSize, "excessSize", excessSize) + toDelete := selectArchivesToDelete(log, filteredArchives, archiveSizes, excessSize, BySize) + for _, archive := range toDelete { + log.Info("Deleting checkpoint archive due to excess size", "archive", archive) + err := os.Remove(archive) + if err != nil { + log.Error(err, "Removal of checkpoint archive failed", "archive", archive) + } + totalSize -= 
archiveSizes[archive] + delete(archiveSizes, archive) + if totalSize <= int64(policy.MaxTotalSize) { break } } } } -func selectArchivesToDelete(log logr.Logger, archives []string, excess int64, policy RetentionPolicy) []string { +func selectArchivesToDelete(log logr.Logger, archives []string, archiveSizes map[string]int64, excess int64, policy RetentionPolicy) []string { toDelete := make([]string, 0) switch policy { @@ -493,6 +617,32 @@ func selectArchivesToDelete(log logr.Logger, archives []string, excess int64, po for i := 0; i < int(excess); i++ { toDelete = append(toDelete, archives[i]) } + + case BySize: + // Sort by modification time (oldest first) + sort.Slice(archives, func(i, j int) bool { + fileInfo1, err1 := os.Stat(archives[i]) + if err1 != nil { + log.Error(err1, "Error stating file", archives[i]) + return false + } + + fileInfo2, err2 := os.Stat(archives[j]) + if err2 != nil { + log.Error(err2, "Error stating file", archives[j]) + return false + } + + return fileInfo1.ModTime().Before(fileInfo2.ModTime()) + }) + + for _, archive := range archives { + toDelete = append(toDelete, archive) + excess -= archiveSizes[archive] + if excess <= 0 { + break + } + } } return toDelete @@ -577,6 +727,10 @@ func (gc *garbageCollector) runGarbageCollector() { log.Info("MaxCheckpointsPerContainer", "maxCheckpointsPerContainer", maxCheckpointsPerContainer) log.Info("MaxCheckpointsPerPod", "maxCheckpointsPerPod", maxCheckpointsPerPod) log.Info("MaxCheckpointsPerNamespace", "maxCheckpointsPerNamespace", maxCheckpointsPerNamespace) + log.Info("MaxCheckpointSize", "maxCheckpointSize", maxCheckpointSize) + log.Info("MaxTotalSizePerNamespace", "maxTotalSizePerNamespace", maxTotalSizePerNamespace) + log.Info("MaxTotalSizePerPod", "maxTotalSizePerPod", maxTotalSizePerPod) + log.Info("MaxTotalSizePerContainer", "maxTotalSizePerContainer", maxTotalSizePerContainer) err = watcher.Add(checkpointDirectory) if err != nil { From a0c32a55233e1095bdc96acedc8feebe3994cd24 Mon Sep 17 
00:00:00 2001 From: Parthiba-Hazra Date: Wed, 31 Jul 2024 21:28:15 +0530 Subject: [PATCH 2/7] ci: Add CI workflow for storage quota policies - Enhance generate_checkpoint_tar.sh to optionally generate tar files larger than 5MB - Update GitHub Actions workflow to test storage quota garbage collection policies Signed-off-by: Parthiba-Hazra --- .github/workflows/tests.yaml | 6 +++++ test/generate_checkpoint_tar.sh | 19 +++++++++++++++ test/run_tests.bats | 24 +++++++++++++++++++ ...test_bySize_checkpointrestoreoperator.yaml | 24 +++++++++++++++++++ 4 files changed, 73 insertions(+) create mode 100644 test/test_bySize_checkpointrestoreoperator.yaml diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index ca2a2ea0..cc11e954 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -95,3 +95,9 @@ jobs: - name: Test Max Checkpoints Set to 1 run: sudo -E bats -f "test_max_checkpoints_set_to_1" ./test/run_tests.bats + + - name: Test Max Total Checkpoint Size + run: sudo -E bats -f "test_max_total_checkpoint_size" ./test/run_tests.bats + + - name: Test Max Checkpoint Size + run: sudo -E bats -f "test_max_checkpoint_size" ./test/run_tests.bats diff --git a/test/generate_checkpoint_tar.sh b/test/generate_checkpoint_tar.sh index 6938d501..27832a44 100755 --- a/test/generate_checkpoint_tar.sh +++ b/test/generate_checkpoint_tar.sh @@ -30,12 +30,31 @@ increment_timestamp() { date -d "$1 + 1 second" +%Y-%m-%dT%H:%M:%S } +generate_large_file() { + local file_path=$1 + local size_mb=$2 + dd if=/dev/urandom of="$file_path" bs=1M count="$size_mb" status=none +} + +CREATE_LARGE=false +if [ "$1" == "large" ]; then + CREATE_LARGE=true + echo "Creating large files: $CREATE_LARGE" +fi + TIMESTAMP=$(date +%Y-%m-%dT%H:%M:%S) for _ in {1..5}; do TAR_NAME="checkpoint.tar" ORIGINAL_NAME="checkpoint-podname_namespace-containername-$TIMESTAMP.tar" + + if [ "$CREATE_LARGE" = true ]; then + LARGE_FILE="$TEMP_DIR/large_file" + generate_large_file 
"$LARGE_FILE" 5 + fi + create_checkpoint_tar "$TAR_NAME" "$ORIGINAL_NAME" + TIMESTAMP=$(increment_timestamp "$TIMESTAMP") done diff --git a/test/run_tests.bats b/test/run_tests.bats index 7e47e376..0720e488 100755 --- a/test/run_tests.bats +++ b/test/run_tests.bats @@ -58,3 +58,27 @@ function teardown() { log_and_run ls -la "$CHECKPOINT_DIR" [ "$status" -eq 0 ] } + +@test "test_max_total_checkpoint_size" { + log_and_run kubectl apply -f ./test/test_bySize_checkpointrestoreoperator.yaml + [ "$status" -eq 0 ] + log_and_run ./test/generate_checkpoint_tar.sh large + [ "$status" -eq 0 ] + log_and_run ./test/wait_for_checkpoint_reduction.sh 2 + [ "$status" -eq 0 ] + log_and_run ls -la "$CHECKPOINT_DIR" + [ "$status" -eq 0 ] +} + +@test "test_max_checkpoint_size" { + log_and_run sed -i '/^ containerPolicies:/,/maxTotalSize: [0-9]*/ s/^/#/' ./test/test_bySize_checkpointrestoreoperator.yaml + [ "$status" -eq 0 ] + log_and_run kubectl apply -f ./test/test_bySize_checkpointrestoreoperator.yaml + [ "$status" -eq 0 ] + log_and_run ./test/generate_checkpoint_tar.sh large + [ "$status" -eq 0 ] + log_and_run ./test/wait_for_checkpoint_reduction.sh 0 + [ "$status" -eq 0 ] + log_and_run ls -la "$CHECKPOINT_DIR" + [ "$status" -eq 0 ] +} diff --git a/test/test_bySize_checkpointrestoreoperator.yaml b/test/test_bySize_checkpointrestoreoperator.yaml new file mode 100644 index 00000000..2641b9ad --- /dev/null +++ b/test/test_bySize_checkpointrestoreoperator.yaml @@ -0,0 +1,24 @@ +apiVersion: criu.org/v1 +kind: CheckpointRestoreOperator +metadata: + labels: + app.kubernetes.io/name: checkpointrestoreoperator + app.kubernetes.io/instance: checkpointrestoreoperator-sample + app.kubernetes.io/part-of: checkpoint-restore-operator + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: checkpoint-restore-operator + name: checkpointrestoreoperator-sample +spec: + checkpointDirectory: /var/lib/kubelet/checkpoints + applyPoliciesImmediately: true + globalPolicy: + 
maxCheckpointSize: 4 + maxTotalSizePerNamespace: 1000 + maxTotalSizePerPod: 500 + maxTotalSizePerContainer: 100 + containerPolicies: + - namespace: namespace + pod: podname + container: containername + maxCheckpointSize: 6 + maxTotalSize: 12 From d9c8dbbf7a260aff123141b3d75ff7ead0369722 Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Wed, 31 Jul 2024 21:55:12 +0530 Subject: [PATCH 3/7] docs: Add documentation for storage/size based retention policy Signed-off-by: Parthiba-Hazra --- docs/retention_policy.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/retention_policy.md b/docs/retention_policy.md index b69422e8..8b65071f 100644 --- a/docs/retention_policy.md +++ b/docs/retention_policy.md @@ -36,10 +36,14 @@ spec: # pod: # container: # maxCheckpoints: 5 + # maxCheckpointSize: 6 # Maximum size of a single checkpoint in MB + # maxTotalSize: 20 # Maximum total size of checkpoints for the container in MB # podPolicies: # - namespace: # pod: # maxCheckpoints: 10 + # maxCheckpointSize: 8 # Maximum size of a single checkpoint in MB + # maxTotalSize: 50 # Maximum total size of checkpoints for the pod in MB # namespacePolicies: # - namespace: # maxCheckpoints: 15` @@ -54,18 +58,28 @@ A sample configuration file is available [here](/config/samples/_v1_checkpointre - `maxCheckpointsPerNamespace`: Maximum number of checkpoints per namespace. - `maxCheckpointsPerPod`: Maximum number of checkpoints per pod. - `maxCheckpointsPerContainer`: Maximum number of checkpoints per container. + - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. + - `maxTotalSizePerNamespace`: Maximum total size of checkpoints per namespace in MB. + - `maxTotalSizePerPod`: Maximum total size of checkpoints per pod in MB. + - `maxTotalSizePerContainer`: Maximum total size of checkpoints per container in MB. - `containerPolicies` (optional): Specific retention policies for containers. - `namespace`: Namespace of the container. - `pod`: Pod name of the container. 
- `container`: Container name. - `maxCheckpoints`: Maximum number of checkpoints for the container. + - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. + - `maxTotalSize`: Maximum total size of checkpoints for the container in MB. - `podPolicies` (optional): Specific retention policies for pods. - `namespace`: Namespace of the pod. - `pod`: Pod name. - `maxCheckpoints`: Maximum number of checkpoints for the pod. + - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. + - `maxTotalSize`: Maximum total size of checkpoints for the pod in MB. - `namespacePolicies` (optional): Specific retention policies for namespaces. - `namespace`: Namespace name. - `maxCheckpoints`: Maximum number of checkpoints for the namespace. + - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. + - `maxTotalSize`: Maximum total size of checkpoints for the namespace in MB. ## Policy Hierarchy and Application From 2ce049a09d82092d31f95ad3a5ceb423e6cbbd51 Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Mon, 12 Aug 2024 13:37:07 +0530 Subject: [PATCH 4/7] Add RBAC permission markers to CheckpointRestoreOperator API definition - To implement the orphan checkpoint retention policy, the manager requires permissions to watch and get resources. This allows the manager pod to watch the relevant resources and retrieve the necessary resource information when applying the policies. 
Signed-off-by: Parthiba-Hazra --- api/v1/checkpointrestoreoperator_types.go | 2 ++ config/rbac/role.yaml | 16 ++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/api/v1/checkpointrestoreoperator_types.go b/api/v1/checkpointrestoreoperator_types.go index af2be526..728b6e1c 100644 --- a/api/v1/checkpointrestoreoperator_types.go +++ b/api/v1/checkpointrestoreoperator_types.go @@ -74,6 +74,8 @@ type CheckpointRestoreOperatorStatus struct { //+kubebuilder:object:root=true //+kubebuilder:subresource:status +//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch +//+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch // CheckpointRestoreOperator is the Schema for the checkpointrestoreoperators API type CheckpointRestoreOperator struct { diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index ad8c043c..833ece98 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -4,6 +4,22 @@ kind: ClusterRole metadata: name: manager-role rules: +- apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - watch - apiGroups: - criu.org resources: From bb9f48facd8bc4151a2083922dfa4ab51ef33e61 Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Mon, 12 Aug 2024 13:41:22 +0530 Subject: [PATCH 5/7] feat: Implement orphan retention policies - Added support for orphan retention policies at the global, namespace, pod, and container levels. - Introduced the `retainOrphan` field in each policy type to control the retention of orphan checkpoints. - Updated the policy application logic to delete all orphan checkpoints when `retainOrphan` is set to false. - Implemented a PodWatcher to monitor pod deletions and apply policies immediately when a resource is deleted. 
Signed-off-by: Parthiba-Hazra --- api/v1/checkpointrestoreoperator_types.go | 18 +- api/v1/zz_generated.deepcopy.go | 20 ++ .../criu.org_checkpointrestoreoperators.yaml | 8 + .../_v1_checkpointrestoreoperator.yaml | 2 + go.mod | 2 +- .../checkpointrestoreoperator_controller.go | 200 ++++++++++++++++++ 6 files changed, 242 insertions(+), 8 deletions(-) diff --git a/api/v1/checkpointrestoreoperator_types.go b/api/v1/checkpointrestoreoperator_types.go index 728b6e1c..c88cc988 100644 --- a/api/v1/checkpointrestoreoperator_types.go +++ b/api/v1/checkpointrestoreoperator_types.go @@ -34,19 +34,21 @@ type CheckpointRestoreOperatorSpec struct { } type GlobalPolicySpec struct { - MaxCheckpointsPerNamespaces *int `json:"maxCheckpointsPerNamespace,omitempty"` - MaxCheckpointsPerPod *int `json:"maxCheckpointsPerPod,omitempty"` - MaxCheckpointsPerContainer *int `json:"maxCheckpointsPerContainer,omitempty"` - MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` - MaxTotalSizePerNamespace *int `json:"maxTotalSizePerNamespace,omitempty"` - MaxTotalSizePerPod *int `json:"maxTotalSizePerPod,omitempty"` - MaxTotalSizePerContainer *int `json:"maxTotalSizePerContainer,omitempty"` + RetainOrphan *bool `json:"retainOrphan,omitempty"` + MaxCheckpointsPerNamespaces *int `json:"maxCheckpointsPerNamespace,omitempty"` + MaxCheckpointsPerPod *int `json:"maxCheckpointsPerPod,omitempty"` + MaxCheckpointsPerContainer *int `json:"maxCheckpointsPerContainer,omitempty"` + MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` + MaxTotalSizePerNamespace *int `json:"maxTotalSizePerNamespace,omitempty"` + MaxTotalSizePerPod *int `json:"maxTotalSizePerPod,omitempty"` + MaxTotalSizePerContainer *int `json:"maxTotalSizePerContainer,omitempty"` } type ContainerPolicySpec struct { Namespace string `json:"namespace,omitempty"` Pod string `json:"pod,omitempty"` Container string `json:"container,omitempty"` + RetainOrphan *bool `json:"retainOrphan,omitempty"` MaxCheckpoints *int 
`json:"maxCheckpoints,omitempty"` MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` MaxTotalSize *int `json:"maxTotalSize,omitempty"` @@ -55,6 +57,7 @@ type ContainerPolicySpec struct { type PodPolicySpec struct { Namespace string `json:"namespace,omitempty"` Pod string `json:"pod,omitempty"` + RetainOrphan *bool `json:"retainOrphan,omitempty"` MaxCheckpoints *int `json:"maxCheckpoints,omitempty"` MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` MaxTotalSize *int `json:"maxTotalSize,omitempty"` @@ -62,6 +65,7 @@ type PodPolicySpec struct { type NamespacePolicySpec struct { Namespace string `json:"namespace,omitempty"` + RetainOrphan *bool `json:"retainOrphan,omitempty"` MaxCheckpoints *int `json:"maxCheckpoints,omitempty"` MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"` MaxTotalSize *int `json:"maxTotalSize,omitempty"` diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 24ad2941..ac633fa1 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -138,6 +138,11 @@ func (in *CheckpointRestoreOperatorStatus) DeepCopy() *CheckpointRestoreOperator // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ContainerPolicySpec) DeepCopyInto(out *ContainerPolicySpec) { *out = *in + if in.RetainOrphan != nil { + in, out := &in.RetainOrphan, &out.RetainOrphan + *out = new(bool) + **out = **in + } if in.MaxCheckpoints != nil { in, out := &in.MaxCheckpoints, &out.MaxCheckpoints *out = new(int) @@ -168,6 +173,11 @@ func (in *ContainerPolicySpec) DeepCopy() *ContainerPolicySpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *GlobalPolicySpec) DeepCopyInto(out *GlobalPolicySpec) { *out = *in + if in.RetainOrphan != nil { + in, out := &in.RetainOrphan, &out.RetainOrphan + *out = new(bool) + **out = **in + } if in.MaxCheckpointsPerNamespaces != nil { in, out := &in.MaxCheckpointsPerNamespaces, &out.MaxCheckpointsPerNamespaces *out = new(int) @@ -218,6 +228,11 @@ func (in *GlobalPolicySpec) DeepCopy() *GlobalPolicySpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NamespacePolicySpec) DeepCopyInto(out *NamespacePolicySpec) { *out = *in + if in.RetainOrphan != nil { + in, out := &in.RetainOrphan, &out.RetainOrphan + *out = new(bool) + **out = **in + } if in.MaxCheckpoints != nil { in, out := &in.MaxCheckpoints, &out.MaxCheckpoints *out = new(int) @@ -248,6 +263,11 @@ func (in *NamespacePolicySpec) DeepCopy() *NamespacePolicySpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *PodPolicySpec) DeepCopyInto(out *PodPolicySpec) { *out = *in + if in.RetainOrphan != nil { + in, out := &in.RetainOrphan, &out.RetainOrphan + *out = new(bool) + **out = **in + } if in.MaxCheckpoints != nil { in, out := &in.MaxCheckpoints, &out.MaxCheckpoints *out = new(int) diff --git a/config/crd/bases/criu.org_checkpointrestoreoperators.yaml b/config/crd/bases/criu.org_checkpointrestoreoperators.yaml index 86d572c4..a235af18 100644 --- a/config/crd/bases/criu.org_checkpointrestoreoperators.yaml +++ b/config/crd/bases/criu.org_checkpointrestoreoperators.yaml @@ -62,6 +62,8 @@ spec: type: string pod: type: string + retainOrphan: + type: boolean type: object type: array globalPolicy: @@ -80,6 +82,8 @@ spec: type: integer maxTotalSizePerPod: type: integer + retainOrphan: + type: boolean type: object namespacePolicies: items: @@ -92,6 +96,8 @@ spec: type: integer namespace: type: string + retainOrphan: + type: boolean type: object type: array podPolicies: @@ -107,6 +113,8 @@ spec: type: string pod: type: string + retainOrphan: + type: boolean type: object type: array type: object diff --git a/config/samples/_v1_checkpointrestoreoperator.yaml b/config/samples/_v1_checkpointrestoreoperator.yaml index 963ffc2c..ac9fa006 100644 --- a/config/samples/_v1_checkpointrestoreoperator.yaml +++ b/config/samples/_v1_checkpointrestoreoperator.yaml @@ -12,6 +12,7 @@ spec: checkpointDirectory: /var/lib/kubelet/checkpoints applyPoliciesImmediately: false globalPolicy: + retainOrphan: true maxCheckpointsPerNamespace: 50 maxCheckpointsPerPod: 30 maxCheckpointsPerContainer: 10 @@ -23,6 +24,7 @@ spec: # - namespace: # pod: # container: + # retainOrphan: false # maxCheckpoints: 5 # maxCheckpointSize: 10 # maxTotalSize: 100 diff --git a/go.mod b/go.mod index 021aa0f4..391b8af4 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/go-logr/logr v1.4.2 github.com/onsi/ginkgo/v2 v2.21.0 github.com/onsi/gomega v1.35.1 + k8s.io/api v0.31.2 k8s.io/apimachinery v0.31.2 
k8s.io/client-go v0.31.2 k8s.io/kubelet v0.31.2 @@ -75,7 +76,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.31.2 // indirect k8s.io/apiextensions-apiserver v0.31.2 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20241009091222-67ed5848f094 // indirect diff --git a/internal/controller/checkpointrestoreoperator_controller.go b/internal/controller/checkpointrestoreoperator_controller.go index a1b68909..d8054f75 100644 --- a/internal/controller/checkpointrestoreoperator_controller.go +++ b/internal/controller/checkpointrestoreoperator_controller.go @@ -36,7 +36,14 @@ import ( metadata "github.com/checkpoint-restore/checkpointctl/lib" "github.com/containers/storage/pkg/archive" "github.com/go-logr/logr" + v1 "k8s.io/api/core/v1" + k8err "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" kubelettypes "k8s.io/kubelet/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -68,6 +75,7 @@ var ( maxCheckpointsPerContainer int = 10 maxCheckpointsPerPod int = 20 maxCheckpointsPerNamespace int = 30 + retainOrphan *bool maxCheckpointSize int = math.MaxInt32 maxTotalSizePerPod int = math.MaxInt32 maxTotalSizePerContainer int = math.MaxInt32 @@ -121,6 +129,7 @@ func (r *CheckpointRestoreOperatorReconciler) Reconcile(ctx context.Context, req } func resetAllPoliciesToDefault(log logr.Logger) { + retainOrphan = nil maxCheckpointsPerContainer = 10 maxCheckpointsPerPod = 20 maxCheckpointsPerNamespace = 30 @@ -136,10 +145,22 @@ func resetAllPoliciesToDefault(log logr.Logger) { log.Info("Policies have been reset to their default values") } +func ptr(b bool) *bool { + return &b +} + func (r *CheckpointRestoreOperatorReconciler) 
handleGlobalPolicies(log logr.Logger, globalPolicies *criuorgv1.GlobalPolicySpec) { policyMutex.Lock() defer policyMutex.Unlock() + if globalPolicies.RetainOrphan == nil { + retainOrphan = ptr(true) + log.Info("RetainOrphan policy not set, using default", "retainOrphan", retainOrphan) + } else { + retainOrphan = globalPolicies.RetainOrphan + log.Info("Changed RetainOrphan policy", "retainOrphan", retainOrphan) + } + if globalPolicies.MaxCheckpointsPerContainer != nil && *globalPolicies.MaxCheckpointsPerContainer >= 0 { maxCheckpointsPerContainer = *globalPolicies.MaxCheckpointsPerContainer log.Info("Changed MaxCheckpointsPerContainer", "maxCheckpointsPerContainer", maxCheckpointsPerContainer) @@ -331,6 +352,7 @@ func getCheckpointArchiveInformation(log logr.Logger, checkpointPath string) (*c } type Policy struct { + RetainOrphan bool MaxCheckpoints int MaxCheckpointSize int MaxTotalSize int @@ -347,20 +369,30 @@ func applyPolicies(log logr.Logger, details *checkpointDetails) { return *value } + ifNil := func(value *bool) bool { + if value == nil { + return true + } + return *value + } + if policy := findContainerPolicy(details); policy != nil { handleCheckpointsForLevel(log, details, "container", Policy{ + RetainOrphan: ifNil(policy.RetainOrphan), MaxCheckpoints: toInfinity(policy.MaxCheckpoints), MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB, MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB, }) } else if policy := findPodPolicy(details); policy != nil { handleCheckpointsForLevel(log, details, "pod", Policy{ + RetainOrphan: ifNil(policy.RetainOrphan), MaxCheckpoints: toInfinity(policy.MaxCheckpoints), MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB, MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB, }) } else if policy := findNamespacePolicy(details); policy != nil { handleCheckpointsForLevel(log, details, "namespace", Policy{ + RetainOrphan: ifNil(policy.RetainOrphan), MaxCheckpoints: toInfinity(policy.MaxCheckpoints), 
MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB, MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB, @@ -368,16 +400,19 @@ func applyPolicies(log logr.Logger, details *checkpointDetails) { } else { // Apply global policies if no specific policy found handleCheckpointsForLevel(log, details, "container", Policy{ + RetainOrphan: *retainOrphan, MaxCheckpoints: maxCheckpointsPerContainer, MaxCheckpointSize: maxCheckpointSize, MaxTotalSize: maxTotalSizePerContainer, }) handleCheckpointsForLevel(log, details, "pod", Policy{ + RetainOrphan: *retainOrphan, MaxCheckpoints: maxCheckpointsPerPod, MaxCheckpointSize: maxCheckpointSize, MaxTotalSize: maxTotalSizePerPod, }) handleCheckpointsForLevel(log, details, "namespace", Policy{ + RetainOrphan: *retainOrphan, MaxCheckpoints: maxCheckpointsPerNamespace, MaxCheckpointSize: maxCheckpointSize, MaxTotalSize: maxTotalSizePerNamespace, @@ -528,6 +563,27 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve filteredArchives = append(filteredArchives, archive) } + if !policy.RetainOrphan { + exist, err := resourceExistsInCluster(level, details) + if err != nil { + log.Error(err, "failed to check if resource exists in cluster", "level", level) + } + + if !exist { + log.Info("RetainOrphan is set to false, deleting all checkpoints", "level", level) + + for _, archive := range filteredArchives { + log.Info("Deleting checkpoint archive due to retainOrphan=false", "archive", archive) + err := os.Remove(archive) + if err != nil { + log.Error(err, "failed to remove checkpoint archive", "archive", archive) + } + } + + return + } + } + checkpointArchivesCounter := len(filteredArchives) totalSize := int64(0) archiveSizes := make(map[string]int64) @@ -648,6 +704,147 @@ func selectArchivesToDelete(log logr.Logger, archives []string, archiveSizes map return toDelete } +func resourceExistsInCluster(level string, details *checkpointDetails) (bool, error) { + switch level { + case "container": + pod, err 
:= getPodFromNamespace(details.namespace, details.pod) + if err != nil { + if isNotFoundError(err) { + return false, nil + } + return false, err + } + for _, container := range pod.Spec.Containers { + if container.Name == details.container { + return true, nil + } + } + return false, nil + + case "pod": + _, err := getPodFromNamespace(details.namespace, details.pod) + if err != nil { + if isNotFoundError(err) { + return false, nil + } + return false, err + } + return true, nil + + case "namespace": + _, err := getNamespace(details.namespace) + if err != nil { + if isNotFoundError(err) { + return false, nil + } + return false, err + } + return true, nil + + default: + return false, fmt.Errorf("invalid level: %s", level) + } +} + +func isNotFoundError(err error) bool { + return k8err.IsNotFound(err) +} + +func getKubernetesClient() (*kubernetes.Clientset, error) { + // Use in-cluster configuration + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + return kubernetes.NewForConfig(config) +} + +func getPodFromNamespace(namespace, podName string) (*v1.Pod, error) { + clientset, err := getKubernetesClient() + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client: %v", err) + } + + pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + return pod, nil +} + +func getNamespace(namespace string) (*v1.Namespace, error) { + clientset, err := getKubernetesClient() + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client: %v", err) + } + + ns, err := clientset.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + return ns, nil +} + +func (gc *garbageCollector) PodWatcher(log logr.Logger, stopCh <-chan struct{}) { + clientset, err := getKubernetesClient() + if err != nil { + log.Error(err, "Failed to create Kubernetes client") + return + } + + watchlist := 
cache.NewListWatchFromClient( + clientset.CoreV1().RESTClient(), + "pods", + metav1.NamespaceAll, + fields.Everything(), + ) + + _, controller := cache.NewInformerWithOptions(cache.InformerOptions{ + ListerWatcher: watchlist, + ObjectType: &v1.Pod{}, + Handler: cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + log.Info("Could not convert object to Pod") + return + } + handlePodDeleted(log, pod) + }, + }, + }) + + go controller.Run(stopCh) + + <-stopCh + log.Info("PodWatcher has been stopped") +} + +func handlePodDeleted(log logr.Logger, pod *v1.Pod) { + log.Info("Pod deleted", "pod", pod.Name) + + containerNames := getContainerNames(pod) + + for _, containerName := range containerNames { + details := &checkpointDetails{ + namespace: pod.Namespace, + pod: pod.Name, + container: containerName, + } + applyPolicies(log, details) + } +} + +func getContainerNames(pod *v1.Pod) []string { + var containerNames []string + for _, container := range pod.Spec.Containers { + containerNames = append(containerNames, container.Name) + } + return containerNames +} + func (gc *garbageCollector) runGarbageCollector() { // This function tries to detect newly created checkpoint archives with the help // of inotify/fsnotify. 
If a new checkpoint archive is created we get a @@ -680,6 +877,9 @@ func (gc *garbageCollector) runGarbageCollector() { c := make(chan struct{}) + // Start pod watcher in a separate goroutine + go gc.PodWatcher(log, c) + go func() { for { select { From 76ea1c8f3b4c2141f14f2be139f8d875385a0a37 Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Mon, 12 Aug 2024 17:09:39 +0530 Subject: [PATCH 6/7] ci: Add CI test for orphan retention policy - Add `test_orphan_retention_policy` test and update GitHub Actions workflow to test orphan retention policy Signed-off-by: Parthiba-Hazra --- .github/workflows/tests.yaml | 3 +++ test/run_tests.bats | 11 ++++++++++ ...test_orphan_checkpointrestoreoperator.yaml | 20 +++++++++++++++++++ 3 files changed, 34 insertions(+) create mode 100644 test/test_orphan_checkpointrestoreoperator.yaml diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index cc11e954..ff20fb5d 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -101,3 +101,6 @@ jobs: - name: Test Max Checkpoint Size run: sudo -E bats -f "test_max_checkpoint_size" ./test/run_tests.bats + + - name: Test orphan retention + run: sudo -E bats -f "test_orphan_retention_policy" ./test/run_tests.bats diff --git a/test/run_tests.bats b/test/run_tests.bats index 0720e488..389f0ae2 100755 --- a/test/run_tests.bats +++ b/test/run_tests.bats @@ -82,3 +82,14 @@ function teardown() { log_and_run ls -la "$CHECKPOINT_DIR" [ "$status" -eq 0 ] } + +@test "test_orphan_retention_policy" { + log_and_run kubectl apply -f ./test/test_orphan_checkpointrestoreoperator.yaml + [ "$status" -eq 0 ] + log_and_run ./test/generate_checkpoint_tar.sh + [ "$status" -eq 0 ] + log_and_run ./test/wait_for_checkpoint_reduction.sh 0 + [ "$status" -eq 0 ] + log_and_run ls -la "$CHECKPOINT_DIR" + [ "$status" -eq 0 ] +} diff --git a/test/test_orphan_checkpointrestoreoperator.yaml b/test/test_orphan_checkpointrestoreoperator.yaml new file mode 100644 index 00000000..ac868389 
--- /dev/null +++ b/test/test_orphan_checkpointrestoreoperator.yaml @@ -0,0 +1,20 @@ +apiVersion: criu.org/v1 +kind: CheckpointRestoreOperator +metadata: + labels: + app.kubernetes.io/name: checkpointrestoreoperator + app.kubernetes.io/instance: checkpointrestoreoperator-sample + app.kubernetes.io/part-of: checkpoint-restore-operator + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: checkpoint-restore-operator + name: checkpointrestoreoperator-sample +spec: + checkpointDirectory: /var/lib/kubelet/checkpoints + applyPoliciesImmediately: true + globalPolicy: + retainOrphan: true + containerPolicies: + - namespace: namespace + pod: podname + container: containername + retainOrphan: false From f0ceb8642389c0a03fac25f674a925dc66838600 Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Mon, 12 Aug 2024 17:25:09 +0530 Subject: [PATCH 7/7] docs: Add documentation for orphan retention policy Signed-off-by: Parthiba-Hazra --- docs/retention_policy.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/retention_policy.md b/docs/retention_policy.md index 8b65071f..0dca871e 100644 --- a/docs/retention_policy.md +++ b/docs/retention_policy.md @@ -27,6 +27,7 @@ spec: checkpointDirectory: /var/lib/kubelet/checkpoints applyPoliciesImmediately: false globalPolicy: + retainOrphan: true maxCheckpointsPerNamespace: 50 maxCheckpointsPerPod: 30 maxCheckpointsPerContainer: 10 @@ -35,6 +36,7 @@ spec: # - namespace: # pod: # container: + # retainOrphan: false # Setting this to false deletes all orphan checkpoints # maxCheckpoints: 5 # maxCheckpointSize: 6 # Maximum size of a single checkpoint in MB # maxTotalSize: 20 # Maximum total size of checkpoints for the container in MB @@ -46,7 +48,7 @@ spec: # maxTotalSize: 50 # Maximum total size of checkpoints for the pod in MB # namespacePolicies: # - namespace: - # maxCheckpoints: 15` + # maxCheckpoints: 15` ``` A sample configuration file is available 
[here](/config/samples/_v1_checkpointrestoreoperator.yaml). @@ -55,6 +57,7 @@ A sample configuration file is available [here](/config/samples/_v1_checkpointre - `checkpointDirectory`: Specifies the directory where checkpoints are stored. - `applyPoliciesImmediately`: If set to `true`, the policies are applied immediately. If `false` (default value), they are applied after new checkpoint creation. - `globalPolicy`: Defines global checkpoint retention limits. + - `retainOrphan`: If set to `true` (default), orphan checkpoints (checkpoints whose associated resources have been deleted) will be retained. If set to `false`, orphan checkpoints will be automatically deleted. This is particularly useful for transient checkpoints used to recover from errors by replacing 'container restart' with 'container restore'. - `maxCheckpointsPerNamespace`: Maximum number of checkpoints per namespace. - `maxCheckpointsPerPod`: Maximum number of checkpoints per pod. - `maxCheckpointsPerContainer`: Maximum number of checkpoints per container. @@ -66,17 +69,20 @@ A sample configuration file is available [here](/config/samples/_v1_checkpointre - `namespace`: Namespace of the container. - `pod`: Pod name of the container. - `container`: Container name. + - `retainOrphan`: If set to `true` (default), orphan checkpoints for this container will be retained. If set to `false`, orphan checkpoints will be deleted. - `maxCheckpoints`: Maximum number of checkpoints for the container. - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. - `maxTotalSize`: Maximum total size of checkpoints for the container in MB. - `podPolicies` (optional): Specific retention policies for pods. - `namespace`: Namespace of the pod. - `pod`: Pod name. + - `retainOrphan`: If set to `true` (default), orphan checkpoints for this pod will be retained. If set to `false`, orphan checkpoints will be deleted. - `maxCheckpoints`: Maximum number of checkpoints for the pod. 
- `maxCheckpointSize`: Maximum size of a single checkpoint in MB. - `maxTotalSize`: Maximum total size of checkpoints for the pod in MB. - `namespacePolicies` (optional): Specific retention policies for namespaces. - `namespace`: Namespace name. + - `retainOrphan`: If set to `true` (default), orphan checkpoints for this namespace will be retained. If set to `false`, orphan checkpoints will be deleted. - `maxCheckpoints`: Maximum number of checkpoints for the namespace. - `maxCheckpointSize`: Maximum size of a single checkpoint in MB. - `maxTotalSize`: Maximum total size of checkpoints for the namespace in MB.