Skip to content

Commit

Permalink
feat(api): Replace fluentd deployment with statefulset (#370)
Browse files Browse the repository at this point in the history
* Replace fluentd deployment with statefulset

* Remove redundant pvc volume and refactor volume mount from fluentd pod spec

* Refactor fluentd deletion steps to remove its stateful set

* Add pod disruption budgets to fluentd stateful set

* Shorten lines that are too long

* Halve fluentd cpu and mem requests

* Fix docstring typo

* Configure fluentd flush interval seconds to make it dependent on the number of replicas

* Refactor pvc deletion helper function to make it more generic

* Make pdb key dependent on the component type to be deployed

* Fix long line lint error

* Fix unit tests after changes to pdb keys
  • Loading branch information
deadlycoconuts authored Apr 3, 2024
1 parent 40df8e9 commit 7515693
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 337 deletions.
118 changes: 45 additions & 73 deletions api/turing/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ type Controller interface {
ApplyIstioVirtualService(ctx context.Context, routerEndpoint *VirtualService) error
DeleteIstioVirtualService(ctx context.Context, svcName string, namespace string) error

// Deployment
DeleteKubernetesDeployment(ctx context.Context, name string, namespace string, ignoreNotFound bool) error
// StatefulSet
DeleteKubernetesStatefulSet(ctx context.Context, name string, namespace string, ignoreNotFound bool) error

// Service
DeployKubernetesService(ctx context.Context, svc *KubernetesService) error
Expand All @@ -82,8 +82,7 @@ type Controller interface {
DeleteSecret(ctx context.Context, secretName string, namespace string, ignoreNotFound bool) error

// PVC
ApplyPersistentVolumeClaim(ctx context.Context, namespace string, pvc *PersistentVolumeClaim) error
DeletePersistentVolumeClaim(ctx context.Context, pvcName string, namespace string, ignoreNotFound bool) error
DeletePVCs(ctx context.Context, listOptions metav1.ListOptions, namespace string, ignoreNotFound bool) error

// Pod
ListPods(ctx context.Context, namespace string, labelSelector string) (*apicorev1.PodList, error)
Expand Down Expand Up @@ -347,34 +346,34 @@ func (c *controller) GetKnativeServiceDesiredReplicas(
return int(*rev.Status.DesiredReplicas), nil
}

// DeployKubernetesService deploys a kubernetes service and deployment
// DeployKubernetesService deploys a kubernetes service and stateful set
func (c *controller) DeployKubernetesService(
ctx context.Context,
svcConf *KubernetesService,
) error {
desiredDeployment, desiredSvc := svcConf.BuildKubernetesServiceConfig()
desiredStatefulSet, desiredSvc := svcConf.BuildKubernetesServiceConfig()

// Deploy deployment
deployments := c.k8sAppsClient.Deployments(svcConf.Namespace)
// Check if deployment already exists. If exists, update it. If not, create.
var existingDeployment *apiappsv1.Deployment
// Deploy stateful set
statefulSets := c.k8sAppsClient.StatefulSets(svcConf.Namespace)
// Check if stateful set already exists. If exists, update it. If not, create.
var existingStatefulSet *apiappsv1.StatefulSet
var err error
existingDeployment, err = deployments.Get(ctx, svcConf.Name, metav1.GetOptions{})
existingStatefulSet, err = statefulSets.Get(ctx, svcConf.Name, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
// Create new deployment
_, err = deployments.Create(ctx, desiredDeployment, metav1.CreateOptions{})
_, err = statefulSets.Create(ctx, desiredStatefulSet, metav1.CreateOptions{})
} else {
// Unexpected error, return it
return err
}
} else {
// Check for differences between current and new specs
if !k8sDeploymentSemanticEquals(desiredDeployment, existingDeployment) {
if !k8sStatefulSetSemanticEquals(existingStatefulSet, existingStatefulSet) {
// Update the existing service with the new config
existingDeployment.Spec.Template = desiredDeployment.Spec.Template
existingDeployment.ObjectMeta.Labels = desiredDeployment.ObjectMeta.Labels
_, err = deployments.Update(ctx, existingDeployment, metav1.UpdateOptions{})
existingStatefulSet.Spec.Template = desiredStatefulSet.Spec.Template
existingStatefulSet.ObjectMeta.Labels = desiredStatefulSet.ObjectMeta.Labels
_, err = statefulSets.Update(ctx, existingStatefulSet, metav1.UpdateOptions{})
}
}
if err != nil {
Expand Down Expand Up @@ -405,25 +404,25 @@ func (c *controller) DeployKubernetesService(
}

// Wait until deployment ready and return any errors
return c.waitDeploymentReady(ctx, svcConf.Name, svcConf.Namespace)
return c.waitStatefulSetReady(ctx, svcConf.Name, svcConf.Namespace)
}

// DeleteKubernetesDeployment deletes a kubernetes deployment
func (c *controller) DeleteKubernetesDeployment(
// DeleteKubernetesStatefulSet deletes a stateful set
func (c *controller) DeleteKubernetesStatefulSet(
ctx context.Context,
name string,
namespace string,
ignoreNotFound bool,
) error {
deployments := c.k8sAppsClient.Deployments(namespace)
_, err := deployments.Get(ctx, name, metav1.GetOptions{})
statefulSets := c.k8sAppsClient.StatefulSets(namespace)
_, err := statefulSets.Get(ctx, name, metav1.GetOptions{})
if err != nil {
if ignoreNotFound {
return nil
}
return err
}
return deployments.Delete(ctx, name, metav1.DeleteOptions{})
return statefulSets.Delete(ctx, name, metav1.DeleteOptions{})
}

// DeleteKubernetesService deletes a kubernetes service
Expand Down Expand Up @@ -517,44 +516,21 @@ func (c *controller) GetKnativeServiceURL(ctx context.Context, svcName string, n
return url
}

// ApplyPersistentVolumeClaim creates a PVC in the given namespace.
// If the PVC already exists, it will update the existing PVC.
func (c *controller) ApplyPersistentVolumeClaim(
// DeletePVCs deletes all PVCs specified by the given list options in the given namespace.
func (c *controller) DeletePVCs(
ctx context.Context,
namespace string,
pvcCfg *PersistentVolumeClaim,
) error {
pvcs := c.k8sCoreClient.PersistentVolumeClaims(namespace)
existingPVC, err := pvcs.Get(ctx, pvcCfg.Name, metav1.GetOptions{})
pvc := pvcCfg.BuildPersistentVolumeClaim()

// If not exists, create
if err != nil {
_, err := pvcs.Create(ctx, pvc, metav1.CreateOptions{})
return err
}
// If exists, update
existingPVC.Spec.Resources = pvc.Spec.Resources
_, err = pvcs.Update(ctx, existingPVC, metav1.UpdateOptions{})
return err
}

// DeletePersistentVolumeClaim deletes the PVC in the given namespace.
func (c *controller) DeletePersistentVolumeClaim(
ctx context.Context,
pvcName string,
listOptions metav1.ListOptions,
namespace string,
ignoreNotFound bool,
) error {
pvcs := c.k8sCoreClient.PersistentVolumeClaims(namespace)
_, err := pvcs.Get(ctx, pvcName, metav1.GetOptions{})
_, err := c.k8sCoreClient.PersistentVolumeClaims(namespace).List(ctx, listOptions)
if err != nil {
if ignoreNotFound {
return nil
}
return fmt.Errorf("unable to get pvc with name %s: %s", pvcName, err.Error())
return err
}
return pvcs.Delete(ctx, pvcName, metav1.DeleteOptions{})
return c.k8sCoreClient.PersistentVolumeClaims(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions)
}

func (c *controller) ListPods(ctx context.Context, namespace string, labelSelector string) (*apicorev1.PodList, error) {
Expand Down Expand Up @@ -813,49 +789,45 @@ func (c *controller) getKnativePodTerminationMessage(ctx context.Context, svcNam
return terminationMessage
}

// waitDeploymentReady waits for the given k8s deployment to become ready, until the
// waitStatefulSetReady waits for the given k8s stateful set to become ready, until the
// default timeout
func (c *controller) waitDeploymentReady(
func (c *controller) waitStatefulSetReady(
ctx context.Context,
deploymentName string,
statefulSetName string,
namespace string,
) error {
// Init ticker to check status every second
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

// Init knative ServicesGetter
deployments := c.k8sAppsClient.Deployments(namespace)
// Init stateful set getter
statefulSets := c.k8sAppsClient.StatefulSets(namespace)

for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for deployment %s to be ready", deploymentName)
return fmt.Errorf("timeout waiting for stateful set %s to be ready", statefulSetName)
case <-ticker.C:
deployment, err := deployments.Get(ctx, deploymentName, metav1.GetOptions{})
statefulSet, err := statefulSets.Get(ctx, statefulSetName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("unable to get deployment status for %s: %v", deploymentName, err)
return fmt.Errorf("unable to get stateful set status for %s: %v", statefulSetName, err)
}

if deploymentReady(deployment) {
if statefulSetReady(statefulSet) {
// Service is completely ready
return nil
}
}
}
}

func deploymentReady(deployment *apiappsv1.Deployment) bool {
if deployment.Generation <= deployment.Status.ObservedGeneration {
cond := deployment.Status.Conditions[0]
ready := cond.Type == apiappsv1.DeploymentAvailable
if deployment.Spec.Replicas != nil {
func statefulSetReady(statefulSet *apiappsv1.StatefulSet) bool {
if statefulSet.Generation <= statefulSet.Status.ObservedGeneration {
if statefulSet.Spec.Replicas != nil {
// Account for replica surge during updates
ready = ready &&
deployment.Status.ReadyReplicas == *deployment.Spec.Replicas &&
deployment.Status.Replicas == *deployment.Spec.Replicas
return statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas &&
statefulSet.Status.Replicas == *statefulSet.Spec.Replicas
}
return ready
}
return false
}
Expand All @@ -876,10 +848,10 @@ func knServiceSemanticEquals(desiredService, service *knservingv1.Service) bool
equality.Semantic.DeepEqual(desiredService.ObjectMeta.Labels, service.ObjectMeta.Labels)
}

func k8sDeploymentSemanticEquals(desiredDeployment, deployment *apiappsv1.Deployment) bool {
return equality.Semantic.DeepEqual(desiredDeployment.Spec.Template, deployment.Spec.Template) &&
equality.Semantic.DeepEqual(desiredDeployment.ObjectMeta.Labels, deployment.ObjectMeta.Labels) &&
desiredDeployment.Spec.Replicas == deployment.Spec.Replicas
func k8sStatefulSetSemanticEquals(desiredStatefulSet, statefulSet *apiappsv1.StatefulSet) bool {
return equality.Semantic.DeepEqual(desiredStatefulSet.Spec.Template, statefulSet.Spec.Template) &&
equality.Semantic.DeepEqual(desiredStatefulSet.ObjectMeta.Labels, statefulSet.ObjectMeta.Labels) &&
desiredStatefulSet.Spec.Replicas == statefulSet.Spec.Replicas
}

func k8sServiceSemanticEquals(desiredService, service *apicorev1.Service) bool {
Expand Down
Loading

0 comments on commit 7515693

Please sign in to comment.