diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 6cd46f745..ff4f4463c 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -1060,7 +1060,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return syncReason, err } - c.ConnectionPooler[role].Deployment = deployment } newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations @@ -1069,7 +1068,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return nil, err } - c.ConnectionPooler[role].Deployment = deployment } } @@ -1098,18 +1096,56 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return nil, fmt.Errorf("could not delete pooler pod: %v", err) } - } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { - patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations) - if err != nil { - return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err) + } else { + if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { + patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations) + if err != nil { + return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err) + } + _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err) + } } - _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) - if err != nil { - return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err) + } + } + + if oldSpec != nil { + for anno := range oldSpec.Spec.PodAnnotations { + if _, ok := newSpec.Spec.PodAnnotations[anno]; !ok { + // template annotation was removed + for _, ignore := range c.OpConfig.IgnoredAnnotations { + if anno == ignore { + continue + } + } + annotationToRemove := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, anno)) + annotationToRemoveTemplate := []byte(fmt.Sprintf(`{"spec":{"template":{"metadata":{"annotations":{"%s":null}}}}}`, anno)) + if err != nil { + c.logger.Errorf("could not form removal patch for pod annotations: %v", err) + return nil, err + } + deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(), + deployment.Name, types.StrategicMergePatchType, annotationToRemoveTemplate, metav1.PatchOptions{}, "") + if err != nil { + c.logger.Errorf("failed to remove annotation %s from %s connection pooler's pod template: %v", + anno, role, err) + return nil, err + } + for _, pod := range pods { + _, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, + types.StrategicMergePatchType, annotationToRemove, metav1.PatchOptions{}) + if err != nil { + c.logger.Errorf("failed to remove annotation %s from pod %s: %v", anno, pod.Name, err) + return nil, err + } + } } } } + c.ConnectionPooler[role].Deployment = deployment + if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { c.ConnectionPooler[role].Service = service desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role]) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index d1a339001..df91ed79b 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -573,6 +573,29 @@ func (c *Cluster) syncStatefulSet() error { } } } + for anno := range c.Statefulset.Spec.Template.Annotations { + if _, ok := desiredSts.Spec.Template.Annotations[anno]; !ok { + // template annotation was removed + for _, ignore := range c.OpConfig.IgnoredAnnotations { + if anno == ignore { + continue + } + } + annotationToRemove := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, anno)) + if err != nil { + c.logger.Errorf("could not form removal patch for pod annotations: %v", err) + return err + } + for _, pod := range pods { + _, err = c.KubeClient.Pods(c.Namespace).Patch(context.Background(), pod.Name, + types.StrategicMergePatchType, annotationToRemove, metav1.PatchOptions{}) + if err != nil { + c.logger.Errorf("failed to remove annotation %s from pod %s: %v", anno, pod.Name, err) + return err + } + } + } + } } if !cmp.match { if cmp.rollingUpdate { diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index d45a193cb..f052236fa 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -142,6 +142,160 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) { } } +func TestPodAnnotationsSync(t *testing.T) { + // testName := "test pod annotations" + clusterName := "acid-test-cluster-2" + namespace := "default" + podAnnotation := "no-scale-down" + podAnnotations := map[string]string{"no-scale-down": "true"} + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := mocks.NewMockHTTPClient(ctrl) + client, _ := newFakeK8sAnnotationsClient() + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + Volume: acidv1.Volume{ + Size: "1Gi", + }, + EnableConnectionPooler: boolToPointer(true), + EnableLogicalBackup: true, + EnableReplicaConnectionPooler: boolToPointer(true), + PodAnnotations: podAnnotations, + NumberOfInstances: 2, + }, + } + + var cluster = New( + Config{ + OpConfig: config.Config{ + PatroniAPICheckInterval: time.Duration(1), + PatroniAPICheckTimeout: time.Duration(5), + PodManagementPolicy: "ordered_ready", + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: k8sutil.Int32ToPointer(1), + }, + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: "spilo-role", + ResourceCheckInterval: time.Duration(3), + ResourceCheckTimeout: time.Duration(10), + }, + }, + }, client, pg, logger, eventRecorder) + + configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}` + response := http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(configJson))), + } + + mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes() + cluster.patroni = patroni.New(patroniLogger, mockClient) + cluster.Name = clusterName + cluster.Namespace = namespace + clusterOptions := clusterLabelsOptions(cluster) + + // create a statefulset + _, err := cluster.createStatefulSet() + assert.NoError(t, err) + // create a pods + podsList := createPods(cluster) + for _, pod := range podsList { + _, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + assert.NoError(t, err) + } + // create connection pooler + _, err = cluster.createConnectionPooler(mockInstallLookupFunction) + assert.NoError(t, err) + + annotateResources(cluster) + err = cluster.Sync(&cluster.Postgresql) + assert.NoError(t, err) + + // 1. PodAnnotations set + stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, sts := range stsList.Items { + assert.Contains(t, sts.Spec.Template.Annotations, podAnnotation) + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + assert.NoError(t, err) + assert.Contains(t, deploy.Spec.Template.Annotations, podAnnotation, + fmt.Sprintf("pooler deployment pod template %s should contain annotation %s, found %#v", + deploy.Name, podAnnotation, deploy.Spec.Template.Annotations)) + assert.NoError(t, err) + } + + podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, pod := range podList.Items { + assert.Contains(t, pod.Annotations, podAnnotation, + fmt.Sprintf("pod %s should contain annotation %s, found %#v", pod.Name, podAnnotation, pod.Annotations)) + assert.NoError(t, err) + } + + cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, cronJob := range cronJobList.Items { + assert.Contains(t, cronJob.Annotations, podAnnotation, + fmt.Sprintf("logical backup cron job's pod template should contain annotation %s, found %#v", + podAnnotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations)) + } + + // 2 PodAnnotations removed + newSpec := cluster.Postgresql.DeepCopy() + newSpec.Spec.PodAnnotations = nil + err = cluster.Sync(newSpec) + assert.NoError(t, err) + + stsList, err = cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, sts := range stsList.Items { + assert.NotContains(t, sts.Spec.Template.Annotations, "no-scale-down") + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotContains(t, deploy.Spec.Template.Annotations, podAnnotation, + fmt.Sprintf("pooler deployment pod template %s should not contain annotation %s, found %#v", + deploy.Name, podAnnotation, deploy.Spec.Template.Annotations)) + assert.NoError(t, err) + } + + podList, err = cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, pod := range podList.Items { + assert.NotContains(t, pod.Annotations, "no-scale-down", + fmt.Sprintf("pod %s should not contain annotation %s, found %#v", pod.Name, podAnnotation, pod.Annotations)) + } + + cronJobList, err = cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + assert.NoError(t, err) + for _, cronJob := range cronJobList.Items { + assert.NotContains(t, cronJob.Annotations, podAnnotation, + fmt.Sprintf("logical backup cron job's pod template should not contain annotation %s, found %#v", + podAnnotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations)) + } +} + func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { testName := "test config comparison" client, _ := newFakeK8sSyncClient()