diff --git a/CHANGELOG/CHANGELOG-1.12.md b/CHANGELOG/CHANGELOG-1.12.md
index 520f871d5..411c45663 100644
--- a/CHANGELOG/CHANGELOG-1.12.md
+++ b/CHANGELOG/CHANGELOG-1.12.md
@@ -14,12 +14,13 @@ Changelog for the K8ssandra Operator, new PRs should update the `unreleased` sec
 When cutting a new release, update the `unreleased` heading to the tag being generated and date, like `## vX.Y.Z - YYYY-MM-DD` and create a new placeholder section for `unreleased` entries.
 
 ## unreleased
-- [ENHANCEMENT] [#1094](https://github.com/k8ssandra/k8ssandra-operator/issues/1094) Expose AdditionalAnnotations field for cassDC.
-* [ENHANCEMENT] [#1160](https://github.com/k8ssandra/k8ssandra-operator/issues/1160) Allow disabling Reaper front-end auth.
-- [ENHANCEMENT] [#1115](https://github.com/k8ssandra/k8ssandra-operator/issues/1115) Add a validation check for the projected pod names length
 * [CHANGE] [#1050](https://github.com/k8ssandra/k8ssandra-operator/issues/1050) Remove unnecessary requeues in the Medusa controllers
 * [CHANGE] [#1165](https://github.com/k8ssandra/k8ssandra-operator/issues/1165) Upgrade to Medusa v0.17.1
 * [FEATURE] [#1165](https://github.com/k8ssandra/k8ssandra-operator/issues/1165) Expose Medusa ssl_verify option to allow disabling cert verification for some on prem S3 compatible systems
+* [ENHANCEMENT] [#1094](https://github.com/k8ssandra/k8ssandra-operator/issues/1094) Expose AdditionalAnnotations field for cassDC.
+* [ENHANCEMENT] [#1160](https://github.com/k8ssandra/k8ssandra-operator/issues/1160) Allow disabling Reaper front-end auth.
+* [ENHANCEMENT] [#1115](https://github.com/k8ssandra/k8ssandra-operator/issues/1115) Add a validation check for the projected pod names length
 * [ENHANCEMENT] [#1115](https://github.com/k8ssandra/k8ssandra-operator/issues/1115) Add a validation check for the projected pod names length
 * [ENHANCEMENT] [#1161](https://github.com/k8ssandra/k8ssandra-operator/issues/1161) Update cass-operator Helm chart to 0.46.1. Adds containerPort for cass-operator metrics and changes cass-config-builder base from UBI7 to UBI8
+* [ENHANCEMENT] [#1154](https://github.com/k8ssandra/k8ssandra-operator/issues/1154) Schedule purges on clusters that have Medusa configured
 * [BUGFIX] [#1002](https://github.com/k8ssandra/k8ssandra-operator/issues/1002) Fix reaper secret name sanitization with cluster overrides
\ No newline at end of file
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 4e64e49e8..916de604c 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -40,6 +40,17 @@ rules:
   verbs:
   - list
   - watch
+- apiGroups:
+  - batch
+  resources:
+  - cronjobs
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - update
+  - watch
 - apiGroups:
   - cassandra.datastax.com
   resources:
diff --git a/controllers/k8ssandra/cleanup.go b/controllers/k8ssandra/cleanup.go
index a24a1f9c2..4ce31a980 100644
--- a/controllers/k8ssandra/cleanup.go
+++ b/controllers/k8ssandra/cleanup.go
@@ -16,6 +16,7 @@ import (
 	"github.com/k8ssandra/k8ssandra-operator/pkg/result"
 	"github.com/k8ssandra/k8ssandra-operator/pkg/utils"
 	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -106,6 +107,10 @@ func (r *K8ssandraClusterReconciler) checkDeletion(ctx context.Context, kc *api.
 		if r.deleteK8ssandraConfigMaps(ctx, kc, dcTemplate, namespace, remoteClient, logger) {
 			hasErrors = true
 		}
+
+		if r.deleteCronJobs(ctx, kc, dcTemplate, namespace, remoteClient, logger) {
+			hasErrors = true
+		}
 	}
 
 	if hasErrors {
@@ -437,3 +442,33 @@ func (r *K8ssandraClusterReconciler) deleteDeployments(
 
 	return
 }
+
+func (r *K8ssandraClusterReconciler) deleteCronJobs(
+	ctx context.Context,
+	kc *k8ssandraapi.K8ssandraCluster,
+	dcTemplate k8ssandraapi.CassandraDatacenterTemplate,
+	namespace string,
+	remoteClient client.Client,
+	kcLogger logr.Logger,
+) (hasErrors bool) {
+	selector := k8ssandralabels.CleanedUpByLabels(client.ObjectKey{Namespace: kc.Namespace, Name: kc.SanitizedName()})
+	options := client.ListOptions{
+		Namespace:     namespace,
+		LabelSelector: labels.SelectorFromSet(selector),
+	}
+	cronJobList := &batchv1.CronJobList{}
+	if err := remoteClient.List(ctx, cronJobList, &options); err != nil {
+		kcLogger.Error(err, "Failed to list Medusa CronJobs", "Context", dcTemplate.K8sContext)
+		return true
+	}
+	for _, item := range cronJobList.Items {
+		kcLogger.Info("Deleting CronJob", "CronJob", utils.GetKey(&item))
+		if err := remoteClient.Delete(ctx, &item); err != nil {
+			key := client.ObjectKey{Namespace: namespace, Name: item.Name}
+			if !errors.IsNotFound(err) {
+				kcLogger.Error(err, "Failed to delete CronJob", "CronJob", key, "Context", dcTemplate.K8sContext)
+			}
+		}
+	}
+	return
+}
diff --git a/controllers/k8ssandra/k8ssandracluster_controller.go b/controllers/k8ssandra/k8ssandracluster_controller.go
index 478a0fb49..e589f1c34 100644
--- a/controllers/k8ssandra/k8ssandracluster_controller.go
+++ b/controllers/k8ssandra/k8ssandracluster_controller.go
@@ -74,6 +74,7 @@ type K8ssandraClusterReconciler struct {
 // +kubebuilder:rbac:groups=monitoring.coreos.com,namespace="k8ssandra",resources=servicemonitors,verbs=get;list;watch;create;update;patch;delete;deletecollection
 // +kubebuilder:rbac:groups=core,namespace="k8ssandra",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups="",namespace="k8ssandra",resources=events,verbs=create;patch
+// +kubebuilder:rbac:groups=batch,namespace="k8ssandra",resources=cronjobs,verbs=get;list;watch;create;update;delete
 
 func (r *K8ssandraClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	logger := log.FromContext(ctx).WithValues("K8ssandraCluster", req.NamespacedName)
diff --git a/controllers/k8ssandra/medusa_reconciler.go b/controllers/k8ssandra/medusa_reconciler.go
index a4837c146..730d666a8 100644
--- a/controllers/k8ssandra/medusa_reconciler.go
+++ b/controllers/k8ssandra/medusa_reconciler.go
@@ -112,6 +112,21 @@ func (r *K8ssandraClusterReconciler) reconcileMedusa(
 			logger.Info("Medusa standalone deployment is not ready yet")
 			return result.RequeueSoon(r.DefaultDelay)
 		}
+		// Create a cron job to purge Medusa backups
+		purgeCronJob, err := medusa.PurgeCronJob(dcConfig, kc.SanitizedName(), namespace, logger)
+		if err != nil {
+			logger.Info("Failed to create Medusa purge backups cronjob", "error", err)
+			return result.Error(err)
+		}
+		purgeCronJob.SetLabels(labels.CleanedUpByLabels(kcKey))
+		recRes = reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *purgeCronJob)
+		switch {
+		case recRes.IsError():
+			return recRes
+		case recRes.IsRequeue():
+			return recRes
+		}
+
 	} else {
 		logger.Info("Medusa is not enabled")
 	}
diff --git a/pkg/medusa/reconcile.go b/pkg/medusa/reconcile.go
index c8be9b1d7..ab0ba12b2 100644
--- a/pkg/medusa/reconcile.go
+++ b/pkg/medusa/reconcile.go
@@ -7,10 +7,12 @@ import (
 	"text/template"
 
 	"github.com/adutra/goalesce"
+	cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
 	k8ss "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
 	api "github.com/k8ssandra/k8ssandra-operator/apis/medusa/v1alpha1"
 	"github.com/k8ssandra/k8ssandra-operator/pkg/images"
 	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/utils/pointer"
@@ -447,6 +449,10 @@ func MedusaStandaloneDeploymentName(clusterName string, dcName string) string {
 	return fmt.Sprintf("%s-%s-medusa-standalone", clusterName, dcName)
 }
 
+func MedusaPurgeCronJobName(clusterName string, dcName string) string {
+	return fmt.Sprintf("%s-%s-medusa-purge", clusterName, dcName)
+}
+
 func StandaloneMedusaDeployment(medusaContainer corev1.Container, clusterName, dcName, namespace string, logger logr.Logger) *appsv1.Deployment {
 	// The standalone medusa pod won't be able to resolve its own IP address using DNS entries
 	medusaContainer.Env = append(medusaContainer.Env, corev1.EnvVar{Name: "MEDUSA_RESOLVE_IP_ADDRESSES", Value: "False"})
@@ -518,6 +524,51 @@ func StandaloneMedusaService(dcConfig *cassandra.DatacenterConfig, medusaSpec *a
 	return medusaService
 }
 
+func PurgeCronJob(dcConfig *cassandra.DatacenterConfig, clusterName, namespace string, logger logr.Logger) (*batchv1.CronJob, error) {
+	cronJobName := MedusaPurgeCronJobName(cassdcapi.CleanupForKubernetes(clusterName), dcConfig.SanitizedName())
+	logger.Info(fmt.Sprintf("Creating Medusa purge backups cronjob: %s", cronJobName))
+	if len(cronJobName) > 253 {
+		return nil, fmt.Errorf("Medusa purge backups cronjob name too long (must be less than 253 characters). Length: %d, Job name: %s", len(cronJobName), cronJobName)
+	}
+	purgeCronJob := &batchv1.CronJob{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      cronJobName,
+			Namespace: namespace,
+		},
+		Spec: batchv1.CronJobSpec{
+			Schedule:                   "0 0 * * *",
+			Suspend:                    pointer.Bool(false),
+			SuccessfulJobsHistoryLimit: pointer.Int32(3),
+			FailedJobsHistoryLimit:     pointer.Int32(1),
+			JobTemplate: batchv1.JobTemplateSpec{
+				Spec: batchv1.JobSpec{
+					Template: corev1.PodTemplateSpec{
+						Spec: corev1.PodSpec{
+							RestartPolicy:      corev1.RestartPolicyOnFailure,
+							ServiceAccountName: "k8ssandra-operator",
+							Containers: []corev1.Container{
+								{
+									Name:                     "k8ssandra-purge-backups",
+									Image:                    "bitnami/kubectl:1.17.3",
+									ImagePullPolicy:          corev1.PullIfNotPresent,
+									TerminationMessagePath:   "/dev/termination-log",
+									TerminationMessagePolicy: "File",
+									Command: []string{
+										"/bin/bash",
+										"-c",
+										createPurgeTaskStr(dcConfig.SanitizedName(), namespace),
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	return purgeCronJob, nil
+}
+
 func generateMedusaProbe(configuredProbe *corev1.Probe) (*corev1.Probe, error) {
 	// Goalesce the custom probe with the default probe,
 	defaultProbe := defaultMedusaProbe()
@@ -551,3 +602,18 @@ func defaultMedusaProbe() *corev1.Probe {
 
 	return probe
 }
+
+func createPurgeTaskStr(dcName string, namespace string) string {
+	return fmt.Sprintf("printf \""+
+		"apiVersion: medusa.k8ssandra.io/v1alpha1\\n"+
+		"kind: MedusaTask\\n"+
+		"metadata:\\n"+
+		"  name: purge-backups-timestamp\\n"+
+		"  namespace: %s\\n"+
+		"spec:\\n"+
+		"  cassandraDatacenter: %s\\n"+
+		"  operation: purge"+
+		"\" "+
+		"| sed \"s/timestamp/$(date +%%Y%%m%%d%%H%%M%%S)/g\" "+
+		"| kubectl apply -f -", namespace, dcName)
+}
diff --git a/pkg/medusa/reconcile_test.go b/pkg/medusa/reconcile_test.go
index 0668e1d33..17b5c52af 100644
--- a/pkg/medusa/reconcile_test.go
+++ b/pkg/medusa/reconcile_test.go
@@ -1,6 +1,7 @@
 package medusa
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/go-logr/logr"
@@ -516,3 +517,34 @@ func TestGenerateMedusaProbe(t *testing.T) {
 	assert.Error(t, err)
 	assert.Nil(t, probe)
 }
+
+func TestPurgeCronJob(t *testing.T) {
+	// Define your test inputs
+	dcConfig := &cassandra.DatacenterConfig{
+		DatacenterName: "testDc",
+	}
+	clusterName := "testCluster"
+	namespace := "testNamespace"
+	logger := logr.New(logr.Discard().GetSink())
+
+	// Call the function with the test inputs
+	actualCronJob, err := PurgeCronJob(dcConfig, clusterName, namespace, logger)
+	assert.Nil(t, err)
+	assert.Equal(t, fmt.Sprintf("%s-%s-medusa-purge", "testcluster", "testdc"), actualCronJob.ObjectMeta.Name)
+	assert.Equal(t, 3, len(actualCronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command))
+	assert.Contains(t, actualCronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command[2], "\\nspec:\\n  cassandraDatacenter: testdc")
+}
+
+func TestPurgeCronJobNameTooLong(t *testing.T) {
+	// Define your test inputs
+	dcConfig := &cassandra.DatacenterConfig{
+		DatacenterName: "testDatacentercWithAReallyLongNameToTestThatTheCharacterCountOfTheNameGoesOverTwoHundredFiftyThreeCharactersTestTestTestTest",
+	}
+	clusterName := "testClusterNameBeingWayTooLongToTestThatTheCharacterCountOfTheNameGoesOverTwoHundredFiftyThreeCharactersTestTestTestTest"
+	namespace := "testNamespace"
+	logger := logr.New(logr.Discard().GetSink())
+
+	// Call the function with the test inputs
+	_, err := PurgeCronJob(dcConfig, clusterName, namespace, logger)
+	assert.NotNil(t, err)
+}
diff --git a/test/e2e/medusa_test.go b/test/e2e/medusa_test.go
index f40919179..58210b350 100644
--- a/test/e2e/medusa_test.go
+++ b/test/e2e/medusa_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/k8ssandra/k8ssandra-operator/test/framework"
 	"github.com/stretchr/testify/require"
 	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -37,6 +38,7 @@ func createSingleMedusaJob(t *testing.T, ctx context.Context, namespace string,
 
 	checkDatacenterReady(t, ctx, dcKey, f)
 	checkMedusaContainersExist(t, ctx, namespace, dcKey, f, kc)
+	checkPurgeCronJobExists(t, ctx, namespace, dcKey, f, kc)
 	createBackupJob(t, ctx, namespace, f, dcKey)
 	verifyBackupJobFinished(t, ctx, f, dcKey, backupKey)
 	restoreBackupJob(t, ctx, namespace, f, dcKey)
@@ -71,6 +73,7 @@ func createMultiMedusaJob(t *testing.T, ctx context.Context, namespace string, f
 	for _, dcKey := range []framework.ClusterKey{dc1Key, dc2Key} {
 		checkDatacenterReady(t, ctx, dcKey, f)
 		checkMedusaContainersExist(t, ctx, namespace, dcKey, f, kc)
+		checkPurgeCronJobExists(t, ctx, namespace, dcKey, f, kc)
 		checkMedusaStandaloneDeploymentExists(t, ctx, dcKey, f, kc)
 		checkMedusaStandaloneServiceExists(t, ctx, dcKey, f, kc)
 	}
@@ -104,6 +107,7 @@ func createMultiDcSingleMedusaJob(t *testing.T, ctx context.Context, namespace s
 
 	checkDatacenterReady(t, ctx, dcKey, f)
 	checkMedusaContainersExist(t, ctx, namespace, dcKey, f, kc)
+	checkPurgeCronJobExists(t, ctx, namespace, dcKey, f, kc)
 	createBackupJob(t, ctx, namespace, f, dcKey)
 	verifyBackupJobFinished(t, ctx, f, dcKey, backupKey)
 }
@@ -126,6 +130,39 @@ func checkMedusaContainersExist(t *testing.T, ctx context.Context, namespace str
 	require.True(found, fmt.Sprintf("%s doesn't have medusa container", dc1.Name))
 }
 
+func checkPurgeCronJobExists(t *testing.T, ctx context.Context, namespace string, dcKey framework.ClusterKey, f *framework.E2eFramework, kc *api.K8ssandraCluster) {
+	require := require.New(t)
+	// Get the CassandraDatacenter
+	dc1 := &cassdcapi.CassandraDatacenter{}
+	err := f.Get(ctx, dcKey, dc1)
+	// make sure the CassandraDatacenter could be retrieved
+	require.NoError(err, "Error getting the CassandraDatacenter")
+	t.Log("Checking that all the Medusa related objects have been created and are in the expected state")
+	// check that the cronjob exists
+	cronJob := &batchv1.CronJob{}
+	err = f.Get(ctx, framework.NewClusterKey(dcKey.K8sContext, namespace, medusapkg.MedusaPurgeCronJobName(kc.SanitizedName(), dc1.SanitizedName())), cronJob)
+	require.NoErrorf(err, "Error getting the Medusa purge CronJob. ClusterName: %s, DatacenterName: %s", kc.SanitizedName(), dc1.SanitizedName())
+	// create a Job from the cronjob spec
+	job := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: namespace,
+			Name:      "test-purge-job",
+		},
+		Spec: cronJob.Spec.JobTemplate.Spec,
+	}
+	err = f.Create(ctx, dcKey, job)
+	require.NoErrorf(err, "Error creating the Medusa purge Job. ClusterName: %s, DatacenterName: %s, Namespace: %s, JobName: test-purge-job", kc.SanitizedName(), dc1.SanitizedName(), namespace)
+	// ensure the job run was successful
+	require.Eventually(func() bool {
+		updated := &batchv1.Job{}
+		err := f.Get(ctx, framework.NewClusterKey(dcKey.K8sContext, namespace, "test-purge-job"), updated)
+		if err != nil {
+			return false
+		}
+		return updated.Status.Succeeded == 1
+	}, polling.medusaBackupDone.timeout, polling.medusaBackupDone.interval, "Medusa purge Job didn't finish within timeout")
+}
+
 func createBackupJob(t *testing.T, ctx context.Context, namespace string, f *framework.E2eFramework, dcKey framework.ClusterKey) {
 	require := require.New(t)
 	t.Log("creating MedusaBackupJob")
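
Note: each scheduled run of the purge CronJob added in pkg/medusa/reconcile.go shells out to kubectl and applies a MedusaTask built by createPurgeTaskStr. As a rough illustration of what that command produces — assuming a hypothetical datacenter whose sanitized name is dc1 and an operator namespace of k8ssandra-operator, neither of which appears in this patch — the applied manifest would look approximately like:

    apiVersion: medusa.k8ssandra.io/v1alpha1
    kind: MedusaTask
    metadata:
      name: purge-backups-20240101000000
      namespace: k8ssandra-operator
    spec:
      cassandraDatacenter: dc1
      operation: purge

The timestamp suffix comes from the sed substitution of the literal "timestamp" with $(date +%Y%m%d%H%M%S), so each nightly run (schedule "0 0 * * *") creates a uniquely named MedusaTask for the operator's task controller to reconcile as a purge.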