diff --git a/cluster-autoscaler/processors/provreq/testutils.go b/cluster-autoscaler/processors/provreq/testutils.go
new file mode 100644
index 000000000000..cfda155ad67b
--- /dev/null
+++ b/cluster-autoscaler/processors/provreq/testutils.go
@@ -0,0 +1,11 @@
+package provreq
+
+import (
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
+	"k8s.io/utils/clock/testing"
+)
+
+// NewFakePodsInjector returns a ProvisioningRequestPodsInjector backed by the given fake client and clock; it is intended for tests.
+func NewFakePodsInjector(client *provreqclient.ProvisioningRequestClient, clock *testing.FakePassiveClock) *ProvisioningRequestPodsInjector {
+	return &ProvisioningRequestPodsInjector{client: client, clock: clock}
+}
diff --git a/cluster-autoscaler/processors/status/scale_up_status_processor.go b/cluster-autoscaler/processors/status/scale_up_status_processor.go
index 2bd48ba1ce45..0dee23dd0e19 100644
--- a/cluster-autoscaler/processors/status/scale_up_status_processor.go
+++ b/cluster-autoscaler/processors/status/scale_up_status_processor.go
@@ -17,6 +17,10 @@ limitations under the License.
 package status
 
 import (
+	"fmt"
+	"sort"
+	"strings"
+
 	apiv1 "k8s.io/api/core/v1"
 
 	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -143,3 +147,215 @@ func UpdateScaleUpError(s *ScaleUpStatus, err errors.AutoscalerError) (*ScaleUpS
 	s.Result = ScaleUpError
 	return s, err
 }
+
+type combinedStatusSet struct {
+	Result                      ScaleUpResult
+	ScaleupErrors               map[*errors.AutoscalerError]bool
+	ScaleUpInfosSet             map[nodegroupset.ScaleUpInfo]bool
+	PodsTriggeredScaleUpSet     map[*apiv1.Pod]bool
+	PodsRemainUnschedulableSet  map[*NoScaleUpInfo]bool
+	PodsAwaitEvaluationSet      map[*apiv1.Pod]bool
+	CreateNodeGroupResultsSet   map[*nodegroups.CreateNodeGroupResult]bool
+	ConsideredNodeGroupsSet     map[cloudprovider.NodeGroup]bool
+	FailedCreationNodeGroupsSet map[cloudprovider.NodeGroup]bool
+	FailedResizeNodeGroupsSet   map[cloudprovider.NodeGroup]bool
+}
+
+func (c *combinedStatusSet) Add(status *ScaleUpStatus) {
+	// This relies on the ScaleUpResult enum being ordered so that a higher value means a worse result, which lets us simply take the minimum of the results. If new results are added, either keep the enum ordered this way, or use a different approach to combine the results.
+	if c.Result > status.Result {
+		c.Result = status.Result
+	}
+	if status.ScaleUpError != nil {
+		if _, found := c.ScaleupErrors[status.ScaleUpError]; !found {
+			c.ScaleupErrors[status.ScaleUpError] = true
+		}
+	}
+	if status.ScaleUpInfos != nil {
+		for _, scaleUpInfo := range status.ScaleUpInfos {
+			if _, found := c.ScaleUpInfosSet[scaleUpInfo]; !found {
+				c.ScaleUpInfosSet[scaleUpInfo] = true
+			}
+		}
+	}
+	if status.PodsTriggeredScaleUp != nil {
+		for _, pod := range status.PodsTriggeredScaleUp {
+			if _, found := c.PodsTriggeredScaleUpSet[pod]; !found {
+				c.PodsTriggeredScaleUpSet[pod] = true
+			}
+		}
+	}
+	if status.PodsRemainUnschedulable != nil {
+		for _, pod := range status.PodsRemainUnschedulable {
+			if _, found := c.PodsRemainUnschedulableSet[&pod]; !found {
+				c.PodsRemainUnschedulableSet[&pod] = true
+			}
+		}
+	}
+	if status.PodsAwaitEvaluation != nil {
+		for _, pod := range status.PodsAwaitEvaluation {
+			if _, found := c.PodsAwaitEvaluationSet[pod]; !found {
+				c.PodsAwaitEvaluationSet[pod] = true
+			}
+		}
+	}
+	if status.CreateNodeGroupResults != nil {
+		for _, createNodeGroupResult := range status.CreateNodeGroupResults {
+			if _, found := c.CreateNodeGroupResultsSet[&createNodeGroupResult]; !found {
+				c.CreateNodeGroupResultsSet[&createNodeGroupResult] = true
+			}
+		}
+	}
+	if status.ConsideredNodeGroups != nil {
+		for _, nodeGroup := range status.ConsideredNodeGroups {
+			if _, found := c.ConsideredNodeGroupsSet[nodeGroup]; !found {
+				c.ConsideredNodeGroupsSet[nodeGroup] = true
+			}
+		}
+	}
+	if status.FailedCreationNodeGroups != nil {
+		for _, nodeGroup := range status.FailedCreationNodeGroups {
+			if _, found := c.FailedCreationNodeGroupsSet[nodeGroup]; !found {
+				c.FailedCreationNodeGroupsSet[nodeGroup] = true
+			}
+		}
+	}
+	if status.FailedResizeNodeGroups != nil {
+		for _, nodeGroup := range status.FailedResizeNodeGroups {
+			if _, found := c.FailedResizeNodeGroupsSet[nodeGroup]; !found {
+				c.FailedResizeNodeGroupsSet[nodeGroup] = true
+			}
+		}
+	}
+}
+
+func (c *combinedStatusSet) formatMessageFromBatchErrors(errs []errors.AutoscalerError, printErrorTypes bool) string {
+	firstErr := errs[0]
+	var builder strings.Builder
+	builder.WriteString(firstErr.Error())
+	builder.WriteString(" ...and other concurrent errors: [")
+	formattedErrs := map[errors.AutoscalerError]bool{
+		firstErr: true,
+	}
+	for _, err := range errs {
+		if _, has := formattedErrs[err]; has {
+			continue
+		}
+		formattedErrs[err] = true
+		var message string
+		if printErrorTypes {
+			message = fmt.Sprintf("[%s] %s", err.Type(), err.Error())
+		} else {
+			message = err.Error()
+		}
+		if len(formattedErrs) > 2 {
+			builder.WriteString(", ")
+		}
+		builder.WriteString(fmt.Sprintf("%q", message))
+	}
+	builder.WriteString("]")
+	return builder.String()
+}
+
+func (c *combinedStatusSet) combineBatchScaleUpErrors() *errors.AutoscalerError {
+	if len(c.ScaleupErrors) == 0 {
+		return nil
+	}
+	if len(c.ScaleupErrors) == 1 {
+		for err := range c.ScaleupErrors {
+			return err
+		}
+	}
+	uniqueMessages := make(map[string]bool)
+	uniqueTypes := make(map[errors.AutoscalerErrorType]bool)
+	for err := range c.ScaleupErrors {
+		uniqueTypes[(*err).Type()] = true
+		uniqueMessages[(*err).Error()] = true
+	}
+	if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 {
+		for err := range c.ScaleupErrors {
+			return err
+		}
+	}
+	// Sort to stabilize the results and make log aggregation easier.
+	errs := make([]errors.AutoscalerError, 0, len(c.ScaleupErrors))
+	for err := range c.ScaleupErrors {
+		errs = append(errs, *err)
+	}
+	sort.Slice(errs, func(i, j int) bool {
+		errA := errs[i]
+		errB := errs[j]
+		if errA.Type() == errB.Type() {
+			return errs[i].Error() < errs[j].Error()
+		}
+		return errA.Type() < errB.Type()
+	})
+	firstErr := errs[0]
+	printErrorTypes := len(uniqueTypes) > 1
+	message := c.formatMessageFromBatchErrors(errs, printErrorTypes)
+	combinedErr := errors.NewAutoscalerError(firstErr.Type(), message)
+	return &combinedErr
+}
+
+func (c *combinedStatusSet) Export() *ScaleUpStatus {
+	result := &ScaleUpStatus{Result: c.Result}
+	if len(c.ScaleupErrors) > 0 {
+		result.ScaleUpError = c.combineBatchScaleUpErrors()
+	}
+	if len(c.ScaleUpInfosSet) > 0 {
+		for scaleUpInfo := range c.ScaleUpInfosSet {
+			result.ScaleUpInfos = append(result.ScaleUpInfos, scaleUpInfo)
+		}
+	}
+	if len(c.PodsTriggeredScaleUpSet) > 0 {
+		for pod := range c.PodsTriggeredScaleUpSet {
+			result.PodsTriggeredScaleUp = append(result.PodsTriggeredScaleUp, pod)
+		}
+	}
+	if len(c.PodsRemainUnschedulableSet) > 0 {
+		for pod := range c.PodsRemainUnschedulableSet {
+			result.PodsRemainUnschedulable = append(result.PodsRemainUnschedulable, *pod)
+		}
+	}
+	if len(c.PodsAwaitEvaluationSet) > 0 {
+		for pod := range c.PodsAwaitEvaluationSet {
+			result.PodsAwaitEvaluation = append(result.PodsAwaitEvaluation, pod)
+		}
+	}
+	if len(c.CreateNodeGroupResultsSet) > 0 {
+		for createNodeGroupResult := range c.CreateNodeGroupResultsSet {
+			result.CreateNodeGroupResults = append(result.CreateNodeGroupResults, *createNodeGroupResult)
+		}
+	}
+	if len(c.ConsideredNodeGroupsSet) > 0 {
+		for nodeGroup := range c.ConsideredNodeGroupsSet {
+			result.ConsideredNodeGroups = append(result.ConsideredNodeGroups, nodeGroup)
+		}
+	}
+	if len(c.FailedCreationNodeGroupsSet) > 0 {
+		for nodeGroup := range c.FailedCreationNodeGroupsSet {
+			result.FailedCreationNodeGroups = append(result.FailedCreationNodeGroups, nodeGroup)
+		}
+	}
+	if len(c.FailedResizeNodeGroupsSet) > 0 {
+		for nodeGroup := range c.FailedResizeNodeGroupsSet {
+			result.FailedResizeNodeGroups = append(result.FailedResizeNodeGroups, nodeGroup)
+		}
+	}
+	return result
+}
+
+// NewCombinedStatusSet creates a combinedStatusSet with all underlying sets initialized.
+func NewCombinedStatusSet() combinedStatusSet {
+	return combinedStatusSet{
+		ScaleupErrors:               make(map[*errors.AutoscalerError]bool),
+		ScaleUpInfosSet:             make(map[nodegroupset.ScaleUpInfo]bool),
+		PodsTriggeredScaleUpSet:     make(map[*apiv1.Pod]bool),
+		PodsRemainUnschedulableSet:  make(map[*NoScaleUpInfo]bool),
+		PodsAwaitEvaluationSet:      make(map[*apiv1.Pod]bool),
+		CreateNodeGroupResultsSet:   make(map[*nodegroups.CreateNodeGroupResult]bool),
+		ConsideredNodeGroupsSet:     make(map[cloudprovider.NodeGroup]bool),
+		FailedCreationNodeGroupsSet: make(map[cloudprovider.NodeGroup]bool),
+		FailedResizeNodeGroupsSet:   make(map[cloudprovider.NodeGroup]bool),
+	}
+}
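Note (illustrative only, not part of the patch): a minimal sketch of how a batch-processing caller is expected to use the new set — one ScaleUpStatus per ProvisioningRequest goes in, a single aggregated status comes out. The combineStatuses helper and its package are hypothetical; only NewCombinedStatusSet, Add and Export come from the hunk above.

```go
package example // hypothetical caller, e.g. a batch-processing provisioning class

import "k8s.io/autoscaler/cluster-autoscaler/processors/status"

// combineStatuses folds the per-request scale-up statuses produced while
// processing a batch of check-capacity ProvisioningRequests into one status.
func combineStatuses(statuses []*status.ScaleUpStatus) *status.ScaleUpStatus {
	set := status.NewCombinedStatusSet()
	for _, st := range statuses {
		// Add deduplicates node groups and pods across statuses and keeps the
		// minimum ScaleUpResult, per the enum-ordering note in Add.
		set.Add(st)
	}
	// Export rebuilds a single ScaleUpStatus; distinct errors are merged into
	// one AutoscalerError of the form:
	//   err1 ...and other concurrent errors: ["err2", "err3"]
	return set.Export()
}
```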
"k8s.io/autoscaler/cluster-autoscaler/config" @@ -34,6 +34,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" + "k8s.io/autoscaler/cluster-autoscaler/processors/provreq" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" @@ -44,8 +45,9 @@ import ( kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" - "k8s.io/autoscaler/cluster-autoscaler/utils/units" "k8s.io/client-go/kubernetes/fake" + clocktesting "k8s.io/utils/clock/testing" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -78,6 +80,15 @@ func TestScaleUp(t *testing.T) { Class: v1.ProvisioningClassCheckCapacity, }) + anotherCheckCapacityCpuProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions( + provreqwrapper.TestProvReqOptions{ + Name: "anotherCheckCapacityCpuProvReq", + CPU: "5m", + Memory: "5", + PodCount: int32(100), + Class: v1.ProvisioningClassCheckCapacity, + }) + newCheckCapacityMemProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions( provreqwrapper.TestProvReqOptions{ Name: "newCheckCapacityMemProvReq", @@ -86,6 +97,23 @@ func TestScaleUp(t *testing.T) { PodCount: int32(100), Class: v1.ProvisioningClassCheckCapacity, }) + impossibleCheckCapacityReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions( + provreqwrapper.TestProvReqOptions{ + Name: "impossibleCheckCapacityRequest", + CPU: "1m", + Memory: "1", + PodCount: int32(5001), + Class: v1.ProvisioningClassCheckCapacity, + }) + + anotherImpossibleCheckCapacityReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions( + provreqwrapper.TestProvReqOptions{ + Name: "anotherImpossibleCheckCapacityRequest", + CPU: "1m", + Memory: "1", + PodCount: int32(5001), + Class: v1.ProvisioningClassCheckCapacity, + }) // Active atomic scale up requests. 
atomicScaleUpProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions( @@ -169,6 +197,10 @@ func TestScaleUp(t *testing.T) { scaleUpResult status.ScaleUpResult autoprovisioning bool err bool + batchProcessing bool + maxBatchSize int + batchTimebox time.Duration + numProvisioned int }{ { name: "no ProvisioningRequests", @@ -236,10 +268,129 @@ func TestScaleUp(t *testing.T) { autoprovisioning: true, scaleUpResult: status.ScaleUpSuccessful, }, + // Batch processing tests + { + name: "batch processing of check capacity requests with one request", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq}, + provReqToScaleUp: newCheckCapacityCpuProvReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 3, + batchTimebox: 5 * time.Minute, + numProvisioned: 1, + }, + { + name: "batch processing of check capacity requests with less requests than max batch size", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq}, + provReqToScaleUp: newCheckCapacityCpuProvReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 3, + batchTimebox: 5 * time.Minute, + numProvisioned: 2, + }, + { + name: "batch processing of check capacity requests with requests equal to max batch size", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq}, + provReqToScaleUp: newCheckCapacityCpuProvReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 2, + batchTimebox: 5 * time.Minute, + numProvisioned: 2, + }, + { + name: "batch processing of check capacity requests with more requests than max batch size", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq, anotherCheckCapacityCpuProvReq}, + provReqToScaleUp: newCheckCapacityCpuProvReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 2, + batchTimebox: 5 * time.Minute, + numProvisioned: 2, + }, + { + name: "batch processing of check capacity requests where cluster contains already provisioned requests", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, bookedCapacityProvReq, anotherCheckCapacityCpuProvReq}, + provReqToScaleUp: newCheckCapacityCpuProvReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 2, + batchTimebox: 5 * time.Minute, + numProvisioned: 3, + }, + { + name: "batch processing of check capacity requests where timebox is exceeded", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq}, + provReqToScaleUp: newCheckCapacityCpuProvReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 5, + batchTimebox: 1 * time.Nanosecond, + numProvisioned: 1, + }, + { + name: "batch processing of check capacity requests where max batch size is invalid", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq}, + provReqToScaleUp: newCheckCapacityCpuProvReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 0, + batchTimebox: 5 * time.Minute, + numProvisioned: 1, + }, + { + name: "batch processing of check capacity requests where best effort atomic scale-up request is also present in cluster", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, newCheckCapacityCpuProvReq, atomicScaleUpProvReq}, + 
provReqToScaleUp: newCheckCapacityMemProvReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 2, + batchTimebox: 5 * time.Minute, + numProvisioned: 2, + }, + { + name: "process atomic scale-up requests where batch processing of check capacity requests is enabled", + provReqs: []*provreqwrapper.ProvisioningRequest{possibleAtomicScaleUpReq}, + provReqToScaleUp: possibleAtomicScaleUpReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 3, + batchTimebox: 5 * time.Minute, + numProvisioned: 1, + }, + { + name: "process atomic scale-up requests where batch processing of check capacity requests is enabled and check capacity requests are present in cluster", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, newCheckCapacityCpuProvReq, atomicScaleUpProvReq}, + provReqToScaleUp: atomicScaleUpProvReq, + scaleUpResult: status.ScaleUpNotNeeded, + batchProcessing: true, + maxBatchSize: 3, + batchTimebox: 5 * time.Minute, + numProvisioned: 1, + }, + { + name: "batch processing of check capacity requests where some requests' capacity is not available", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, impossibleCheckCapacityReq, newCheckCapacityCpuProvReq}, + provReqToScaleUp: newCheckCapacityMemProvReq, + scaleUpResult: status.ScaleUpSuccessful, + batchProcessing: true, + maxBatchSize: 3, + batchTimebox: 5 * time.Minute, + numProvisioned: 3, + }, + { + name: "batch processing of check capacity requests where all requests' capacity is not available", + provReqs: []*provreqwrapper.ProvisioningRequest{impossibleCheckCapacityReq, anotherImpossibleCheckCapacityReq}, + provReqToScaleUp: impossibleCheckCapacityReq, + scaleUpResult: status.ScaleUpNoOptionsAvailable, + batchProcessing: true, + maxBatchSize: 3, + batchTimebox: 5 * time.Minute, + numProvisioned: 2, + }, } for _, tc := range testCases { - tc := tc - allNodes := allNodes t.Run(tc.name, func(t *testing.T) { t.Parallel() @@ -252,7 +403,8 @@ func TestScaleUp(t *testing.T) { } return fmt.Errorf("unexpected scale-up of %s by %d", name, n) } - orchestrator, nodeInfos := setupTest(t, allNodes, tc.provReqs, onScaleUpFunc, tc.autoprovisioning) + client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...) + orchestrator, nodeInfos := setupTest(t, client, allNodes, onScaleUpFunc, tc.autoprovisioning, tc.batchProcessing, tc.maxBatchSize, tc.batchTimebox) st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nodeInfos, false) if !tc.err { @@ -263,6 +415,18 @@ func TestScaleUp(t *testing.T) { t.Errorf("noScaleUpInfo: %#v", st.PodsRemainUnschedulable[0].RejectedNodeGroups) } assert.Equal(t, tc.scaleUpResult, st.Result) + + // Stopgap solution to ensure that fake client has time to update the status of the provisioned requests. + time.Sleep(1 * time.Millisecond) + + provReqsAfterScaleUp, err := client.ProvisioningRequests() + assert.NoError(t, err) + assert.Equal(t, len(tc.provReqs), len(provReqsAfterScaleUp)) + + if tc.batchProcessing { + // Since batch processing returns aggregated result, we need to check the number of provisioned requests which have the provisioned condition. 
+ assert.Equal(t, tc.numProvisioned, NumProvisioningRequestsWithCondition(provReqsAfterScaleUp, v1.Provisioned)) + } } else { assert.Error(t, err) } @@ -270,7 +434,7 @@ func TestScaleUp(t *testing.T) { } } -func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.ProvisioningRequest, onScaleUpFunc func(string, int) error, autoprovisioning bool) (*provReqOrchestrator, map[string]*schedulerframework.NodeInfo) { +func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, nodes []*apiv1.Node, onScaleUpFunc func(string, int) error, autoprovisioning bool, batchProcessing bool, maxBatchSize int, batchTimebox time.Duration) (*provReqOrchestrator, map[string]*schedulerframework.NodeInfo) { provider := testprovider.NewTestCloudProvider(onScaleUpFunc, nil) if autoprovisioning { machineTypes := []string{"large-machine"} @@ -292,11 +456,18 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio podLister := kube_util.NewTestPodLister(nil) listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil) - autoscalingContext, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, listers, provider, nil, nil) + + options := config.AutoscalingOptions{} + if batchProcessing { + options.CheckCapacityBatchProcessing = true + options.MaxBatchSize = maxBatchSize + options.BatchTimebox = batchTimebox + } + + autoscalingContext, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, nodes, nil) - client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, prs...) processors := NewTestProcessors(&autoscalingContext) if autoprovisioning { processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t} @@ -307,15 +478,6 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingContext, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) assert.NoError(t, err) - options := config.AutoscalingOptions{ - EstimatorName: estimator.BinpackingEstimatorName, - MaxCoresTotal: config.DefaultMaxClusterCores, - MaxMemoryTotal: config.DefaultMaxClusterMemory * units.GiB, - MinCoresTotal: 0, - MinMemoryTotal: 0, - NodeAutoprovisioningEnabled: autoprovisioning, - MaxAutoprovisionedNodeGroupCount: 10, - } estimatorBuilder, _ := estimator.NewEstimatorBuilder( estimator.BinpackingEstimatorName, estimator.NewThresholdBasedEstimationLimiter(nil), @@ -323,13 +485,34 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio nil, ) - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingContext.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingContext.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingContext.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) clusterState.UpdateNodes(nodes, nodeInfos, now) + var injector *provreq.ProvisioningRequestPodsInjector + if batchProcessing { + injector = provreq.NewFakePodsInjector(client, 
clocktesting.NewFakePassiveClock(now)) + } + orchestrator := &provReqOrchestrator{ client: client, - provisioningClasses: []ProvisioningClass{checkcapacity.New(client, nil), besteffortatomic.New(client)}, + provisioningClasses: []ProvisioningClass{checkcapacity.New(client, injector), besteffortatomic.New(client)}, } + orchestrator.Initialize(&autoscalingContext, processors, clusterState, estimatorBuilder, taints.TaintConfig{}) return orchestrator, nodeInfos } + +func NumProvisioningRequestsWithCondition(prList []*provreqwrapper.ProvisioningRequest, conditionType string) int { + count := 0 + + for _, pr := range prList { + for _, c := range pr.Status.Conditions { + if c.Type == conditionType { + count++ + break + } + } + } + + return count +}
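Note (illustrative only, not part of the patch): a sketch of a unit test for the combined-error formatting introduced in scale_up_status_processor.go. It would live in package status; errors.NewAutoscalerError and errors.InternalError are assumed to be the existing helpers from cluster-autoscaler/utils/errors, and the expected string is derived from formatMessageFromBatchErrors above (both errors share a type, so type prefixes are omitted and messages are sorted).

```go
package status

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

func TestCombineBatchScaleUpErrorsSketch(t *testing.T) {
	errA := errors.NewAutoscalerError(errors.InternalError, "first failure")
	errB := errors.NewAutoscalerError(errors.InternalError, "second failure")

	set := NewCombinedStatusSet()
	set.Add(&ScaleUpStatus{Result: ScaleUpError, ScaleUpError: &errA})
	set.Add(&ScaleUpStatus{Result: ScaleUpError, ScaleUpError: &errB})

	combined := set.Export().ScaleUpError
	assert.NotNil(t, combined)
	// The lexicographically first message leads; the rest are quoted in the list.
	assert.Equal(t, `first failure ...and other concurrent errors: ["second failure"]`, (*combined).Error())
}
```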