Skip to content

Commit

Permalink
Merge pull request #7649 from kawych/dws-htn
Browse files Browse the repository at this point in the history
Minor refactor to scale-up orchestrator for more re-usability
  • Loading branch information
k8s-ci-robot authored Jan 21, 2025
2 parents eb7a849 + 5245a5b commit abf3e44
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type AsyncNodeGroupInitializer struct {
atomicScaleUp bool
}

func newAsyncNodeGroupInitializer(
// NewAsyncNodeGroupInitializer creates a new AsyncNodeGroupInitializer instance.
func NewAsyncNodeGroupInitializer(
nodeGroup cloudprovider.NodeGroup,
nodeInfo *framework.NodeInfo,
scaleUpExecutor *scaleUpExecutor,
Expand Down
63 changes: 1 addition & 62 deletions cluster-autoscaler/core/scaleup/orchestrator/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package orchestrator

import (
"fmt"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -138,7 +136,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
failedNodeGroups[i] = result.info.Group
scaleUpErrors[i] = result.err
}
return combineConcurrentScaleUpErrors(scaleUpErrors), failedNodeGroups
return errors.Combine(scaleUpErrors), failedNodeGroups
}
return nil, nil
}
Expand Down Expand Up @@ -188,65 +186,6 @@ func (e *scaleUpExecutor) executeScaleUp(
return nil
}

// combineConcurrentScaleUpErrors collapses the errors produced by scale-ups that
// ran concurrently into a single AutoscalerError.
//
// It returns nil when errs is empty and the only element when errs has length
// one. When every error shares the same type and the same message, the first
// error is returned as-is. Otherwise a new error is built: its type is the type
// of the first error in sorted order (by type, then by message) and its message
// is produced by formatMessageFromConcurrentErrors, with error types included in
// the message only when more than one distinct type is present.
//
// The input slice is never reordered, so callers may keep it index-aligned with
// other slices (for example the list of failed node groups).
func combineConcurrentScaleUpErrors(errs []errors.AutoscalerError) errors.AutoscalerError {
	switch len(errs) {
	case 0:
		return nil
	case 1:
		return errs[0]
	}

	uniqueMessages := make(map[string]struct{}, len(errs))
	uniqueTypes := make(map[errors.AutoscalerErrorType]struct{}, len(errs))
	for _, err := range errs {
		uniqueTypes[err.Type()] = struct{}{}
		uniqueMessages[err.Error()] = struct{}{}
	}
	if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 {
		return errs[0]
	}

	// Sort to stabilize the result and ease log aggregation. Work on a private
	// copy: sorting errs in place would mutate the caller's slice as a hidden
	// side effect.
	sorted := make([]errors.AutoscalerError, len(errs))
	copy(sorted, errs)
	sort.Slice(sorted, func(i, j int) bool {
		if sorted[i].Type() != sorted[j].Type() {
			return sorted[i].Type() < sorted[j].Type()
		}
		return sorted[i].Error() < sorted[j].Error()
	})

	printErrorTypes := len(uniqueTypes) > 1
	message := formatMessageFromConcurrentErrors(sorted, printErrorTypes)
	return errors.NewAutoscalerError(sorted[0].Type(), message)
}

