diff --git a/api/turing/cluster/controller.go b/api/turing/cluster/controller.go index 846338574..5573c821f 100644 --- a/api/turing/cluster/controller.go +++ b/api/turing/cluster/controller.go @@ -66,8 +66,8 @@ type Controller interface { ApplyIstioVirtualService(ctx context.Context, routerEndpoint *VirtualService) error DeleteIstioVirtualService(ctx context.Context, svcName string, namespace string) error - // Deployment - DeleteKubernetesDeployment(ctx context.Context, name string, namespace string, ignoreNotFound bool) error + // StatefulSet + DeleteKubernetesStatefulSet(ctx context.Context, name string, namespace string, ignoreNotFound bool) error // Service DeployKubernetesService(ctx context.Context, svc *KubernetesService) error @@ -82,8 +82,7 @@ type Controller interface { DeleteSecret(ctx context.Context, secretName string, namespace string, ignoreNotFound bool) error // PVC - ApplyPersistentVolumeClaim(ctx context.Context, namespace string, pvc *PersistentVolumeClaim) error - DeletePersistentVolumeClaim(ctx context.Context, pvcName string, namespace string, ignoreNotFound bool) error + DeletePVCs(ctx context.Context, listOptions metav1.ListOptions, namespace string, ignoreNotFound bool) error // Pod ListPods(ctx context.Context, namespace string, labelSelector string) (*apicorev1.PodList, error) @@ -347,34 +346,34 @@ func (c *controller) GetKnativeServiceDesiredReplicas( return int(*rev.Status.DesiredReplicas), nil } -// DeployKubernetesService deploys a kubernetes service and deployment +// DeployKubernetesService deploys a kubernetes service and stateful set func (c *controller) DeployKubernetesService( ctx context.Context, svcConf *KubernetesService, ) error { - desiredDeployment, desiredSvc := svcConf.BuildKubernetesServiceConfig() + desiredStatefulSet, desiredSvc := svcConf.BuildKubernetesServiceConfig() - // Deploy deployment - deployments := c.k8sAppsClient.Deployments(svcConf.Namespace) - // Check if deployment already exists. If exists, update it. If not, create. - var existingDeployment *apiappsv1.Deployment + // Deploy stateful set + statefulSets := c.k8sAppsClient.StatefulSets(svcConf.Namespace) + // Check if stateful set already exists. If exists, update it. If not, create. + var existingStatefulSet *apiappsv1.StatefulSet var err error - existingDeployment, err = deployments.Get(ctx, svcConf.Name, metav1.GetOptions{}) + existingStatefulSet, err = statefulSets.Get(ctx, svcConf.Name, metav1.GetOptions{}) if err != nil { if k8serrors.IsNotFound(err) { // Create new deployment - _, err = deployments.Create(ctx, desiredDeployment, metav1.CreateOptions{}) + _, err = statefulSets.Create(ctx, desiredStatefulSet, metav1.CreateOptions{}) } else { // Unexpected error, return it return err } } else { // Check for differences between current and new specs - if !k8sDeploymentSemanticEquals(desiredDeployment, existingDeployment) { + if !k8sStatefulSetSemanticEquals(existingStatefulSet, existingStatefulSet) { // Update the existing service with the new config - existingDeployment.Spec.Template = desiredDeployment.Spec.Template - existingDeployment.ObjectMeta.Labels = desiredDeployment.ObjectMeta.Labels - _, err = deployments.Update(ctx, existingDeployment, metav1.UpdateOptions{}) + existingStatefulSet.Spec.Template = desiredStatefulSet.Spec.Template + existingStatefulSet.ObjectMeta.Labels = desiredStatefulSet.ObjectMeta.Labels + _, err = statefulSets.Update(ctx, existingStatefulSet, metav1.UpdateOptions{}) } } if err != nil { @@ -405,25 +404,25 @@ func (c *controller) DeployKubernetesService( } // Wait until deployment ready and return any errors - return c.waitDeploymentReady(ctx, svcConf.Name, svcConf.Namespace) + return c.waitStatefulSetReady(ctx, svcConf.Name, svcConf.Namespace) } -// DeleteKubernetesDeployment deletes a kubernetes deployment -func (c *controller) DeleteKubernetesDeployment( +// DeleteKubernetesStatefulSet deletes a stateful set +func (c *controller) DeleteKubernetesStatefulSet( ctx context.Context, name string, namespace string, ignoreNotFound bool, ) error { - deployments := c.k8sAppsClient.Deployments(namespace) - _, err := deployments.Get(ctx, name, metav1.GetOptions{}) + statefulSets := c.k8sAppsClient.StatefulSets(namespace) + _, err := statefulSets.Get(ctx, name, metav1.GetOptions{}) if err != nil { if ignoreNotFound { return nil } return err } - return deployments.Delete(ctx, name, metav1.DeleteOptions{}) + return statefulSets.Delete(ctx, name, metav1.DeleteOptions{}) } // DeleteKubernetesService deletes a kubernetes service @@ -517,44 +516,21 @@ func (c *controller) GetKnativeServiceURL(ctx context.Context, svcName string, n return url } -// ApplyPersistentVolumeClaim creates a PVC in the given namespace. -// If the PVC already exists, it will update the existing PVC. -func (c *controller) ApplyPersistentVolumeClaim( +// DeletePVCs deletes all PVCs specified by the given list options in the given namespace. +func (c *controller) DeletePVCs( ctx context.Context, - namespace string, - pvcCfg *PersistentVolumeClaim, -) error { - pvcs := c.k8sCoreClient.PersistentVolumeClaims(namespace) - existingPVC, err := pvcs.Get(ctx, pvcCfg.Name, metav1.GetOptions{}) - pvc := pvcCfg.BuildPersistentVolumeClaim() - - // If not exists, create - if err != nil { - _, err := pvcs.Create(ctx, pvc, metav1.CreateOptions{}) - return err - } - // If exists, update - existingPVC.Spec.Resources = pvc.Spec.Resources - _, err = pvcs.Update(ctx, existingPVC, metav1.UpdateOptions{}) - return err -} - -// DeletePersistentVolumeClaim deletes the PVC in the given namespace. -func (c *controller) DeletePersistentVolumeClaim( - ctx context.Context, - pvcName string, + listOptions metav1.ListOptions, namespace string, ignoreNotFound bool, ) error { - pvcs := c.k8sCoreClient.PersistentVolumeClaims(namespace) - _, err := pvcs.Get(ctx, pvcName, metav1.GetOptions{}) + _, err := c.k8sCoreClient.PersistentVolumeClaims(namespace).List(ctx, listOptions) if err != nil { if ignoreNotFound { return nil } - return fmt.Errorf("unable to get pvc with name %s: %s", pvcName, err.Error()) + return err } - return pvcs.Delete(ctx, pvcName, metav1.DeleteOptions{}) + return c.k8sCoreClient.PersistentVolumeClaims(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions) } func (c *controller) ListPods(ctx context.Context, namespace string, labelSelector string) (*apicorev1.PodList, error) { @@ -813,31 +789,31 @@ func (c *controller) getKnativePodTerminationMessage(ctx context.Context, svcNam return terminationMessage } -// waitDeploymentReady waits for the given k8s deployment to become ready, until the +// waitStatefulSetReady waits for the given k8s stateful set to become ready, until the // default timeout -func (c *controller) waitDeploymentReady( +func (c *controller) waitStatefulSetReady( ctx context.Context, - deploymentName string, + statefulSetName string, namespace string, ) error { // Init ticker to check status every second ticker := time.NewTicker(time.Second) defer ticker.Stop() - // Init knative ServicesGetter - deployments := c.k8sAppsClient.Deployments(namespace) + // Init stateful set getter + statefulSets := c.k8sAppsClient.StatefulSets(namespace) for { select { case <-ctx.Done(): - return fmt.Errorf("timeout waiting for deployment %s to be ready", deploymentName) + return fmt.Errorf("timeout waiting for stateful set %s to be ready", statefulSetName) case <-ticker.C: - deployment, err := deployments.Get(ctx, deploymentName, metav1.GetOptions{}) + statefulSet, err := statefulSets.Get(ctx, statefulSetName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("unable to get deployment status for %s: %v", deploymentName, err) + return fmt.Errorf("unable to get stateful set status for %s: %v", statefulSetName, err) } - if deploymentReady(deployment) { + if statefulSetReady(statefulSet) { // Service is completely ready return nil } @@ -845,17 +821,13 @@ func (c *controller) waitDeploymentReady( } } -func deploymentReady(deployment *apiappsv1.Deployment) bool { - if deployment.Generation <= deployment.Status.ObservedGeneration { - cond := deployment.Status.Conditions[0] - ready := cond.Type == apiappsv1.DeploymentAvailable - if deployment.Spec.Replicas != nil { +func statefulSetReady(statefulSet *apiappsv1.StatefulSet) bool { + if statefulSet.Generation <= statefulSet.Status.ObservedGeneration { + if statefulSet.Spec.Replicas != nil { // Account for replica surge during updates - ready = ready && - deployment.Status.ReadyReplicas == *deployment.Spec.Replicas && - deployment.Status.Replicas == *deployment.Spec.Replicas + return statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas && + statefulSet.Status.Replicas == *statefulSet.Spec.Replicas } - return ready } return false } @@ -876,10 +848,10 @@ func knServiceSemanticEquals(desiredService, service *knservingv1.Service) bool equality.Semantic.DeepEqual(desiredService.ObjectMeta.Labels, service.ObjectMeta.Labels) } -func k8sDeploymentSemanticEquals(desiredDeployment, deployment *apiappsv1.Deployment) bool { - return equality.Semantic.DeepEqual(desiredDeployment.Spec.Template, deployment.Spec.Template) && - equality.Semantic.DeepEqual(desiredDeployment.ObjectMeta.Labels, deployment.ObjectMeta.Labels) && - desiredDeployment.Spec.Replicas == deployment.Spec.Replicas +func k8sStatefulSetSemanticEquals(desiredStatefulSet, statefulSet *apiappsv1.StatefulSet) bool { + return equality.Semantic.DeepEqual(desiredStatefulSet.Spec.Template, statefulSet.Spec.Template) && + equality.Semantic.DeepEqual(desiredStatefulSet.ObjectMeta.Labels, statefulSet.ObjectMeta.Labels) && + desiredStatefulSet.Spec.Replicas == statefulSet.Spec.Replicas } func k8sServiceSemanticEquals(desiredService, service *apicorev1.Service) bool { diff --git a/api/turing/cluster/controller_test.go b/api/turing/cluster/controller_test.go index 19dd70ef7..976bef33d 100644 --- a/api/turing/cluster/controller_test.go +++ b/api/turing/cluster/controller_test.go @@ -46,11 +46,13 @@ var reactorVerbs = struct { Create string Update string Delete string + List string }{ Get: "get", Create: "create", Update: "update", Delete: "delete", + List: "list", } const ( @@ -81,7 +83,7 @@ func TestDeployKnativeService(t *testing.T) { } // Define reactor for a successful get - getSuccess := func(action k8stesting.Action) (bool, runtime.Object, error) { + getSuccess := func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &knservingv1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: testName, @@ -189,10 +191,10 @@ func TestDeployKnativeService(t *testing.T) { func TestDeployKubernetesService(t *testing.T) { testName, testNamespace := "test-name", "test-namespace" - deploymentResourceItem := schema.GroupVersionResource{ + statefulSetResourceItem := schema.GroupVersionResource{ Group: "apps", Version: "v1", - Resource: "deployments", + Resource: "statefulsets", } svcResourceItem := schema.GroupVersionResource{ Version: "v1", @@ -204,7 +206,7 @@ func TestDeployKubernetesService(t *testing.T) { Namespace: testNamespace, }, } - testK8sDeployment := &appsv1.Deployment{ + testK8sStatefulSet := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: testName, Namespace: testNamespace, @@ -219,29 +221,29 @@ func TestDeployKubernetesService(t *testing.T) { replicas := int32(1) // Define reactor for a successful get - getDeploymentSuccess := func(action k8stesting.Action) (bool, runtime.Object, error) { - return true, &appsv1.Deployment{ + getStatefulSetSuccess := func(_ k8stesting.Action) (bool, runtime.Object, error) { + return true, &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: testName, Namespace: testNamespace, Generation: 1, }, - Spec: appsv1.DeploymentSpec{ + Spec: appsv1.StatefulSetSpec{ Replicas: &replicas, }, - Status: appsv1.DeploymentStatus{ + Status: appsv1.StatefulSetStatus{ ObservedGeneration: 1, Replicas: 1, ReadyReplicas: 1, - Conditions: []appsv1.DeploymentCondition{ + Conditions: []appsv1.StatefulSetCondition{ { - Type: appsv1.DeploymentAvailable, + Type: "Available", }, }, }, }, nil } - getSvcSuccess := func(action k8stesting.Action) (bool, runtime.Object, error) { + getSvcSuccess := func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, testK8sSvc, nil } @@ -255,9 +257,9 @@ func TestDeployKubernetesService(t *testing.T) { []reactor{ { verb: reactorVerbs.Get, - resource: deploymentResourceItem.String(), + resource: statefulSetResourceItem.String(), rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewGetAction(deploymentResourceItem, testNamespace, testName) + expAction := k8stesting.NewGetAction(statefulSetResourceItem, testNamespace, testName) // Check that the method is called with the expected action assert.Equal(t, expAction, action) // Return nil object and error to indicate non existent object @@ -266,15 +268,15 @@ func TestDeployKubernetesService(t *testing.T) { }, { verb: reactorVerbs.Create, - resource: "deployments", + resource: "statefulsets", rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewCreateAction(deploymentResourceItem, testNamespace, testK8sDeployment) + expAction := k8stesting.NewCreateAction(statefulSetResourceItem, testNamespace, testK8sStatefulSet) // Check that the method is called with the expected action assert.Equal(t, expAction, action) // Prepend a new get reactor for waitK8sServiceReady to use - cs.PrependReactor(reactorVerbs.Get, "deployments", getDeploymentSuccess) + cs.PrependReactor(reactorVerbs.Get, "statefulsets", getStatefulSetSuccess) // Nil error indicates Create success - return true, testK8sDeployment, nil + return true, testK8sStatefulSet, nil }, }, { @@ -308,26 +310,26 @@ func TestDeployKubernetesService(t *testing.T) { []reactor{ { verb: reactorVerbs.Get, - resource: "deployments", + resource: "statefulsets", rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewGetAction(deploymentResourceItem, testNamespace, testName) + expAction := k8stesting.NewGetAction(statefulSetResourceItem, testNamespace, testName) // Check that the method is called with the expected action assert.Equal(t, expAction, action) - return true, testK8sDeployment, nil + return true, testK8sStatefulSet, nil }, }, { verb: reactorVerbs.Update, - resource: "deployments", + resource: "statefulsets", rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewUpdateAction(deploymentResourceItem, - testNamespace, testK8sDeployment) + expAction := k8stesting.NewUpdateAction(statefulSetResourceItem, + testNamespace, testK8sStatefulSet) // Check that the method is called with the expected action assert.Equal(t, expAction, action) // Prepend a new get reactor for waitK8sServiceReady to use - cs.PrependReactor(reactorVerbs.Get, "deployments", getDeploymentSuccess) + cs.PrependReactor(reactorVerbs.Get, "statefulsets", getStatefulSetSuccess) // Nil error indicates Update success - return true, testK8sDeployment, nil + return true, testK8sStatefulSet, nil }, }, { @@ -362,8 +364,8 @@ func TestDeployKubernetesService(t *testing.T) { monkey.PatchInstanceMethod( reflect.TypeOf(svcConf), "BuildKubernetesServiceConfig", - func(service *KubernetesService) (*appsv1.Deployment, *corev1.Service) { - return testK8sDeployment, testK8sSvc + func(_ *KubernetesService) (*appsv1.StatefulSet, *corev1.Service) { + return testK8sStatefulSet, testK8sSvc }, ) monkey.Patch(k8sServiceSemanticEquals, @@ -371,8 +373,8 @@ func TestDeployKubernetesService(t *testing.T) { // Make method return false always, so that an update will be triggered return false }) - monkey.Patch(k8sDeploymentSemanticEquals, - func(*appsv1.Deployment, *appsv1.Deployment) bool { + monkey.Patch(k8sStatefulSetSemanticEquals, + func(*appsv1.StatefulSet, *appsv1.StatefulSet) bool { // Make method return false always, so that an update will be triggered return false }) @@ -485,7 +487,7 @@ func TestDeleteKnativeService(t *testing.T) { } } -func TestDeleteKubernetesDeployment(t *testing.T) { +func TestDeleteKubernetesStatefulSet(t *testing.T) { testName, testNamespace := "test-name", "test-namespace" deploymentResourceItem := schema.GroupVersionResource{ Group: "apps", @@ -571,7 +573,7 @@ func TestDeleteKubernetesDeployment(t *testing.T) { // Create test controller c := createTestK8sController(cs, tc.reactors) // Run test - err := c.DeleteKubernetesDeployment(ctx, testName, testNamespace, tc.ignoreNotFound) + err := c.DeleteKubernetesStatefulSet(ctx, testName, testNamespace, tc.ignoreNotFound) // Validate no error assert.Equal(t, err != nil, tc.hasErr) }) @@ -803,7 +805,7 @@ func TestGetJob(t *testing.T) { { verb: reactorVerbs.Get, resource: "job", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, nil, k8serrors.NewNotFound(schema.GroupResource{}, jobName) }, }, @@ -816,7 +818,7 @@ func TestGetJob(t *testing.T) { { verb: reactorVerbs.Get, resource: "jobs", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: jobName, @@ -862,7 +864,7 @@ func TestDeleteJob(t *testing.T) { { verb: reactorVerbs.Delete, resource: "job", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, nil, k8serrors.NewNotFound(schema.GroupResource{}, jobName) }, }, @@ -874,7 +876,7 @@ func TestDeleteJob(t *testing.T) { { verb: reactorVerbs.Delete, resource: "jobs", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, nil, nil }, }, @@ -915,7 +917,7 @@ func TestCreateServiceAccount(t *testing.T) { { verb: reactorVerbs.Get, resource: "serviceaccount", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &corev1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: serviceAccountName, @@ -932,7 +934,7 @@ func TestCreateServiceAccount(t *testing.T) { { verb: reactorVerbs.Create, resource: "serviceaccount", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &corev1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: serviceAccountName, @@ -986,7 +988,7 @@ func TestCreateRole(t *testing.T) { { verb: reactorVerbs.Get, resource: "role", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &rbacv1.Role{ ObjectMeta: metav1.ObjectMeta{ Name: roleName, @@ -1003,7 +1005,7 @@ func TestCreateRole(t *testing.T) { { verb: reactorVerbs.Create, resource: "role", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &rbacv1.Role{ ObjectMeta: metav1.ObjectMeta{ Name: roleName, @@ -1059,7 +1061,7 @@ func TestCreateRoleBinding(t *testing.T) { { verb: reactorVerbs.Get, resource: "rolebinding", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &rbacv1.RoleBinding{ ObjectMeta: metav1.ObjectMeta{ Name: roleBindingName, @@ -1076,7 +1078,7 @@ func TestCreateRoleBinding(t *testing.T) { { verb: reactorVerbs.Create, resource: "rolebinding", - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { + rFunc: func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &rbacv1.RoleBinding{ ObjectMeta: metav1.ObjectMeta{ Name: roleBindingName, @@ -1125,7 +1127,7 @@ func TestCreateSparkApplication(t *testing.T) { cs.PrependReactor( reactorVerbs.Create, "sparkapplication", - func(action k8stesting.Action) (bool, runtime.Object, error) { + func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &sparkv1beta2.SparkApplication{ ObjectMeta: metav1.ObjectMeta{ Name: "spark", @@ -1173,7 +1175,7 @@ func TestGetSparkApplication(t *testing.T) { cs.PrependReactor( reactorVerbs.Get, "sparkapplication", - func(action k8stesting.Action) (bool, runtime.Object, error) { + func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, &sparkv1beta2.SparkApplication{ ObjectMeta: metav1.ObjectMeta{ Name: "spark", @@ -1203,7 +1205,7 @@ func TestDeleteSparkApplication(t *testing.T) { cs.PrependReactor( reactorVerbs.Get, "sparkapplication", - func(action k8stesting.Action) (bool, runtime.Object, error) { + func(_ k8stesting.Action) (bool, runtime.Object, error) { return true, nil, nil }, ) @@ -1583,110 +1585,24 @@ func TestDeleteSecret(t *testing.T) { } } -func TestCreatePVC(t *testing.T) { +func TestDeletePVCs(t *testing.T) { pvcResource := schema.GroupVersionResource{ Group: "", Version: "v1", Resource: "persistentvolumeclaims", } - testNamespace := "namespace" - cacheVolumeSize := "2Gi" - volSize, _ := resource.ParseQuantity(cacheVolumeSize) // drop error since this volume size is a constant - - pvcConf := PersistentVolumeClaim{ - Name: "test-svc-turing-pvc", - AccessModes: []string{"ReadWriteOnce"}, - Size: volSize, - } - testPvc := pvcConf.BuildPersistentVolumeClaim() - cs := fake.NewSimpleClientset() - tests := []struct { - name string - reactors []reactor - hasErr bool - }{ - {"new_pvc", - []reactor{ - { - verb: reactorVerbs.Get, - resource: pvcResource.Resource, - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewGetAction(pvcResource, testNamespace, testPvc.Name) - // Check that the method is called with the expected action - assert.Equal(t, expAction, action) - // Return nil object and error to indicate non existent object - return true, nil, k8serrors.NewNotFound(schema.GroupResource{}, testPvc.Name) - }, - }, - { - verb: reactorVerbs.Create, - resource: pvcResource.Resource, - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewCreateAction(pvcResource, testNamespace, testPvc) - // Check that the method is called with the expected action - assert.Equal(t, expAction, action) - // Nil error indicates Create success - return true, testPvc, nil - }, - }, - }, - false, - }, - { - "pvc_exists", - []reactor{ - { - verb: reactorVerbs.Get, - resource: pvcResource.Resource, - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewGetAction(pvcResource, testNamespace, testPvc.Name) - // Check that the method is called with the expected action - assert.Equal(t, expAction, action) - // Return nil object and error to indicate non existent object - return true, testPvc, nil - }, - }, - { - verb: reactorVerbs.Update, - resource: pvcResource.Resource, - rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewUpdateAction(pvcResource, testNamespace, testPvc) - // Check that the method is called with the expected action - assert.Equal(t, expAction, action) - // Nil error indicates Create success - return true, testPvc, nil - }, - }, - }, - false, - }, + groupVersionKind := schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PersistentVolumeClaim", } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - // Create test controller - c := createTestK8sController(cs, tc.reactors) - - ctx, cancel := context.WithTimeout(context.Background(), contextTimeoutDuration) - defer cancel() - - // Run test - err := c.ApplyPersistentVolumeClaim(ctx, testNamespace, &pvcConf) - // Validate no error - assert.Equal(t, tc.hasErr, err != nil) - }) - } -} - -func TestDeletePVC(t *testing.T) { - pvcResource := schema.GroupVersionResource{ - Group: "", - Version: "v1", - Resource: "persistentvolumeclaims", - } testNamespace := "namespace" + statefulSetName := "test-svc-turing-fluentd-logger-0" + listOptions := metav1.ListOptions{LabelSelector: "app=" + statefulSetName} + cacheVolumeSize := "2Gi" volSize, _ := resource.ParseQuantity(cacheVolumeSize) // drop error since this volume size is a constant @@ -1707,10 +1623,10 @@ func TestDeletePVC(t *testing.T) { "not_exists; ignore pvc not found", []reactor{ { - verb: reactorVerbs.Get, + verb: reactorVerbs.List, resource: pvcResource.Resource, rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewGetAction(pvcResource, testNamespace, pvcConf.Name) + expAction := k8stesting.NewListAction(pvcResource, groupVersionKind, testNamespace, listOptions) // Check that the method is called with the expected action assert.Equal(t, expAction, action) // Return nil object and error to indicate non existent object @@ -1725,10 +1641,10 @@ func TestDeletePVC(t *testing.T) { "exists; ignore pvc not found", []reactor{ { - verb: reactorVerbs.Get, + verb: reactorVerbs.List, resource: pvcResource.Resource, rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewGetAction(pvcResource, testNamespace, pvcConf.Name) + expAction := k8stesting.NewListAction(pvcResource, groupVersionKind, testNamespace, listOptions) // Check that the method is called with the expected action assert.Equal(t, expAction, action) return true, nil, nil @@ -1751,10 +1667,10 @@ func TestDeletePVC(t *testing.T) { "not_exists; do not ignore pvc not found", []reactor{ { - verb: reactorVerbs.Get, + verb: reactorVerbs.List, resource: pvcResource.Resource, rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) { - expAction := k8stesting.NewGetAction(pvcResource, testNamespace, pvcConf.Name) + expAction := k8stesting.NewListAction(pvcResource, groupVersionKind, testNamespace, listOptions) // Check that the method is called with the expected action assert.Equal(t, expAction, action) // Return nil object and error to indicate non existent object @@ -1776,7 +1692,7 @@ func TestDeletePVC(t *testing.T) { defer cancel() // Run test - err := c.DeletePersistentVolumeClaim(ctx, pvcConf.Name, testNamespace, tc.ignoreNotFound) + err := c.DeletePVCs(ctx, listOptions, testNamespace, tc.ignoreNotFound) // Validate no error assert.Equal(t, tc.hasErr, err != nil) }) diff --git a/api/turing/cluster/kubernetes_service.go b/api/turing/cluster/kubernetes_service.go index 3192a244e..91c281231 100644 --- a/api/turing/cluster/kubernetes_service.go +++ b/api/turing/cluster/kubernetes_service.go @@ -28,13 +28,13 @@ type KubernetesService struct { SecurityContext *corev1.PodSecurityContext `json:"security_context"` } -func (cfg *KubernetesService) BuildKubernetesServiceConfig() (*appsv1.Deployment, *corev1.Service) { - deployment := cfg.buildDeployment(cfg.Labels) +func (cfg *KubernetesService) BuildKubernetesServiceConfig() (*appsv1.StatefulSet, *corev1.Service) { + statefulSet := cfg.buildStatefulSet(cfg.Labels) service := cfg.buildService(cfg.Labels) - return deployment, service + return statefulSet, service } -func (cfg *KubernetesService) buildDeployment(labels map[string]string) *appsv1.Deployment { +func (cfg *KubernetesService) buildStatefulSet(labels map[string]string) *appsv1.StatefulSet { replicas := int32(cfg.Replicas) labels["app"] = cfg.Name @@ -44,13 +44,13 @@ func (cfg *KubernetesService) buildDeployment(labels map[string]string) *appsv1. initContainers[idx] = containerCfg.Build() } - return &appsv1.Deployment{ + return &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: cfg.Name, Namespace: cfg.Namespace, Labels: labels, }, - Spec: appsv1.DeploymentSpec{ + Spec: appsv1.StatefulSetSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ @@ -84,9 +84,10 @@ func (cfg *KubernetesService) buildDeployment(labels map[string]string) *appsv1. SecurityContext: cfg.SecurityContext, }, }, - Strategy: appsv1.DeploymentStrategy{ - Type: appsv1.RollingUpdateDeploymentStrategyType, + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, }, + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{*cfg.PersistentVolumeClaim.BuildPersistentVolumeClaim()}, }, } diff --git a/api/turing/cluster/kubernetes_service_test.go b/api/turing/cluster/kubernetes_service_test.go index d2f800d30..a13083ece 100644 --- a/api/turing/cluster/kubernetes_service_test.go +++ b/api/turing/cluster/kubernetes_service_test.go @@ -68,13 +68,13 @@ func TestBuildKubernetesServiceConfig(t *testing.T) { replicas := int32(1) labels["app"] = "test-svc-fluentd-logger" - expectedDeployment := appsv1.Deployment{ + expectedStatefulSet := appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: "test-svc-fluentd-logger", Namespace: "namespace", Labels: labels, }, - Spec: appsv1.DeploymentSpec{ + Spec: appsv1.StatefulSetSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ @@ -149,9 +149,10 @@ func TestBuildKubernetesServiceConfig(t *testing.T) { }, }, }, - Strategy: appsv1.DeploymentStrategy{ - Type: appsv1.RollingUpdateDeploymentStrategyType, + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, }, + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{*svcConf.PersistentVolumeClaim.BuildPersistentVolumeClaim()}, }, } @@ -176,8 +177,8 @@ func TestBuildKubernetesServiceConfig(t *testing.T) { Type: corev1.ServiceTypeClusterIP, }, } - gotDeployment, gotService := svcConf.BuildKubernetesServiceConfig() - err := tu.CompareObjects(*gotDeployment, expectedDeployment) + gotStatefulSet, gotService := svcConf.BuildKubernetesServiceConfig() + err := tu.CompareObjects(*gotStatefulSet, expectedStatefulSet) assert.NoError(t, err) err = tu.CompareObjects(*gotService, expectedService) assert.NoError(t, err) diff --git a/api/turing/cluster/mocks/controller.go b/api/turing/cluster/mocks/controller.go index 6e5176ba1..01334d818 100644 --- a/api/turing/cluster/mocks/controller.go +++ b/api/turing/cluster/mocks/controller.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.28.2. DO NOT EDIT. +// Code generated by mockery v2.42.0. DO NOT EDIT. package mocks @@ -12,6 +12,8 @@ import ( io "io" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + mock "github.com/stretchr/testify/mock" rbacv1 "k8s.io/api/rbac/v1" @@ -30,6 +32,10 @@ type Controller struct { func (_m *Controller) ApplyConfigMap(ctx context.Context, namespace string, configMap *cluster.ConfigMap) error { ret := _m.Called(ctx, namespace, configMap) + if len(ret) == 0 { + panic("no return value specified for ApplyConfigMap") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.ConfigMap) error); ok { r0 = rf(ctx, namespace, configMap) @@ -44,23 +50,13 @@ func (_m *Controller) ApplyConfigMap(ctx context.Context, namespace string, conf func (_m *Controller) ApplyIstioVirtualService(ctx context.Context, routerEndpoint *cluster.VirtualService) error { ret := _m.Called(ctx, routerEndpoint) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *cluster.VirtualService) error); ok { - r0 = rf(ctx, routerEndpoint) - } else { - r0 = ret.Error(0) + if len(ret) == 0 { + panic("no return value specified for ApplyIstioVirtualService") } - return r0 -} - -// ApplyPersistentVolumeClaim provides a mock function with given fields: ctx, namespace, pvc -func (_m *Controller) ApplyPersistentVolumeClaim(ctx context.Context, namespace string, pvc *cluster.PersistentVolumeClaim) error { - ret := _m.Called(ctx, namespace, pvc) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.PersistentVolumeClaim) error); ok { - r0 = rf(ctx, namespace, pvc) + if rf, ok := ret.Get(0).(func(context.Context, *cluster.VirtualService) error); ok { + r0 = rf(ctx, routerEndpoint) } else { r0 = ret.Error(0) } @@ -72,6 +68,10 @@ func (_m *Controller) ApplyPersistentVolumeClaim(ctx context.Context, namespace func (_m *Controller) ApplyPodDisruptionBudget(ctx context.Context, namespace string, pdb cluster.PodDisruptionBudget) (*v1.PodDisruptionBudget, error) { ret := _m.Called(ctx, namespace, pdb) + if len(ret) == 0 { + panic("no return value specified for ApplyPodDisruptionBudget") + } + var r0 *v1.PodDisruptionBudget var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, cluster.PodDisruptionBudget) (*v1.PodDisruptionBudget, error)); ok { @@ -98,6 +98,10 @@ func (_m *Controller) ApplyPodDisruptionBudget(ctx context.Context, namespace st func (_m *Controller) CreateJob(ctx context.Context, namespace string, job cluster.Job) (*batchv1.Job, error) { ret := _m.Called(ctx, namespace, job) + if len(ret) == 0 { + panic("no return value specified for CreateJob") + } + var r0 *batchv1.Job var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, cluster.Job) (*batchv1.Job, error)); ok { @@ -124,6 +128,10 @@ func (_m *Controller) CreateJob(ctx context.Context, namespace string, job clust func (_m *Controller) CreateNamespace(ctx context.Context, name string) error { ret := _m.Called(ctx, name) + if len(ret) == 0 { + panic("no return value specified for CreateNamespace") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { r0 = rf(ctx, name) @@ -138,6 +146,10 @@ func (_m *Controller) CreateNamespace(ctx context.Context, name string) error { func (_m *Controller) CreateRole(ctx context.Context, namespace string, role *cluster.Role) (*rbacv1.Role, error) { ret := _m.Called(ctx, namespace, role) + if len(ret) == 0 { + panic("no return value specified for CreateRole") + } + var r0 *rbacv1.Role var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.Role) (*rbacv1.Role, error)); ok { @@ -164,6 +176,10 @@ func (_m *Controller) CreateRole(ctx context.Context, namespace string, role *cl func (_m *Controller) CreateRoleBinding(ctx context.Context, namespace string, roleBinding *cluster.RoleBinding) (*rbacv1.RoleBinding, error) { ret := _m.Called(ctx, namespace, roleBinding) + if len(ret) == 0 { + panic("no return value specified for CreateRoleBinding") + } + var r0 *rbacv1.RoleBinding var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.RoleBinding) (*rbacv1.RoleBinding, error)); ok { @@ -190,6 +206,10 @@ func (_m *Controller) CreateRoleBinding(ctx context.Context, namespace string, r func (_m *Controller) CreateSecret(ctx context.Context, secret *cluster.Secret) error { ret := _m.Called(ctx, secret) + if len(ret) == 0 { + panic("no return value specified for CreateSecret") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *cluster.Secret) error); ok { r0 = rf(ctx, secret) @@ -204,6 +224,10 @@ func (_m *Controller) CreateSecret(ctx context.Context, secret *cluster.Secret) func (_m *Controller) CreateServiceAccount(ctx context.Context, namespace string, serviceAccount *cluster.ServiceAccount) (*corev1.ServiceAccount, error) { ret := _m.Called(ctx, namespace, serviceAccount) + if len(ret) == 0 { + panic("no return value specified for CreateServiceAccount") + } + var r0 *corev1.ServiceAccount var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.ServiceAccount) (*corev1.ServiceAccount, error)); ok { @@ -230,6 +254,10 @@ func (_m *Controller) CreateServiceAccount(ctx context.Context, namespace string func (_m *Controller) CreateSparkApplication(ctx context.Context, namespace string, request *cluster.CreateSparkRequest) (*v1beta2.SparkApplication, error) { ret := _m.Called(ctx, namespace, request) + if len(ret) == 0 { + panic("no return value specified for CreateSparkApplication") + } + var r0 *v1beta2.SparkApplication var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.CreateSparkRequest) (*v1beta2.SparkApplication, error)); ok { @@ -256,6 +284,10 @@ func (_m *Controller) CreateSparkApplication(ctx context.Context, namespace stri func (_m *Controller) DeleteConfigMap(ctx context.Context, name string, namespace string, ignoreNotFound bool) error { ret := _m.Called(ctx, name, namespace, ignoreNotFound) + if len(ret) == 0 { + panic("no return value specified for DeleteConfigMap") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, bool) error); ok { r0 = rf(ctx, name, namespace, ignoreNotFound) @@ -270,6 +302,10 @@ func (_m *Controller) DeleteConfigMap(ctx context.Context, name string, namespac func (_m *Controller) DeleteIstioVirtualService(ctx context.Context, svcName string, namespace string) error { ret := _m.Called(ctx, svcName, namespace) + if len(ret) == 0 { + panic("no return value specified for DeleteIstioVirtualService") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { r0 = rf(ctx, svcName, namespace) @@ -284,6 +320,10 @@ func (_m *Controller) DeleteIstioVirtualService(ctx context.Context, svcName str func (_m *Controller) DeleteJob(ctx context.Context, namespace string, jobName string) error { ret := _m.Called(ctx, namespace, jobName) + if len(ret) == 0 { + panic("no return value specified for DeleteJob") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { r0 = rf(ctx, namespace, jobName) @@ -298,6 +338,10 @@ func (_m *Controller) DeleteJob(ctx context.Context, namespace string, jobName s func (_m *Controller) DeleteKnativeService(ctx context.Context, svcName string, namespace string, ignoreNotFound bool) error { ret := _m.Called(ctx, svcName, namespace, ignoreNotFound) + if len(ret) == 0 { + panic("no return value specified for DeleteKnativeService") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, bool) error); ok { r0 = rf(ctx, svcName, namespace, ignoreNotFound) @@ -308,13 +352,17 @@ func (_m *Controller) DeleteKnativeService(ctx context.Context, svcName string, return r0 } -// DeleteKubernetesDeployment provides a mock function with given fields: ctx, name, namespace, ignoreNotFound -func (_m *Controller) DeleteKubernetesDeployment(ctx context.Context, name string, namespace string, ignoreNotFound bool) error { - ret := _m.Called(ctx, name, namespace, ignoreNotFound) +// DeleteKubernetesService provides a mock function with given fields: ctx, svcName, namespace, ignoreNotFound +func (_m *Controller) DeleteKubernetesService(ctx context.Context, svcName string, namespace string, ignoreNotFound bool) error { + ret := _m.Called(ctx, svcName, namespace, ignoreNotFound) + + if len(ret) == 0 { + panic("no return value specified for DeleteKubernetesService") + } var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, bool) error); ok { - r0 = rf(ctx, name, namespace, ignoreNotFound) + r0 = rf(ctx, svcName, namespace, ignoreNotFound) } else { r0 = ret.Error(0) } @@ -322,13 +370,17 @@ func (_m *Controller) DeleteKubernetesDeployment(ctx context.Context, name strin return r0 } -// DeleteKubernetesService provides a mock function with given fields: ctx, svcName, namespace, ignoreNotFound -func (_m *Controller) DeleteKubernetesService(ctx context.Context, svcName string, namespace string, ignoreNotFound bool) error { - ret := _m.Called(ctx, svcName, namespace, ignoreNotFound) +// DeleteKubernetesStatefulSet provides a mock function with given fields: ctx, name, namespace, ignoreNotFound +func (_m *Controller) DeleteKubernetesStatefulSet(ctx context.Context, name string, namespace string, ignoreNotFound bool) error { + ret := _m.Called(ctx, name, namespace, ignoreNotFound) + + if len(ret) == 0 { + panic("no return value specified for DeleteKubernetesStatefulSet") + } var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, bool) error); ok { - r0 = rf(ctx, svcName, namespace, ignoreNotFound) + r0 = rf(ctx, name, namespace, ignoreNotFound) } else { r0 = ret.Error(0) } @@ -336,13 +388,17 @@ func (_m *Controller) DeleteKubernetesService(ctx context.Context, svcName strin return r0 } -// DeletePersistentVolumeClaim provides a mock function with given fields: ctx, pvcName, namespace, ignoreNotFound -func (_m *Controller) DeletePersistentVolumeClaim(ctx context.Context, pvcName string, namespace string, ignoreNotFound bool) error { - ret := _m.Called(ctx, pvcName, namespace, ignoreNotFound) +// DeletePVCs provides a mock function with given fields: ctx, listOptions, namespace, ignoreNotFound +func (_m *Controller) DeletePVCs(ctx context.Context, listOptions metav1.ListOptions, namespace string, ignoreNotFound bool) error { + ret := _m.Called(ctx, listOptions, namespace, ignoreNotFound) + + if len(ret) == 0 { + panic("no return value specified for DeletePVCs") + } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, bool) error); ok { - r0 = rf(ctx, pvcName, namespace, ignoreNotFound) + if rf, ok := ret.Get(0).(func(context.Context, metav1.ListOptions, string, bool) error); ok { + r0 = rf(ctx, listOptions, namespace, ignoreNotFound) } else { r0 = ret.Error(0) } @@ -354,6 +410,10 @@ func (_m *Controller) DeletePersistentVolumeClaim(ctx context.Context, pvcName s func (_m *Controller) DeletePodDisruptionBudget(ctx context.Context, namespace string, pdbName string) error { ret := _m.Called(ctx, namespace, pdbName) + if len(ret) == 0 { + panic("no return value specified for DeletePodDisruptionBudget") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { r0 = rf(ctx, namespace, pdbName) @@ -368,6 +428,10 @@ func (_m *Controller) DeletePodDisruptionBudget(ctx context.Context, namespace s func (_m *Controller) DeleteSecret(ctx context.Context, secretName string, namespace string, ignoreNotFound bool) error { ret := _m.Called(ctx, secretName, namespace, ignoreNotFound) + if len(ret) == 0 { + panic("no return value specified for DeleteSecret") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, bool) error); ok { r0 = rf(ctx, secretName, namespace, ignoreNotFound) @@ -382,6 +446,10 @@ func (_m *Controller) DeleteSecret(ctx context.Context, secretName string, names func (_m *Controller) DeleteSparkApplication(ctx context.Context, namespace string, appName string) error { ret := _m.Called(ctx, namespace, appName) + if len(ret) == 0 { + panic("no return value specified for DeleteSparkApplication") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { r0 = rf(ctx, namespace, appName) @@ -396,6 +464,10 @@ func (_m *Controller) DeleteSparkApplication(ctx context.Context, namespace stri func (_m *Controller) DeployKnativeService(ctx context.Context, svc *cluster.KnativeService) error { ret := _m.Called(ctx, svc) + if len(ret) == 0 { + panic("no return value specified for DeployKnativeService") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *cluster.KnativeService) error); ok { r0 = rf(ctx, svc) @@ -410,6 +482,10 @@ func (_m *Controller) DeployKnativeService(ctx context.Context, svc *cluster.Kna func (_m *Controller) DeployKubernetesService(ctx context.Context, svc *cluster.KubernetesService) error { ret := _m.Called(ctx, svc) + if len(ret) == 0 { + panic("no return value specified for DeployKubernetesService") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *cluster.KubernetesService) error); ok { r0 = rf(ctx, svc) @@ -424,6 +500,10 @@ func (_m *Controller) DeployKubernetesService(ctx context.Context, svc *cluster. func (_m *Controller) GetJob(ctx context.Context, namespace string, jobName string) (*batchv1.Job, error) { ret := _m.Called(ctx, namespace, jobName) + if len(ret) == 0 { + panic("no return value specified for GetJob") + } + var r0 *batchv1.Job var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, string) (*batchv1.Job, error)); ok { @@ -450,6 +530,10 @@ func (_m *Controller) GetJob(ctx context.Context, namespace string, jobName stri func (_m *Controller) GetKnativeServiceDesiredReplicas(ctx context.Context, svcName string, namespace string) (int, error) { ret := _m.Called(ctx, svcName, namespace) + if len(ret) == 0 { + panic("no return value specified for GetKnativeServiceDesiredReplicas") + } + var r0 int var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, string) (int, error)); ok { @@ -474,6 +558,10 @@ func (_m *Controller) GetKnativeServiceDesiredReplicas(ctx context.Context, svcN func (_m *Controller) GetKnativeServiceURL(ctx context.Context, svcName string, namespace string) string { ret := _m.Called(ctx, svcName, namespace) + if len(ret) == 0 { + panic("no return value specified for GetKnativeServiceURL") + } + var r0 string if rf, ok := ret.Get(0).(func(context.Context, string, string) string); ok { r0 = rf(ctx, svcName, namespace) @@ -488,6 +576,10 @@ func (_m *Controller) GetKnativeServiceURL(ctx context.Context, svcName string, func (_m *Controller) GetSparkApplication(ctx context.Context, namespace string, appName string) (*v1beta2.SparkApplication, error) { ret := _m.Called(ctx, namespace, appName) + if len(ret) == 0 { + panic("no return value specified for GetSparkApplication") + } + var r0 *v1beta2.SparkApplication var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, string) (*v1beta2.SparkApplication, error)); ok { @@ -514,6 +606,10 @@ func (_m *Controller) GetSparkApplication(ctx context.Context, namespace string, func (_m *Controller) ListPodLogs(ctx context.Context, namespace string, podName string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { ret := _m.Called(ctx, namespace, podName, opts) + if len(ret) == 0 { + panic("no return value specified for ListPodLogs") + } + var r0 io.ReadCloser var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, string, *corev1.PodLogOptions) (io.ReadCloser, error)); ok { @@ -540,6 +636,10 @@ func (_m *Controller) ListPodLogs(ctx context.Context, namespace string, podName func (_m *Controller) ListPods(ctx context.Context, namespace string, labelSelector string) (*corev1.PodList, error) { ret := _m.Called(ctx, namespace, labelSelector) + if len(ret) == 0 { + panic("no return value specified for ListPods") + } + var r0 *corev1.PodList var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, string) (*corev1.PodList, error)); ok { @@ -562,13 +662,12 @@ func (_m *Controller) ListPods(ctx context.Context, namespace string, labelSelec return r0, r1 } -type mockConstructorTestingTNewController interface { +// NewController creates a new instance of Controller. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewController(t interface { mock.TestingT Cleanup(func()) -} - -// NewController creates a new instance of Controller. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewController(t mockConstructorTestingTNewController) *Controller { +}) *Controller { mock := &Controller{} mock.Mock.Test(t) diff --git a/api/turing/cluster/servicebuilder/fluentd.go b/api/turing/cluster/servicebuilder/fluentd.go index fa38925da..a729cd086 100644 --- a/api/turing/cluster/servicebuilder/fluentd.go +++ b/api/turing/cluster/servicebuilder/fluentd.go @@ -15,9 +15,9 @@ import ( ) const ( - fluentdReplicaCount = 1 - fluentdCPURequest = "2" - fluentdMemRequest = "2Gi" + FluentdReplicaCount = 2 + fluentdCPURequest = "1" + fluentdMemRequest = "1Gi" fluentdPort = 24224 cacheVolumeMountPath = "/cache/" cacheVolumeSize = "2Gi" @@ -40,7 +40,8 @@ func (sb *clusterSvcBuilder) NewFluentdService( {Name: "FLUENTD_LOG_PATH", Value: "/cache/log/bq_load_logs.*.buffer"}, {Name: "FLUENTD_GCP_JSON_KEY_PATH", Value: secretMountPath + secretKeyNameRouter}, {Name: "FLUENTD_BUFFER_LIMIT", Value: "10g"}, - {Name: "FLUENTD_FLUSH_INTERVAL_SECONDS", Value: strconv.Itoa(fluentdConfig.FlushIntervalSeconds)}, + {Name: "FLUENTD_FLUSH_INTERVAL_SECONDS", + Value: strconv.Itoa(fluentdConfig.FlushIntervalSeconds * FluentdReplicaCount)}, {Name: "FLUENTD_TAG", Value: fluentdConfig.Tag}, {Name: "FLUENTD_GCP_PROJECT", Value: tableSplit[0]}, {Name: "FLUENTD_BQ_DATASET", Value: tableSplit[1]}, @@ -77,7 +78,7 @@ func (sb *clusterSvcBuilder) NewFluentdService( Volumes: volumes, VolumeMounts: volumeMounts, }, - Replicas: fluentdReplicaCount, + Replicas: FluentdReplicaCount, Ports: []cluster.Port{ { Name: "tcp-input", @@ -125,17 +126,8 @@ func buildFluentdVolumes( MountPath: secretMountPath, }) - volumes = append(volumes, corev1.Volume{ - Name: ComponentTypes.CacheVolume, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: cacheVolumePVCName, - }, - }, - }) - volumeMounts = append(volumeMounts, corev1.VolumeMount{ - Name: ComponentTypes.CacheVolume, + Name: cacheVolumePVCName, MountPath: cacheVolumeMountPath, }) diff --git a/api/turing/cluster/servicebuilder/fluentd_test.go b/api/turing/cluster/servicebuilder/fluentd_test.go index 40e94b6f7..1c86640ac 100644 --- a/api/turing/cluster/servicebuilder/fluentd_test.go +++ b/api/turing/cluster/servicebuilder/fluentd_test.go @@ -70,7 +70,7 @@ func TestNewFluentdService(t *testing.T) { {Name: "FLUENTD_LOG_PATH", Value: "/cache/log/bq_load_logs.*.buffer"}, {Name: "FLUENTD_GCP_JSON_KEY_PATH", Value: "/var/secret/router-service-account.json"}, {Name: "FLUENTD_BUFFER_LIMIT", Value: "10g"}, - {Name: "FLUENTD_FLUSH_INTERVAL_SECONDS", Value: "30"}, + {Name: "FLUENTD_FLUSH_INTERVAL_SECONDS", Value: "60"}, {Name: "FLUENTD_TAG", Value: "fluentd-tag"}, {Name: "FLUENTD_GCP_PROJECT", Value: "gcp-project-id"}, {Name: "FLUENTD_BQ_DATASET", Value: "dataset_id"}, @@ -104,14 +104,6 @@ func TestNewFluentdService(t *testing.T) { }, }, }, - { - Name: ComponentTypes.CacheVolume, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "test-svc-turing-cache-volume-1", - }, - }, - }, }, VolumeMounts: []corev1.VolumeMount{ { @@ -119,12 +111,12 @@ func TestNewFluentdService(t *testing.T) { MountPath: secretMountPath, }, { - Name: ComponentTypes.CacheVolume, + Name: "test-svc-turing-cache-volume-1", MountPath: cacheVolumeMountPath, }, }, }, - Replicas: fluentdReplicaCount, + Replicas: FluentdReplicaCount, Ports: []cluster.Port{ { Name: "tcp-input", diff --git a/api/turing/cluster/servicebuilder/service_builder.go b/api/turing/cluster/servicebuilder/service_builder.go index 85fa3f510..2a6e31a69 100644 --- a/api/turing/cluster/servicebuilder/service_builder.go +++ b/api/turing/cluster/servicebuilder/service_builder.go @@ -353,19 +353,22 @@ func (sb *clusterSvcBuilder) NewSecret( } // NewPodDisruptionBudget creates a new `cluster.PodDisruptionBudget` -// for the given service (router/enricher/ensembler). +// for the given service (router/enricher/ensembler/fluentd logger). func (sb *clusterSvcBuilder) NewPodDisruptionBudget( routerVersion *models.RouterVersion, project *mlp.Project, componentType string, pdbConfig config.PodDisruptionBudgetConfig, ) *cluster.PodDisruptionBudget { + var matchLabelKey string + if componentType != ComponentTypes.FluentdLogger { + matchLabelKey = cluster.KnativeServiceLabelKey + } else { + matchLabelKey = labeller.AppLabel + } selector := &metav1.LabelSelector{ MatchLabels: map[string]string{ - "app": fmt.Sprintf( - "%s-0", - GetComponentName(routerVersion, componentType), - ), + matchLabelKey: GetComponentName(routerVersion, componentType), }, } return &cluster.PodDisruptionBudget{ diff --git a/api/turing/service/router_deployment_service.go b/api/turing/service/router_deployment_service.go index 9bbaf50be..666ad7f13 100644 --- a/api/turing/service/router_deployment_service.go +++ b/api/turing/service/router_deployment_service.go @@ -14,6 +14,7 @@ import ( mlp "github.com/caraml-dev/mlp/api/client" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/caraml-dev/turing/api/turing/cluster" "github.com/caraml-dev/turing/api/turing/cluster/labeller" @@ -187,13 +188,6 @@ func (ds *deploymentService) DeployRouterVersion( if routerVersion.LogConfig.ResultLoggerType == models.BigQueryLogger { fluentdService := ds.svcBuilder.NewFluentdService(routerVersion, project, secretName, ds.routerDefaults.FluentdConfig) - // Create pvc - err = createPVC(ctx, controller, project.Name, fluentdService.PersistentVolumeClaim) - if err != nil { - eventsCh.Write(models.NewErrorEvent( - models.EventStageDeployingDependencies, "failed to deploy fluentd service: %s", err.Error())) - return endpoint, err - } // Deploy fluentd err = deployK8sService(ctx, controller, fluentdService) if err != nil { @@ -300,7 +294,7 @@ func (ds *deploymentService) UndeployRouterVersion( if err != nil { errs = append(errs, err.Error()) } - err = deletePVC(controller, project.Name, fluentdService.PersistentVolumeClaim, isCleanUp) + err = deleteStatefulSetPVCs(controller, project.Name, fluentdService.Name, isCleanUp) if err != nil { errs = append(errs, err.Error()) } @@ -538,7 +532,7 @@ func deleteK8sService( service *cluster.KubernetesService, isCleanUp bool, ) error { - err := controller.DeleteKubernetesDeployment(context.Background(), service.Name, service.Namespace, isCleanUp) + err := controller.DeleteKubernetesStatefulSet(context.Background(), service.Name, service.Namespace, isCleanUp) if err != nil { return err } @@ -564,27 +558,19 @@ func deleteSecret(controller cluster.Controller, secret *cluster.Secret, isClean return controller.DeleteSecret(context.Background(), secret.Name, secret.Namespace, isCleanUp) } -func createPVC( - ctx context.Context, +// deleteStatefulSetPVCs deletes all PVCs belonging to the specified stateful set in the given namespace. +func deleteStatefulSetPVCs( controller cluster.Controller, namespace string, - pvc *cluster.PersistentVolumeClaim, -) error { - select { - case <-ctx.Done(): - return errors.New("timeout deploying service") - default: - return controller.ApplyPersistentVolumeClaim(ctx, namespace, pvc) - } -} - -func deletePVC( - controller cluster.Controller, - namespace string, - pvc *cluster.PersistentVolumeClaim, + statefulSetName string, isCleanUp bool, ) error { - return controller.DeletePersistentVolumeClaim(context.Background(), pvc.Name, namespace, isCleanUp) + listOptions := metav1.ListOptions{LabelSelector: "app=" + statefulSetName} + err := controller.DeletePVCs(context.Background(), listOptions, namespace, isCleanUp) + if err != nil { + return fmt.Errorf("unable to get pvcs of the stateful set name %s: %s", statefulSetName, err.Error()) + } + return nil } // deployKnServices deploys all services simulateneously and waits for all of them to @@ -817,6 +803,19 @@ func (ds *deploymentService) createPodDisruptionBudgets( pdbs = append(pdbs, routerPdb) } + // Fluentd logger's PDB + if routerVersion.LogConfig.ResultLoggerType == models.BigQueryLogger && + math.Ceil(float64(servicebuilder.FluentdReplicaCount)* + minAvailablePercent) < float64(servicebuilder.FluentdReplicaCount) { + fluentdPdb := ds.svcBuilder.NewPodDisruptionBudget( + routerVersion, + project, + servicebuilder.ComponentTypes.FluentdLogger, + ds.pdbConfig, + ) + pdbs = append(pdbs, fluentdPdb) + } + return pdbs } diff --git a/api/turing/service/router_deployment_service_test.go b/api/turing/service/router_deployment_service_test.go index af7ecbcee..f46af6c2d 100644 --- a/api/turing/service/router_deployment_service_test.go +++ b/api/turing/service/router_deployment_service_test.go @@ -220,7 +220,6 @@ func TestDeployEndpoint(t *testing.T) { controller.On("CreateNamespace", mock.Anything, mock.Anything).Return(nil) controller.On("ApplyConfigMap", mock.Anything, mock.Anything, mock.Anything).Return(nil) controller.On("CreateSecret", mock.Anything, mock.Anything).Return(nil) - controller.On("ApplyPersistentVolumeClaim", mock.Anything, mock.Anything, mock.Anything).Return(nil) controller.On("ApplyIstioVirtualService", mock.Anything, mock.Anything).Return(nil) controller.On("ApplyPodDisruptionBudget", mock.Anything, mock.Anything, mock.Anything). Return(&policyv1.PodDisruptionBudget{}, nil) @@ -283,8 +282,6 @@ func TestDeployEndpoint(t *testing.T) { assert.NoError(t, err) assert.Equal(t, fmt.Sprintf("http://%s-router.models.example.com", routerVersion.Router.Name), endpoint) controller.AssertCalled(t, "CreateNamespace", mock.Anything, testNamespace) - controller.AssertCalled(t, "ApplyPersistentVolumeClaim", mock.Anything, - testNamespace, &cluster.PersistentVolumeClaim{Name: "pvc"}) controller.AssertCalled(t, "DeployKubernetesService", mock.Anything, &cluster.KubernetesService{ BaseService: &cluster.BaseService{ Name: fmt.Sprintf("%s-fluentd-logger-%d", routerVersion.Router.Name, routerVersion.Version), @@ -384,7 +381,7 @@ func TestDeployEndpoint(t *testing.T) { }, }, }) - controller.AssertNumberOfCalls(t, "ApplyPodDisruptionBudget", 2) + controller.AssertNumberOfCalls(t, "ApplyPodDisruptionBudget", 3) // Verify endpoint for upi routers routerVersion.Protocol = routerConfig.UPI @@ -415,13 +412,13 @@ func TestDeleteEndpoint(t *testing.T) { controller := &mocks.Controller{} controller.On("DeleteKnativeService", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) - controller.On("DeleteKubernetesDeployment", mock.Anything, mock.Anything, + controller.On("DeleteKubernetesStatefulSet", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) controller.On("DeleteKubernetesService", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) controller.On("DeleteSecret", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) controller.On("DeleteConfigMap", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) - controller.On("DeletePersistentVolumeClaim", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) + controller.On("DeletePVCs", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) controller.On("DeletePodDisruptionBudget", mock.Anything, mock.Anything, mock.Anything).Return(nil) // Create test router version @@ -482,10 +479,10 @@ func TestDeleteEndpoint(t *testing.T) { controller.AssertCalled(t, "DeleteKnativeService", mock.Anything, "test-svc-ensembler-1", testNs, false) controller.AssertCalled(t, "DeleteKnativeService", mock.Anything, "test-svc-router-1", testNs, false) controller.AssertCalled(t, "DeleteSecret", mock.Anything, "test-svc-svc-acct-secret-1", testNs, false) - controller.AssertCalled(t, "DeletePersistentVolumeClaim", mock.Anything, "pvc", testNs, false) + controller.AssertCalled(t, "DeletePVCs", mock.Anything, mock.Anything, testNs, false) controller.AssertCalled(t, "DeletePodDisruptionBudget", mock.Anything, testNs, mock.Anything) controller.AssertNumberOfCalls(t, "DeleteKnativeService", 3) - controller.AssertNumberOfCalls(t, "DeletePodDisruptionBudget", 2) + controller.AssertNumberOfCalls(t, "DeletePodDisruptionBudget", 3) } func TestBuildEnsemblerServiceImage(t *testing.T) { @@ -602,6 +599,9 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { }, }, }, + LogConfig: &models.LogConfig{ + ResultLoggerType: models.BigQueryLogger, + }, }, pdbConfig: config.PodDisruptionBudgetConfig{ Enabled: true, @@ -615,7 +615,7 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { MinAvailablePercentage: &twenty, Selector: &apimetav1.LabelSelector{ MatchLabels: map[string]string{ - "app": "test-turing-enricher-3-0", + "serving.knative.dev/service": "test-turing-enricher-3", }, }, }, @@ -626,7 +626,7 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { MinAvailablePercentage: &twenty, Selector: &apimetav1.LabelSelector{ MatchLabels: map[string]string{ - "app": "test-turing-ensembler-3-0", + "serving.knative.dev/service": "test-turing-ensembler-3", }, }, }, @@ -637,7 +637,18 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { MinAvailablePercentage: &twenty, Selector: &apimetav1.LabelSelector{ MatchLabels: map[string]string{ - "app": "test-turing-router-3-0", + "serving.knative.dev/service": "test-turing-router-3", + }, + }, + }, + { + Name: "test-turing-fluentd-logger-3-pdb", + Namespace: "ns", + Labels: testRouterLabels, + MinAvailablePercentage: &twenty, + Selector: &apimetav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-turing-fluentd-logger-3", }, }, }, @@ -664,6 +675,9 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { }, }, }, + LogConfig: &models.LogConfig{ + ResultLoggerType: models.BigQueryLogger, + }, }, pdbConfig: config.PodDisruptionBudgetConfig{ Enabled: true, @@ -677,7 +691,7 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { MaxUnavailablePercentage: &eighty, Selector: &apimetav1.LabelSelector{ MatchLabels: map[string]string{ - "app": "test-turing-enricher-3-0", + "serving.knative.dev/service": "test-turing-enricher-3", }, }, }, @@ -688,7 +702,7 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { MaxUnavailablePercentage: &eighty, Selector: &apimetav1.LabelSelector{ MatchLabels: map[string]string{ - "app": "test-turing-ensembler-3-0", + "serving.knative.dev/service": "test-turing-ensembler-3", }, }, }, @@ -699,7 +713,18 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { MaxUnavailablePercentage: &eighty, Selector: &apimetav1.LabelSelector{ MatchLabels: map[string]string{ - "app": "test-turing-router-3-0", + "serving.knative.dev/service": "test-turing-router-3", + }, + }, + }, + { + Name: "test-turing-fluentd-logger-3-pdb", + Namespace: "ns", + Labels: testRouterLabels, + MaxUnavailablePercentage: &eighty, + Selector: &apimetav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-turing-fluentd-logger-3", }, }, }, @@ -721,6 +746,9 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { }, }, }, + LogConfig: &models.LogConfig{ + ResultLoggerType: models.NopLogger, + }, }, pdbConfig: config.PodDisruptionBudgetConfig{ Enabled: true, @@ -734,7 +762,7 @@ func TestCreatePodDisruptionBudgets(t *testing.T) { MinAvailablePercentage: &twenty, Selector: &apimetav1.LabelSelector{ MatchLabels: map[string]string{ - "app": "test-turing-ensembler-3-0", + "serving.knative.dev/service": "test-turing-ensembler-3", }, }, },