Skip to content

Commit

Permalink
Add cronjob to purge backups (#1167)
Browse files Browse the repository at this point in the history
  • Loading branch information
emerkle826 authored Jan 19, 2024
1 parent f96ea1c commit 8c32ae3
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 3 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG/CHANGELOG-1.12.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ Changelog for the K8ssandra Operator, new PRs should update the `unreleased` sec
When cutting a new release, update the `unreleased` heading to the tag being generated and date, like `## vX.Y.Z - YYYY-MM-DD` and create a new placeholder section for `unreleased` entries.

## unreleased
- [ENHANCEMENT] [#1094](https://github.com/k8ssandra/k8ssandra-operator/issues/1094) Expose AdditionalAnnotations field for cassDC.
* [ENHANCEMENT] [#1160](https://github.com/k8ssandra/k8ssandra-operator/issues/1160) Allow disabling Reaper front-end auth.
- [ENHANCEMENT] [#1115](https://github.com/k8ssandra/k8ssandra-operator/issues/1115) Add a validation check for the projected pod names length
* [CHANGE] [#1050](https://github.com/k8ssandra/k8ssandra-operator/issues/1050) Remove unnecessary requeues in the Medusa controllers
* [CHANGE] [#1165](https://github.com/k8ssandra/k8ssandra-operator/issues/1165) Upgrade to Medusa v0.17.1
* [FEATURE] [#1165](https://github.com/k8ssandra/k8ssandra-operator/issues/1165) Expose Medusa ssl_verify option to allow disabling cert verification for some on prem S3 compatible systems
* [ENHANCEMENT] [#1094](https://github.com/k8ssandra/k8ssandra-operator/issues/1094) Expose AdditionalAnnotations field for cassDC.
* [ENHANCEMENT] [#1160](https://github.com/k8ssandra/k8ssandra-operator/issues/1160) Allow disabling Reaper front-end auth.
* [ENHANCEMENT] [#1115](https://github.com/k8ssandra/k8ssandra-operator/issues/1115) Add a validation check for the projected pod names length
* [ENHANCEMENT] [#1161](https://github.com/k8ssandra/k8ssandra-operator/issues/1161) Update cass-operator Helm chart to 0.46.1. Adds containerPort for cass-operator metrics and changes cass-config-builder base from UBI7 to UBI8
* [ENHANCEMENT] [#1154](https://github.com/k8ssandra/k8ssandra-operator/issues/1154) Schedule purges on clusters that have Medusa configured
* [BUGFIX] [#1002](https://github.com/k8ssandra/k8ssandra-operator/issues/1002) Fix reaper secret name sanitization with cluster overrides
11 changes: 11 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ rules:
verbs:
- list
- watch
- apiGroups:
- batch
resources:
- cronjobs
verbs:
- create
- delete
- get
- list
- update
- watch
- apiGroups:
- cassandra.datastax.com
resources:
Expand Down
35 changes: 35 additions & 0 deletions controllers/k8ssandra/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/k8ssandra/k8ssandra-operator/pkg/result"
"github.com/k8ssandra/k8ssandra-operator/pkg/utils"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -106,6 +107,10 @@ func (r *K8ssandraClusterReconciler) checkDeletion(ctx context.Context, kc *api.
if r.deleteK8ssandraConfigMaps(ctx, kc, dcTemplate, namespace, remoteClient, logger) {
hasErrors = true
}

if r.deleteCronJobs(ctx, kc, dcTemplate, namespace, remoteClient, logger) {
hasErrors = true
}
}

if hasErrors {
Expand Down Expand Up @@ -437,3 +442,33 @@ func (r *K8ssandraClusterReconciler) deleteDeployments(

return
}

// deleteCronJobs deletes all CronJobs in the given namespace of the remote
// cluster that carry this K8ssandraCluster's cleaned-up-by labels (e.g. the
// Medusa backup purge CronJob). It reports hasErrors == true when listing
// fails or any deletion fails for a reason other than NotFound, so the
// caller can retry the cleanup.
func (r *K8ssandraClusterReconciler) deleteCronJobs(
	ctx context.Context,
	kc *k8ssandraapi.K8ssandraCluster,
	dcTemplate k8ssandraapi.CassandraDatacenterTemplate,
	namespace string,
	remoteClient client.Client,
	kcLogger logr.Logger,
) (hasErrors bool) {
	selector := k8ssandralabels.CleanedUpByLabels(client.ObjectKey{Namespace: kc.Namespace, Name: kc.SanitizedName()})
	options := client.ListOptions{
		Namespace:     namespace,
		LabelSelector: labels.SelectorFromSet(selector),
	}
	cronJobList := &batchv1.CronJobList{}
	if err := remoteClient.List(ctx, cronJobList, &options); err != nil {
		kcLogger.Error(err, "Failed to list Medusa CronJobs", "Context", dcTemplate.K8sContext)
		return true
	}
	// Index into the list rather than taking the address of a range variable
	// (implicit aliasing of the loop variable, flagged by gosec G601 on
	// pre-1.22 toolchains).
	for i := range cronJobList.Items {
		item := &cronJobList.Items[i]
		kcLogger.Info("Deleting CronJob", "CronJob", utils.GetKey(item))
		if err := remoteClient.Delete(ctx, item); err != nil {
			key := client.ObjectKey{Namespace: namespace, Name: item.Name}
			// NotFound means the CronJob is already gone, which is the desired
			// end state. Any other failure must be surfaced so checkDeletion
			// requeues and retries the cleanup.
			if !errors.IsNotFound(err) {
				kcLogger.Error(err, "Failed to delete CronJob", "CronJob", key, "Context", dcTemplate.K8sContext)
				hasErrors = true
			}
		}
	}
	return
}
1 change: 1 addition & 0 deletions controllers/k8ssandra/k8ssandracluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type K8ssandraClusterReconciler struct {
// +kubebuilder:rbac:groups=monitoring.coreos.com,namespace="k8ssandra",resources=servicemonitors,verbs=get;list;watch;create;update;patch;delete;deletecollection
// +kubebuilder:rbac:groups=core,namespace="k8ssandra",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",namespace="k8ssandra",resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=batch,namespace="k8ssandra",resources=cronjobs,verbs=get;list;watch;create;update;delete

func (r *K8ssandraClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithValues("K8ssandraCluster", req.NamespacedName)
Expand Down
15 changes: 15 additions & 0 deletions controllers/k8ssandra/medusa_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,21 @@ func (r *K8ssandraClusterReconciler) reconcileMedusa(
logger.Info("Medusa standalone deployment is not ready yet")
return result.RequeueSoon(r.DefaultDelay)
}
// Create a cron job to purge Medusa backups
purgeCronJob, err := medusa.PurgeCronJob(dcConfig, kc.SanitizedName(), namespace, logger)
if err != nil {
logger.Info("Failed to create Medusa purge backups cronjob", "error", err)
return result.Error(err)
}
purgeCronJob.SetLabels(labels.CleanedUpByLabels(kcKey))
recRes = reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *purgeCronJob)
switch {
case recRes.IsError():
return recRes
case recRes.IsRequeue():
return recRes
}

} else {
logger.Info("Medusa is not enabled")
}
Expand Down
66 changes: 66 additions & 0 deletions pkg/medusa/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"text/template"

"github.com/adutra/goalesce"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
k8ss "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
api "github.com/k8ssandra/k8ssandra-operator/apis/medusa/v1alpha1"
"github.com/k8ssandra/k8ssandra-operator/pkg/images"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"

Expand Down Expand Up @@ -447,6 +449,10 @@ func MedusaStandaloneDeploymentName(clusterName string, dcName string) string {
return fmt.Sprintf("%s-%s-medusa-standalone", clusterName, dcName)
}

// MedusaPurgeCronJobName returns the name of the CronJob that purges Medusa
// backups for the given cluster and datacenter.
func MedusaPurgeCronJobName(clusterName string, dcName string) string {
	return clusterName + "-" + dcName + "-medusa-purge"
}

func StandaloneMedusaDeployment(medusaContainer corev1.Container, clusterName, dcName, namespace string, logger logr.Logger) *appsv1.Deployment {
// The standalone medusa pod won't be able to resolve its own IP address using DNS entries
medusaContainer.Env = append(medusaContainer.Env, corev1.EnvVar{Name: "MEDUSA_RESOLVE_IP_ADDRESSES", Value: "False"})
Expand Down Expand Up @@ -518,6 +524,51 @@ func StandaloneMedusaService(dcConfig *cassandra.DatacenterConfig, medusaSpec *a
return medusaService
}

// PurgeCronJob builds a CronJob that runs daily at midnight and applies a
// MedusaTask (operation: purge) for the given datacenter, using kubectl under
// the k8ssandra-operator service account. It returns an error when the
// generated name exceeds the Kubernetes CronJob name limit.
func PurgeCronJob(dcConfig *cassandra.DatacenterConfig, clusterName, namespace string, logger logr.Logger) (*batchv1.CronJob, error) {
	cronJobName := MedusaPurgeCronJobName(cassdcapi.CleanupForKubernetes(clusterName), dcConfig.SanitizedName())
	logger.Info(fmt.Sprintf("Creating Medusa purge backups cronjob: %s", cronJobName))
	// CronJob names are capped at 52 characters (not the generic 253-char
	// DNS subdomain limit): the CronJob controller appends an 11-character
	// suffix to derived Job names, which must still fit a 63-char DNS label.
	if len(cronJobName) > 52 {
		return nil, fmt.Errorf("Medusa purge backups cronjob name too long (must be no more than 52 characters). Length: %d, Job name: %s", len(cronJobName), cronJobName)
	}
	purgeCronJob := &batchv1.CronJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:      cronJobName,
			Namespace: namespace,
		},
		Spec: batchv1.CronJobSpec{
			// Daily at midnight; not currently user-configurable.
			Schedule:                   "0 0 * * *",
			Suspend:                    pointer.Bool(false),
			SuccessfulJobsHistoryLimit: pointer.Int32(3),
			FailedJobsHistoryLimit:     pointer.Int32(1),
			JobTemplate: batchv1.JobTemplateSpec{
				Spec: batchv1.JobSpec{
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							RestartPolicy: corev1.RestartPolicyOnFailure,
							// The pod needs RBAC permission to create MedusaTask
							// objects, hence the operator's service account.
							ServiceAccountName: "k8ssandra-operator",
							Containers: []corev1.Container{
								{
									Name: "k8ssandra-purge-backups",
									// NOTE(review): kubectl image version is pinned and fairly
									// old — consider making it configurable / tracking the
									// supported Kubernetes versions.
									Image:                    "bitnami/kubectl:1.17.3",
									ImagePullPolicy:          corev1.PullIfNotPresent,
									TerminationMessagePath:   "/dev/termination-log",
									TerminationMessagePolicy: "File",
									Command: []string{
										"/bin/bash",
										"-c",
										createPurgeTaskStr(dcConfig.SanitizedName(), namespace),
									},
								},
							},
						},
					},
				},
			},
		},
	}
	return purgeCronJob, nil
}

func generateMedusaProbe(configuredProbe *corev1.Probe) (*corev1.Probe, error) {
// Goalesce the custom probe with the default probe,
defaultProbe := defaultMedusaProbe()
Expand Down Expand Up @@ -551,3 +602,18 @@ func defaultMedusaProbe() *corev1.Probe {

return probe
}

// createPurgeTaskStr builds the shell command executed by the purge CronJob
// container: it prints a MedusaTask manifest (with a timestamp-suffixed name
// produced by sed at run time) and pipes it into kubectl apply.
func createPurgeTaskStr(dcName string, namespace string) string {
	// Raw string literal: the \n sequences are interpreted by printf inside
	// the container, not by Go; %% escapes survive Sprintf as literal %.
	const cmdTemplate = `printf "apiVersion: medusa.k8ssandra.io/v1alpha1\nkind: MedusaTask\nmetadata:\n name: purge-backups-timestamp\n namespace: %s\nspec:\n cassandraDatacenter: %s\n operation: purge" | sed "s/timestamp/$(date +%%Y%%m%%d%%H%%M%%S)/g" | kubectl apply -f -`
	return fmt.Sprintf(cmdTemplate, namespace, dcName)
}
32 changes: 32 additions & 0 deletions pkg/medusa/reconcile_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package medusa

import (
"fmt"
"testing"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -516,3 +517,34 @@ func TestGenerateMedusaProbe(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, probe)
}

// TestPurgeCronJob verifies the generated purge CronJob has the expected
// sanitized name and a three-part shell command targeting the datacenter.
func TestPurgeCronJob(t *testing.T) {
	dcConfig := &cassandra.DatacenterConfig{
		DatacenterName: "testDc",
	}
	logger := logr.New(logr.Discard().GetSink())

	cronJob, err := PurgeCronJob(dcConfig, "testCluster", "testNamespace", logger)
	assert.Nil(t, err)

	// Names are sanitized to lowercase before being combined.
	assert.Equal(t, fmt.Sprintf("%s-%s-medusa-purge", "testcluster", "testdc"), cronJob.ObjectMeta.Name)

	command := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command
	assert.Equal(t, 3, len(command))
	assert.Contains(t, command[2], "\\nspec:\\n cassandraDatacenter: testdc")
}

// TestPurgeCronJobNameTooLong verifies that PurgeCronJob rejects cluster and
// datacenter names whose combined CronJob name exceeds the allowed length.
func TestPurgeCronJobNameTooLong(t *testing.T) {
	dcConfig := &cassandra.DatacenterConfig{
		DatacenterName: "testDatacentercWithAReallyLongNameToTestThatTheCharacterCountOfTheNameGoesOverTwoHundredFiftyThreeCharactersTestTestTestTest",
	}
	longClusterName := "testClusterNameBeingWayTooLongToTestThatTheCharacterCountOfTheNameGoesOverTwoHundredFiftyThreeCharactersTestTestTestTest"
	logger := logr.New(logr.Discard().GetSink())

	if _, err := PurgeCronJob(dcConfig, longClusterName, "testNamespace", logger); err == nil {
		t.Fatal("expected an error for an over-long cronjob name")
	}
}
37 changes: 37 additions & 0 deletions test/e2e/medusa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/k8ssandra/k8ssandra-operator/test/framework"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -37,6 +38,7 @@ func createSingleMedusaJob(t *testing.T, ctx context.Context, namespace string,

checkDatacenterReady(t, ctx, dcKey, f)
checkMedusaContainersExist(t, ctx, namespace, dcKey, f, kc)
checkPurgeCronJobExists(t, ctx, namespace, dcKey, f, kc)
createBackupJob(t, ctx, namespace, f, dcKey)
verifyBackupJobFinished(t, ctx, f, dcKey, backupKey)
restoreBackupJob(t, ctx, namespace, f, dcKey)
Expand Down Expand Up @@ -71,6 +73,7 @@ func createMultiMedusaJob(t *testing.T, ctx context.Context, namespace string, f
for _, dcKey := range []framework.ClusterKey{dc1Key, dc2Key} {
checkDatacenterReady(t, ctx, dcKey, f)
checkMedusaContainersExist(t, ctx, namespace, dcKey, f, kc)
checkPurgeCronJobExists(t, ctx, namespace, dcKey, f, kc)
checkMedusaStandaloneDeploymentExists(t, ctx, dcKey, f, kc)
checkMedusaStandaloneServiceExists(t, ctx, dcKey, f, kc)
}
Expand Down Expand Up @@ -104,6 +107,7 @@ func createMultiDcSingleMedusaJob(t *testing.T, ctx context.Context, namespace s

checkDatacenterReady(t, ctx, dcKey, f)
checkMedusaContainersExist(t, ctx, namespace, dcKey, f, kc)
checkPurgeCronJobExists(t, ctx, namespace, dcKey, f, kc)
createBackupJob(t, ctx, namespace, f, dcKey)
verifyBackupJobFinished(t, ctx, f, dcKey, backupKey)
}
Expand All @@ -126,6 +130,39 @@ func checkMedusaContainersExist(t *testing.T, ctx context.Context, namespace str
require.True(found, fmt.Sprintf("%s doesn't have medusa container", dc1.Name))
}

// checkPurgeCronJobExists verifies that the Medusa purge CronJob was created
// for the given datacenter, then creates a one-off Job from the CronJob's
// template and waits for it to complete successfully.
func checkPurgeCronJobExists(t *testing.T, ctx context.Context, namespace string, dcKey framework.ClusterKey, f *framework.E2eFramework, kc *api.K8ssandraCluster) {
	require := require.New(t)
	// Get the CassandraDatacenter so its sanitized name can be used to derive
	// the expected CronJob name
	dc1 := &cassdcapi.CassandraDatacenter{}
	err := f.Get(ctx, dcKey, dc1)
	require.NoError(err, "Error getting the CassandraDatacenter")
	t.Log("Checking that all the Medusa related objects have been created and are in the expected state")
	// check that the purge cronjob exists
	cronJob := &batchv1.CronJob{}
	err = f.Get(ctx, framework.NewClusterKey(dcKey.K8sContext, namespace, medusapkg.MedusaPurgeCronJobName(kc.SanitizedName(), dc1.SanitizedName())), cronJob)
	require.NoErrorf(err, "Error getting the Medusa purge CronJob. ClusterName: %s, DatacenterName: %s", kc.SanitizedName(), dc1.SanitizedName())
	// create a Job from the cronjob spec so the purge runs immediately
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: namespace,
			Name:      "test-purge-job",
		},
		Spec: cronJob.Spec.JobTemplate.Spec,
	}
	err = f.Create(ctx, dcKey, job)
	require.NoErrorf(err, "Error creating the Medusa purge Job. ClusterName: %s, DatacenterName: %s, Namespace: %s, JobName: test-purge-job", kc.SanitizedName(), dc1.SanitizedName(), namespace)
	// ensure the job run was successful
	require.Eventually(func() bool {
		updated := &batchv1.Job{}
		err := f.Get(ctx, framework.NewClusterKey(dcKey.K8sContext, namespace, "test-purge-job"), updated)
		if err != nil {
			return false
		}
		return updated.Status.Succeeded == 1
	}, polling.medusaBackupDone.timeout, polling.medusaBackupDone.interval, "Medusa purge Job didn't finish within timeout")
}

func createBackupJob(t *testing.T, ctx context.Context, namespace string, f *framework.E2eFramework, dcKey framework.ClusterKey) {
require := require.New(t)
t.Log("creating MedusaBackupJob")
Expand Down

0 comments on commit 8c32ae3

Please sign in to comment.