// formatMessageFromConcurrentErrors renders errs as a single message of the form
//
//	<first message> ...and other concurrent errors: ["<msg>", "<msg>"]
//
// The first error's message leads; every other distinct error follows as a
// quoted entry in the bracketed list, in the order given. Errors are considered
// duplicates when both their type and their message match, and duplicates
// (including repeats of the first error) are listed only once. When
// printErrorTypes is true each listed entry is prefixed with "[<type>] ".
//
// An empty errs yields an empty string.
func formatMessageFromConcurrentErrors(errs []errors.AutoscalerError, printErrorTypes bool) string {
	if len(errs) == 0 {
		return ""
	}

	// errorKey identifies an error by what is actually printed. Using the
	// interface value itself as a map key would panic for unhashable dynamic
	// types and would compare pointer-backed implementations by identity,
	// letting textual duplicates through.
	type errorKey struct {
		errType errors.AutoscalerErrorType
		message string
	}

	firstErr := errs[0]
	seen := map[errorKey]struct{}{
		{errType: firstErr.Type(), message: firstErr.Error()}: {},
	}

	var builder strings.Builder
	builder.WriteString(firstErr.Error())
	builder.WriteString(" ...and other concurrent errors: [")

	separator := ""
	for _, err := range errs {
		key := errorKey{errType: err.Type(), message: err.Error()}
		if _, duplicate := seen[key]; duplicate {
			continue
		}
		seen[key] = struct{}{}

		message := key.message
		if printErrorTypes {
			message = fmt.Sprintf("[%s] %s", key.errType, key.message)
		}
		builder.WriteString(separator)
		fmt.Fprintf(&builder, "%q", message)
		separator = ", "
	}

	builder.WriteString("]")
	return builder.String()
}

// Checks if all groups are scaled only once.
// Scaling one group multiple times concurrently may cause problems.
func checkUniqueNodeGroups(scaleUpInfos []nodegroupset.ScaleUpInfo) errors.AutoscalerError {
Expand Down
127 changes: 0 additions & 127 deletions cluster-autoscaler/core/scaleup/orchestrator/executor_test.go

This file was deleted.

72 changes: 47 additions & 25 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,13 @@ func (o *ScaleUpOrchestrator) ScaleUp(
return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
var scaleUpStatus *status.ScaleUpStatus
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, allOrNothing)
oldId := bestOption.NodeGroup.Id()
if o.autoscalingContext.AsyncNodeGroupsEnabled {
initializer := NewAsyncNodeGroupInitializer(bestOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing)
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroupAsync(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, initializer)
} else {
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
}
if aErr != nil {
return scaleUpStatus, aErr
}
Expand Down Expand Up @@ -501,46 +507,62 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup(
schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
podEquivalenceGroups []*equivalence.PodGroup,
daemonSets []*appsv1.DaemonSet,
allOrNothing bool,
) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
oldId := initialOption.NodeGroup.Id()
res, aErr := o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, initialOption.NodeGroup)
return o.processCreateNodeGroupResult(initialOption, oldId, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, res, aErr)
}

// CreateNodeGroupAsync will try to create a new node group asynchronously based on the initialOption.
func (o *ScaleUpOrchestrator) CreateNodeGroupAsync(
initialOption *expander.Option,
nodeInfos map[string]*framework.NodeInfo,
schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
podEquivalenceGroups []*equivalence.PodGroup,
daemonSets []*appsv1.DaemonSet,
initializer nodegroups.AsyncNodeGroupInitializer,
) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
oldId := initialOption.NodeGroup.Id()
var createNodeGroupResult nodegroups.CreateNodeGroupResult
var aErr errors.AutoscalerError
if o.autoscalingContext.AsyncNodeGroupsEnabled {
initializer := newAsyncNodeGroupInitializer(initialOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing)
createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroupAsync(o.autoscalingContext, initialOption.NodeGroup, initializer)
} else {
createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, initialOption.NodeGroup)
}
res, aErr := o.processors.NodeGroupManager.CreateNodeGroupAsync(o.autoscalingContext, initialOption.NodeGroup, initializer)
return o.processCreateNodeGroupResult(initialOption, oldId, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, res, aErr)
}

func (o *ScaleUpOrchestrator) processCreateNodeGroupResult(
initialOption *expander.Option,
initialOptionId string,
nodeInfos map[string]*framework.NodeInfo,
schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
podEquivalenceGroups []*equivalence.PodGroup,
daemonSets []*appsv1.DaemonSet,
result nodegroups.CreateNodeGroupResult,
aErr errors.AutoscalerError,
) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
if aErr != nil {
status, err := status.UpdateScaleUpError(
&status.ScaleUpStatus{FailedCreationNodeGroups: []cloudprovider.NodeGroup{initialOption.NodeGroup}, PodsTriggeredScaleUp: initialOption.Pods},
aErr)
return createNodeGroupResults, status, err
return []nodegroups.CreateNodeGroupResult{}, status, err
}

