Autoscaling Workflow Enhancement - Part 3 (#101)
* Enable updating replicas based on KPA algorithm in PodAutoscaler

* fix PR review comments
kr11 authored Aug 29, 2024
1 parent 96ff193 commit 9fa738e
Showing 2 changed files with 99 additions and 65 deletions.
38 changes: 23 additions & 15 deletions docs/tutorial/podautoscaler/README.md
@@ -190,9 +190,14 @@ Note: The reactive speed of the default HPA is limited; AIBrix plans to optimize

# [WIP] Case 2: Create KPA-based AIBrix PodAutoscaler

Create Nginx App:
Create Nginx App; the default replica count is 1:
```shell
kubectl apply -f config/samples/autoscaling_v1alpha1_demo_nginx.yaml
kubectl get deployments -n default
```
```log
NAME               READY   UP-TO-DATE   AVAILABLE   AGE
nginx-deployment   1/1     1            1           24s
```

Create an autoscaler of type KPA:
@@ -211,35 +216,38 @@
```shell
kubectl get podautoscalers --all-namespaces
>>> default podautoscaler-example-kpa 5m1s
```

**Note**: Since we pass a CPU target value of 0 to KPA, it will try to scale the pods down to 0.
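To see why a zero target drives the workload to zero, consider a simplified KPA-style proportional decision. The sketch below is illustrative only; the function name and signature are assumptions, not the aibrix implementation:

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas is an illustrative KPA-style decision: scale
// proportionally to the observed load divided by the per-pod target.
// With a target of 0 (or no observed load), no pods are justified,
// which matches the scale-to-zero behavior seen in this demo.
func desiredReplicas(observedValue, targetValue float64) int32 {
	if targetValue <= 0 || observedValue <= 0 {
		return 0
	}
	return int32(math.Ceil(observedValue / targetValue))
}

func main() {
	fmt.Println(desiredReplicas(0, 0))     // 0 -- the demo's case
	fmt.Println(desiredReplicas(150, 100)) // 2 -- 150 units of load at 100 per pod
}
```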

You can see logs like `KPA algorithm run...` in `aibrix-controller-manager`:

```shell
kubectl get pods -n aibrix-system -o name | grep aibrix-controller-manager | head -n 1 | xargs -I {} kubectl logs {} -n aibrix-system
```

```log
deployment nginx-deployment does not have a model, labels: map[]
I0826 08:47:48.965426 1 kpa.go:247] "Operating in stable mode."
2024-08-26T08:47:48Z DEBUG events KPA algorithm run. desiredReplicas: 0, currentReplicas: 1 {"type": "Normal", "object": {"kind":"PodAutoscaler","namespace":"default","name":"podautoscaler-example-kpa","uid":"a76f80e6-bdeb-462f-85c1-97192005d9fb","apiVersion":"autoscaling.aibrix.ai/v1alpha1","resourceVersion":"2245812"}, "reason": "KPAAlgorithmRun"}
2024-08-26T08:47:48Z DEBUG events We set rescale=False temporarily to skip scaling action {"type": "Warning", "object": {"kind":"PodAutoscaler","namespace":"default","name":"podautoscaler-example-kpa","uid":"a76f80e6-bdeb-462f-85c1-97192005d9fb","apiVersion":"autoscaling.aibrix.ai/v1alpha1","resourceVersion":"2245812"}, "reason": "PipelineWIP"}
I0826 08:47:48.968666 1 kpa.go:247] "Operating in stable mode."
2024-08-26T08:47:48Z DEBUG events KPA algorithm run. desiredReplicas: 0, currentReplicas: 1 {"type": "Normal", "object": {"kind":"PodAutoscaler","namespace":"default","name":"podautoscaler-example-kpa","uid":"a76f80e6-bdeb-462f-85c1-97192005d9fb","apiVersion":"autoscaling.aibrix.ai/v1alpha1","resourceVersion":"2245814"}, "reason": "KPAAlgorithmRun"}
2024-08-26T08:47:48Z DEBUG events We set rescale=False temporarily to skip scaling action {"type": "Warning", "object": {"kind":"PodAutoscaler","namespace":"default","name":"podautoscaler-example-kpa","uid":"a76f80e6-bdeb-462f-85c1-97192005d9fb","apiVersion":"autoscaling.aibrix.ai/v1alpha1","resourceVersion":"2245814"}, "reason": "PipelineWIP"}
2024-08-28T03:52:01Z INFO Obtained selector and get ReadyPodsCount {"controller": "podautoscaler", "controllerGroup": "autoscaling.aibrix.ai", "controllerKind": "PodAutoscaler", "PodAutoscaler": {"name":"podautoscaler-example-kpa","namespace":"default"}, "namespace": "default", "name": "podautoscaler-example-kpa", "reconcileID": "2a811d63-9181-46b5-ac63-a512cb6c4e17", "selector": "app=nginx", "originalReadyPodsCount": 1}
I0828 03:52:01.735190 1 kpa.go:245] "Operating in stable mode."
2024-08-28T03:52:01Z INFO Successfully called Scale Algorithm {"controller": "podautoscaler", "controllerGroup": "autoscaling.aibrix.ai", "controllerKind": "PodAutoscaler", "PodAutoscaler": {"name":"podautoscaler-example-kpa","namespace":"default"}, "namespace": "default", "name": "podautoscaler-example-kpa", "reconcileID": "2a811d63-9181-46b5-ac63-a512cb6c4e17", "scaleResult": {"DesiredPodCount":0,"ExcessBurstCapacity":98,"ScaleValid":true}}
2024-08-28T03:52:01Z INFO Proposing desired replicas {"controller": "podautoscaler", "controllerGroup": "autoscaling.aibrix.ai", "controllerKind": "PodAutoscaler", "PodAutoscaler": {"name":"podautoscaler-example-kpa","namespace":"default"}, "namespace": "default", "name": "podautoscaler-example-kpa", "reconcileID": "2a811d63-9181-46b5-ac63-a512cb6c4e17", "desiredReplicas": 0, "metric": "", "timestamp": "2024-08-28T03:52:01Z", "scaleTarget": "Deployment/default/nginx-deployment"}
2024-08-28T03:52:01Z DEBUG events KPA algorithm run. desiredReplicas: 0, currentReplicas: 1 {"type": "Normal", "object": {"kind":"PodAutoscaler","namespace":"default","name":"podautoscaler-example-kpa","uid":"f1dda4ea-4627-47d8-aaa8-a5499ed20698","apiVersion":"autoscaling.aibrix.ai/v1alpha1","resourceVersion":"2348298"}, "reason": "KPAAlgorithmRun"}
2024-08-28T03:52:01Z INFO Successfully rescaled {"controller": "podautoscaler", "controllerGroup": "autoscaling.aibrix.ai", "controllerKind": "PodAutoscaler", "PodAutoscaler": {"name":"podautoscaler-example-kpa","namespace":"default"}, "namespace": "default", "name": "podautoscaler-example-kpa", "reconcileID": "2a811d63-9181-46b5-ac63-a512cb6c4e17", "currentReplicas": 1, "desiredReplicas": 0, "reason": "All metrics below target"}
2024-08-28T03:52:01Z DEBUG events New size: 0; reason: All metrics below target {"type": "Normal", "object": {"kind":"PodAutoscaler","namespace":"default","name":"podautoscaler-example-kpa","uid":"f1dda4ea-4627-47d8-aaa8-a5499ed20698","apiVersion":"autoscaling.aibrix.ai/v1alpha1","resourceVersion":"2348298"}, "reason": "SuccessfulRescale"}
2024-08-28T03:52:01Z DEBUG events KPA algorithm run. desiredReplicas: 0, currentReplicas: 0 {"type": "Normal", "object": {"kind":"PodAutoscaler","namespace":"default","name":"podautoscaler-example-kpa","uid":"f1dda4ea-4627-47d8-aaa8-a5499ed20698","apiVersion":"autoscaling.aibrix.ai/v1alpha1","resourceVersion":"2348301"}, "reason": "KPAAlgorithmRun"}
2024-08-28T03:58:41Z DEBUG events KPA algorithm run. desiredReplicas: 0, currentReplicas: 0 {"type": "Normal", "object": {"kind":"PodAutoscaler","namespace":"default","name":"podautoscaler-example-kpa","uid":"f1dda4ea-4627-47d8-aaa8-a5499ed20698","apiVersion":"autoscaling.aibrix.ai/v1alpha1","resourceVersion":"2348301"}, "reason": "KPAAlgorithmRun"}
```



Some logs from `podautoscaler-example-kpa` are shown below, where you can observe events like `KPAAlgorithmRun` and `PipelineWIP`:
Some events from `podautoscaler-example-kpa` are shown below; KPA successfully scaled the pods down to 0:

```shell
kubectl describe podautoscalers podautoscaler-example-kpa
```
```log
Events:
Type     Reason             Age                    From           Message
----     ------             ----                   ----           -------
Normal   KPAAlgorithmRun    6m15s (x2 over 6m15s)  PodAutoscaler  KPA algorithm run. desiredReplicas: 0, currentReplicas: 1
Warning  PipelineWIP        6m15s (x2 over 6m15s)  PodAutoscaler  We set rescale=False temporarily to skip scaling action
Type     Reason             Age                    From           Message
----     ------             ----                   ----           -------
Normal   KPAAlgorithmRun    2m23s                  PodAutoscaler  KPA algorithm run. desiredReplicas: 0, currentReplicas: 1
Normal   SuccessfulRescale  2m23s                  PodAutoscaler  New size: 0; reason: All metrics below target
Normal   KPAAlgorithmRun    2m23s                  PodAutoscaler  KPA algorithm run. desiredReplicas: 0, currentReplicas: 0
```


126 changes: 76 additions & 50 deletions pkg/controller/podautoscaler/podautoscaler_controller.go
@@ -21,26 +21,22 @@ import (
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"

scaler "github.com/aibrix/aibrix/pkg/controller/podautoscaler/scaler"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
podutils "github.com/aibrix/aibrix/pkg/utils"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
podutils "github.com/aibrix/aibrix/pkg/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -233,7 +229,17 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
setCondition(&pa, "AbleToScale", metav1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")

// current scale's replica count
currentReplicas := *scale.Spec.Replicas
currentReplicasInt64, found, err := unstructured.NestedInt64(scale.Object, "spec", "replicas")
// Check err before found: NestedInt64 reports found=false whenever it returns an error.
if err != nil {
r.EventRecorder.Eventf(&pa, corev1.EventTypeWarning, "FailedGetScale", "Error retrieving 'replicas' from scale: %v", err)
return ctrl.Result{}, fmt.Errorf("failed to get 'replicas' from scale: %v", err)
}
if !found {
r.EventRecorder.Eventf(&pa, corev1.EventTypeWarning, "ReplicasNotFound", "The 'replicas' field is missing from the scale object")
return ctrl.Result{}, fmt.Errorf("the 'replicas' field was not found in the scale object")
}
currentReplicas := int32(currentReplicasInt64)

// desired replica count
desiredReplicas := int32(0)
rescaleReason := ""
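For context on the `unstructured` accessors introduced above: `NestedInt64` returns a value/found/error triple (with `found == false` whenever it errors), and nested numeric fields must be written as `int64`. A self-contained sketch with a hypothetical scale object, separate from this commit:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	// Hypothetical stand-in for the scale object the controller fetches.
	scale := &unstructured.Unstructured{Object: map[string]interface{}{
		"spec": map[string]interface{}{"replicas": int64(1)},
	}}

	// Read: returns (value, found, err).
	replicas, found, err := unstructured.NestedInt64(scale.Object, "spec", "replicas")
	fmt.Println(replicas, found, err) // 1 true <nil>

	// Write: unstructured numbers must be int64, hence the int32
	// conversions around currentReplicas in the controller code.
	_ = unstructured.SetNestedField(scale.Object, int64(0), "spec", "replicas")
}
```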
@@ -245,7 +251,7 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
minReplicas = 1
}

rescale := true //nolint:ineffassign
rescale := true
r.EventRecorder.Eventf(&pa, corev1.EventTypeNormal, "KPAAlgorithmRun",
"KPA algorithm run. desiredReplicas: %d, currentReplicas: %d",
desiredReplicas, currentReplicas)
@@ -255,7 +261,7 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
if currentReplicas == int32(0) && minReplicas != 0 {
// if the replica is 0, then we should not enable autoscaling
desiredReplicas = 0
rescale = false //nolint:ineffassign
rescale = false
} else if currentReplicas > pa.Spec.MaxReplicas {
desiredReplicas = pa.Spec.MaxReplicas
} else if currentReplicas < minReplicas {
Expand All @@ -271,7 +277,7 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
return ctrl.Result{}, fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", scaleReference, err)
}

logger.V(4).Info("Proposing desired replicas",
logger.Info("Proposing desired replicas",
"desiredReplicas", metricDesiredReplicas,
"metric", metricName,
"timestamp", metricTimestamp,
@@ -288,25 +294,18 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
if desiredReplicas < currentReplicas {
rescaleReason = "All metrics below target"
}
rescale = desiredReplicas != currentReplicas //nolint:ineffassign,staticcheck
rescale = desiredReplicas != currentReplicas
}

r.EventRecorder.Event(&pa, corev1.EventTypeWarning, "PipelineWIP", "We set rescale=False temporarily to skip scaling action")

// TODO: Remove the following line after debugging metrics, algorithm, and scaling actions is complete.
// After completion, remember to delete all `nolint:ineffassign`
rescale = false //nolint:ineffassign

if rescale {
scale.Spec.Replicas = &desiredReplicas
r.EventRecorder.Eventf(&pa, corev1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
setCondition(&pa, "AbleToScale", metav1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
r.setCurrentReplicasAndMetricsInStatus(&pa, currentReplicas)
if err := r.updateStatusIfNeeded(ctx, paStatusOriginal, &pa); err != nil {
utilruntime.HandleError(err)
}

if err := r.updateScale(ctx, pa.Namespace, targetGR, scale); err != nil {
if err := r.updateScale(ctx, pa.Namespace, targetGR, scale, desiredReplicas); err != nil {
r.EventRecorder.Eventf(&pa, corev1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err)
setCondition(&pa, "AbleToScale", metav1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
r.setCurrentReplicasAndMetricsInStatus(&pa, currentReplicas)
if err := r.updateStatusIfNeeded(ctx, paStatusOriginal, &pa); err != nil {
utilruntime.HandleError(err)
}
return ctrl.Result{}, fmt.Errorf("failed to rescale %s: %v", scaleReference, err)
}

@@ -315,6 +314,8 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
// return ctrl.Result{}, fmt.Errorf("failed to rescale %s: %v", scaleReference, err)
//}

r.EventRecorder.Eventf(&pa, corev1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)

logger.Info("Successfully rescaled",
//"PodAutoscaler", klog.KObj(pa),
"currentReplicas", currentReplicas,
@@ -340,14 +341,24 @@ func (r *PodAutoscalerReconciler) reconcileAPA(ctx context.Context, pa autoscali
// scaleForResourceMappings attempts to fetch the scale for the resource with the given name and namespace,
// trying each RESTMapping in turn until a working one is found. If none work, the first error is returned.
// It returns both the scale, as well as the group-resource from the working mapping.
func (r *PodAutoscalerReconciler) scaleForResourceMappings(ctx context.Context, namespace, name string, mappings []*apimeta.RESTMapping) (*appsv1.Deployment, schema.GroupResource, error) {
func (r *PodAutoscalerReconciler) scaleForResourceMappings(ctx context.Context, namespace, name string, mappings []*apimeta.RESTMapping) (*unstructured.Unstructured, schema.GroupResource, error) {
var firstErr error
for i, mapping := range mappings {
targetGR := mapping.Resource.GroupResource()
deployment := &appsv1.Deployment{}
err := r.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, deployment)

gvk := schema.GroupVersionKind{
Group: mapping.GroupVersionKind.Group,
Version: mapping.GroupVersionKind.Version,
Kind: mapping.GroupVersionKind.Kind,
}
scale := &unstructured.Unstructured{}
scale.SetGroupVersionKind(gvk)
scale.SetNamespace(namespace)
scale.SetName(name)

err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: name}, scale)
if err == nil {
return deployment, targetGR, nil
return scale, targetGR, nil
}

if firstErr == nil {
Expand All @@ -369,22 +380,14 @@ func (r *PodAutoscalerReconciler) scaleForResourceMappings(ctx context.Context,
return nil, schema.GroupResource{}, firstErr
}
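For illustration (not part of this commit): the `mappings` argument to `scaleForResourceMappings` would typically be resolved from a `meta.RESTMapper`. A hedged sketch with an assumed package and helper name:

```go
package scaleutil

import (
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// mappingsFor resolves the RESTMappings for a scale target's GroupKind.
// Hypothetical helper for illustration; a real caller would use the
// RESTMapper wired into the controller manager.
func mappingsFor(mapper apimeta.RESTMapper, apiVersion, kind string) ([]*apimeta.RESTMapping, error) {
	gv, err := schema.ParseGroupVersion(apiVersion)
	if err != nil {
		return nil, err
	}
	return mapper.RESTMappings(schema.GroupKind{Group: gv.Group, Kind: kind})
}
```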

func (r *PodAutoscalerReconciler) updateScale(ctx context.Context, namespace string, targetGR schema.GroupResource, scale *appsv1.Deployment) error {
// Get GVK
gvk, err := apiutil.GVKForObject(scale, r.Client.Scheme())
func (r *PodAutoscalerReconciler) updateScale(ctx context.Context, namespace string, targetGR schema.GroupResource, scale *unstructured.Unstructured, replicas int32) error {
err := unstructured.SetNestedField(scale.Object, int64(replicas), "spec", "replicas")
if err != nil {
return err
}

// Get unstructured object
scaleObj := &unstructured.Unstructured{}
scaleObj.SetGroupVersionKind(gvk)
scaleObj.SetNamespace(namespace)
scaleObj.SetName(scale.Name)

// Update scale object
//err = r.Client.Patch(ctx, scale, client.Apply, client.FieldOwner("operator-name"))
err = r.Client.Patch(ctx, scale, client.Apply)
err = r.Update(ctx, scale)
if err != nil {
return err
}
@@ -445,26 +448,49 @@ func (r *PodAutoscalerReconciler) updateStatus(ctx context.Context, pa *autoscal
// It may return both valid metricDesiredReplicas and an error,
// when some metrics still work and HPA should perform scaling based on them.
// If PodAutoscaler cannot do anything due to error, it returns -1 in metricDesiredReplicas as a failure signal.
func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *appsv1.Deployment) (replicas int32, metrics string, timestamp time.Time, err error) {
func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured) (replicas int32, metrics string, timestamp time.Time, err error) {
logger := klog.FromContext(ctx)
currentTimestamp := time.Now()
// Retrieve the count of ready pods based on the label selector.
labelSelector, err := metav1.LabelSelectorAsSelector(scale.Spec.Selector)

// Retrieve the label selector from the scale object's spec.
selectorMap, found, err := unstructured.NestedMap(scale.Object, "spec", "selector")
// Check err before found: NestedMap reports found=false whenever it returns an error.
if err != nil {
return 0, "", currentTimestamp, fmt.Errorf("failed to get 'spec.selector' from scale: %v", err)
}
if !found {
return 0, "", currentTimestamp, fmt.Errorf("the 'spec.selector' field was not found in the scale object")
}

// Convert selectorMap to *metav1.LabelSelector object
selector := &metav1.LabelSelector{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(selectorMap, selector)
if err != nil {
return 0, "", currentTimestamp, fmt.Errorf("failed to convert 'spec.selector' to LabelSelector: %v", err)
}

// Convert *metav1.LabelSelector object to labels.Selector structure
labelsSelector, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
return 0, "", currentTimestamp, fmt.Errorf("error converting label selector: %w", err)
return 0, "", currentTimestamp, fmt.Errorf("failed to convert LabelSelector to labels.Selector: %v", err)
}
originalReadyPodsCount, err := scaler.GetReadyPodsCount(ctx, r.Client, scale.Namespace, labelSelector)

originalReadyPodsCount, err := scaler.GetReadyPodsCount(ctx, r.Client, pa.Namespace, labelsSelector)

if err != nil {
return 0, "", currentTimestamp, fmt.Errorf("error getting ready pods count: %w", err)
}

logger.Info("Obtained selector and get ReadyPodsCount", "selector", labelsSelector, "originalReadyPodsCount", originalReadyPodsCount)

// TODO: Complete the remaining items - Retrieve the following metrics from metricClient:
// 1. observedStableValue: Average over the past stableWindow period.
// 2. observedPanicValue: Average over the past panicWindow period.
// Calculate the desired number of pods using the autoscaler logic.
scaleResult := r.Autoscaler.Scale(int(originalReadyPodsCount), 0, 0, currentTimestamp)
if scaleResult.ScaleValid {
logger.Info("Successfully called Scale Algorithm", "scaleResult", scaleResult)
return scaleResult.DesiredPodCount, "", currentTimestamp, nil
}

return 0, "", currentTimestamp, fmt.Errorf("can not calculate metrics for scale %s", scale.Name)
return 0, "", currentTimestamp, fmt.Errorf("can not calculate metrics for scale %s", pa.Spec.ScaleTargetRef.Name)
}
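To make the TODO in `computeReplicasForMetrics` concrete: `observedStableValue` and `observedPanicValue` are averages of the same metric over a long (stable) and a short (panic) window. A minimal illustration below; the sample type, names, and window lengths are assumptions, since the aibrix metricClient is not implemented in this commit:

```go
package main

import (
	"fmt"
	"time"
)

// sample is an assumed shape for one scraped metric point.
type sample struct {
	t time.Time
	v float64
}

// windowAverage returns the mean of the samples newer than now-window,
// i.e. the aggregation the TODO describes for the stable (long) and
// panic (short) windows.
func windowAverage(samples []sample, now time.Time, window time.Duration) float64 {
	sum, n := 0.0, 0
	cutoff := now.Add(-window)
	for _, s := range samples {
		if s.t.After(cutoff) {
			sum += s.v
			n++
		}
	}
	if n == 0 {
		return 0
	}
	return sum / float64(n)
}

func main() {
	now := time.Now()
	history := []sample{{now.Add(-30 * time.Second), 120}, {now.Add(-3 * time.Second), 40}}
	fmt.Println(windowAverage(history, now, 60*time.Second)) // stable window average: 80
	fmt.Println(windowAverage(history, now, 6*time.Second))  // panic window average: 40
}
```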
