Skip to content

Commit

Permalink
ScaleUp for check-capacity ProvisioningRequestClass (#6451)
Browse files Browse the repository at this point in the history
* ScaleUp for check-capacity ProvisioningRequestClass

* update condition logic

* Update tests

* Naming update

* Update cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go

Co-authored-by: Bartek Wróblewski <[email protected]>

---------

Co-authored-by: Bartek Wróblewski <[email protected]>
  • Loading branch information
yaroslava-serdiuk and BigDarkClown authored Jan 30, 2024
1 parent cf171a7 commit ed6ebbe
Show file tree
Hide file tree
Showing 18 changed files with 1,395 additions and 50 deletions.
32 changes: 13 additions & 19 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
}

loggingQuota := klogx.PodsLoggingQuota()
Expand All @@ -103,7 +103,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(

upcomingNodes, aErr := o.UpcomingNodes(nodeInfos)
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: "))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: "))
}
klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))

Expand All @@ -112,7 +112,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
var err error
nodeGroups, nodeInfos, err = o.processors.NodeGroupListProcessor.Process(o.autoscalingContext, nodeGroups, nodeInfos, unschedulablePods)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
}
}

Expand All @@ -121,7 +121,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(

resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes)
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
}

now := time.Now()
Expand Down Expand Up @@ -186,15 +186,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(

newNodes, aErr := o.GetCappedNewNodeCount(bestOption.NodeCount, len(nodes)+len(upcomingNodes))
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
}

createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
if !bestOption.NodeGroup.Exist() {
oldId := bestOption.NodeGroup.Id()
createNodeGroupResult, aErr := o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, bestOption.NodeGroup)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{FailedCreationNodeGroups: []cloudprovider.NodeGroup{bestOption.NodeGroup}, PodsTriggeredScaleUp: bestOption.Pods},
aErr)
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
if !found {
// This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup.
klog.Errorf("No node info for: %s", bestOption.NodeGroup.Id())
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
errors.NewAutoscalerError(
errors.CloudProviderError,
Expand All @@ -263,7 +263,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
// Apply upper limits for CPU and memory.
newNodes, aErr = o.resourceManager.ApplyLimits(o.autoscalingContext, newNodes, resourcesLeft, nodeInfo, bestOption.NodeGroup)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
aErr)
}
Expand All @@ -283,15 +283,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(

scaleUpInfos, aErr := o.processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(o.autoscalingContext, targetNodeGroups, newNodes)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
aErr)
}

klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
CreateNodeGroupResults: createNodeGroupResults,
FailedResizeNodeGroups: failedNodeGroups,
Expand Down Expand Up @@ -322,7 +322,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
}

now := time.Now()
Expand All @@ -331,7 +331,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(

resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes)
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
}

for _, ng := range nodeGroups {
Expand Down Expand Up @@ -397,7 +397,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
if aErr != nil {
return scaleUpError(
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
FailedResizeNodeGroups: failedNodeGroups,
},
Expand Down Expand Up @@ -717,9 +717,3 @@ func GetPodsAwaitingEvaluation(egs []*equivalence.PodGroup, bestOption string) [
}
return awaitsEvaluation
}

func scaleUpError(s *status.ScaleUpStatus, err errors.AutoscalerError) (*status.ScaleUpStatus, errors.AutoscalerError) {
s.ScaleUpError = &err
s.Result = status.ScaleUpError
return s, err
}
112 changes: 112 additions & 0 deletions cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/rest"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// WrapperOrchestrator is an orchestrator which wraps Scale Up for ProvisioningRequests and regular pods.
// Each loop WrapperOrchestrator split out regular and pods from ProvisioningRequest, pick one group that
// wasn't picked in the last loop and run ScaleUp for it.
type WrapperOrchestrator struct {
// scaleUpRegularPods indicates that ScaleUp for regular pods will be run in the current CA loop, if they are present.
scaleUpRegularPods bool
scaleUpOrchestrator scaleup.Orchestrator
provReqOrchestrator scaleup.Orchestrator
}

// NewWrapperOrchestrator return WrapperOrchestrator
func NewWrapperOrchestrator(kubeConfig *rest.Config) (scaleup.Orchestrator, error) {
provReqOrchestrator, err := checkcapacity.New(kubeConfig)
if err != nil {
return nil, fmt.Errorf("failed create ScaleUp orchestrator for ProvisioningRequests, error: %v", err)
}
return &WrapperOrchestrator{
scaleUpOrchestrator: New(),
provReqOrchestrator: provReqOrchestrator,
}, nil
}

// Initialize initializes the orchestrator object with required fields.
func (o *WrapperOrchestrator) Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
taintConfig taints.TaintConfig,
) {
o.scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig)
o.provReqOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig)
}