createNodeGroupResults = append(createNodeGroupResults, createNodeGroupResult)
initialOption.NodeGroup = createNodeGroupResult.MainCreatedNodeGroup
initialOption.NodeGroup = result.MainCreatedNodeGroup

// If possible, replace the candidate node info with node info based on the created node group. The latter
// should be more in line with the nodes which will be created by the node group.
mainCreatedNodeInfo, aErr := simulator.SanitizedTemplateNodeInfoFromNodeGroup(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, o.taintConfig)
mainCreatedNodeInfo, aErr := simulator.SanitizedTemplateNodeInfoFromNodeGroup(result.MainCreatedNodeGroup, daemonSets, o.taintConfig)
if aErr == nil {
nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo
schedulablePodGroups[createNodeGroupResult.MainCreatedNodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, createNodeGroupResult.MainCreatedNodeGroup, mainCreatedNodeInfo)
nodeInfos[result.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo
schedulablePodGroups[result.MainCreatedNodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, result.MainCreatedNodeGroup, mainCreatedNodeInfo)
} else {
klog.Warningf("Cannot build node info for newly created main node group %v; balancing similar node groups may not work; err=%v", createNodeGroupResult.MainCreatedNodeGroup.Id(), aErr)
klog.Warningf("Cannot build node info for newly created main node group %v; balancing similar node groups may not work; err=%v", result.MainCreatedNodeGroup.Id(), aErr)
// Use node info based on expansion candidate but update Id which likely changed when node group was created.
nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = nodeInfos[oldId]
schedulablePodGroups[createNodeGroupResult.MainCreatedNodeGroup.Id()] = schedulablePodGroups[oldId]
nodeInfos[result.MainCreatedNodeGroup.Id()] = nodeInfos[initialOptionId]
schedulablePodGroups[result.MainCreatedNodeGroup.Id()] = schedulablePodGroups[initialOptionId]
}
if oldId != createNodeGroupResult.MainCreatedNodeGroup.Id() {
delete(nodeInfos, oldId)
delete(schedulablePodGroups, oldId)
if initialOptionId != result.MainCreatedNodeGroup.Id() {
delete(nodeInfos, initialOptionId)
delete(schedulablePodGroups, initialOptionId)
}
for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups {
for _, nodeGroup := range result.ExtraCreatedNodeGroups {
nodeInfo, aErr := simulator.SanitizedTemplateNodeInfoFromNodeGroup(nodeGroup, daemonSets, o.taintConfig)
if aErr != nil {
klog.Warningf("Cannot build node info for newly created extra node group %v; balancing similar node groups will not work; err=%v", nodeGroup.Id(), aErr)
Expand All @@ -554,7 +576,7 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup(
// TODO(lukaszos) when pursuing scalability update this call with one which takes list of changed node groups so we do not
// do extra API calls. (the call at the bottom of ScaleUp() could be also changed then)
o.clusterStateRegistry.Recalculate()
return createNodeGroupResults, nil, nil
return []nodegroups.CreateNodeGroupResult{result}, nil, nil
}

// SchedulablePodGroups returns a list of pods that could be scheduled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func TestCloudProviderFailingToScaleUpGroups(t *testing.T) {
assert.False(t, result.ScaleUpStatus.WasSuccessful())
assert.Equal(t, errors.CloudProviderError, result.ScaleUpError.Type())
assert.Equal(t, tc.expectedTotalTargetSizes, result.GroupTargetSizes["ng1"]+result.GroupTargetSizes["ng2"])
assert.Equal(t, tc.expectConcurrentErrors, strings.Contains(result.ScaleUpError.Error(), "...and other concurrent errors"))
assert.Equal(t, tc.expectConcurrentErrors, strings.Contains(result.ScaleUpError.Error(), "...and other errors"))
})
}
}
Expand Down
Loading

0 comments on commit abf3e44

Please sign in to comment.