// ScaleUp run scaleUp function for regular pods of pods from ProvisioningRequest.
func (o *WrapperOrchestrator) ScaleUp(
unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
defer func() { o.scaleUpRegularPods = !o.scaleUpRegularPods }()

provReqPods, regularPods := splitOut(unschedulablePods)
if len(provReqPods) == 0 {
o.scaleUpRegularPods = true
} else if len(regularPods) == 0 {
o.scaleUpRegularPods = false
}

if o.scaleUpRegularPods {
return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
}
return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
}

func splitOut(unschedulablePods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) {
for _, pod := range unschedulablePods {
if _, ok := pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey]; ok {
provReqPods = append(provReqPods, pod)
} else {
regularPods = append(regularPods, pod)
}
}
return
}

// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
// than the configured min size. The source of truth for the current node group
// size is the TargetSize queried directly from cloud providers. Returns
// appropriate status or error if an unexpected error occurred.
func (o *WrapperOrchestrator) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return o.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

import (
"testing"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
provisioningRequestErrorMsg = "provisioningRequestError"
regularPodsErrorMsg = "regularPodsError"
)

func TestScaleUp(t *testing.T) {
o := WrapperOrchestrator{
provReqOrchestrator: &fakeScaleUp{provisioningRequestErrorMsg},
scaleUpOrchestrator: &fakeScaleUp{regularPodsErrorMsg},
}
regularPods := []*apiv1.Pod{
BuildTestPod("pod-1", 1, 100),
BuildTestPod("pod-2", 1, 100),
}
provReqPods := []*apiv1.Pod{
BuildTestPod("pr-pod-1", 1, 100),
BuildTestPod("pr-pod-2", 1, 100),
}
for _, pod := range provReqPods {
pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey] = "true"
}
unschedulablePods := append(regularPods, provReqPods...)
_, err := o.ScaleUp(unschedulablePods, nil, nil, nil)
assert.Equal(t, err.Error(), provisioningRequestErrorMsg)
_, err = o.ScaleUp(unschedulablePods, nil, nil, nil)
assert.Equal(t, err.Error(), regularPodsErrorMsg)
}

type fakeScaleUp struct {
errorMsg string
}

func (f *fakeScaleUp) ScaleUp(
unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return nil, errors.NewAutoscalerError(errors.InternalError, f.errorMsg)
}

func (f *fakeScaleUp) Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
taintConfig taints.TaintConfig,
) {
}

func (f *fakeScaleUp) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return nil, nil
}
11 changes: 11 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
Expand Down Expand Up @@ -468,6 +469,15 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

scaleUpOrchestrator := orchestrator.New()
if *provisioningRequestsEnabled {
kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient)
if err != nil {
return nil, err
}
}

opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(),
Expand All @@ -477,6 +487,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
PredicateChecker: predicateChecker,
DeleteOptions: deleteOptions,
DrainabilityRules: drainabilityRules,
ScaleUpOrchestrator: scaleUpOrchestrator,
}

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

const (
provisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"
// ProvisioningRequestPodAnnotationKey is an annotation on pod that indicate that pod was created by ProvisioningRequest.
ProvisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"
maxProvReqEvent = 50
)

Expand Down Expand Up @@ -101,6 +102,6 @@ func provisioningRequestName(pod *v1.Pod) (string, bool) {
if pod == nil || pod.Annotations == nil {
return "", false
}
provReqName, found := pod.Annotations[provisioningRequestPodAnnotationKey]
provReqName, found := pod.Annotations[ProvisioningRequestPodAnnotationKey]
return provReqName, found
}
Loading

0 comments on commit ed6ebbe

Please sign in to comment.