diff --git a/cluster-autoscaler/processors/provreq/testutils.go b/cluster-autoscaler/processors/provreq/testutils.go
new file mode 100644
index 000000000000..cdc5984cef5e
--- /dev/null
+++ b/cluster-autoscaler/processors/provreq/testutils.go
@@ -0,0 +1,27 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package provreq
+
+import (
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
+	"k8s.io/utils/clock/testing"
+)
+
+// NewFakePodsInjector creates a new instance of ProvisioningRequestPodsInjector with the given client and clock for testing.
+func NewFakePodsInjector(client *provreqclient.ProvisioningRequestClient, clock *testing.FakePassiveClock) *ProvisioningRequestPodsInjector {
+	return &ProvisioningRequestPodsInjector{client: client, clock: clock}
+}
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
index a1af1d5e1f6c..d71bf03e5624 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
@@ -34,6 +34,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
@@ -44,8 +45,9 @@ import (
 	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
 	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
-	"k8s.io/autoscaler/cluster-autoscaler/utils/units"
 	"k8s.io/client-go/kubernetes/fake"
+	clocktesting "k8s.io/utils/clock/testing"
+
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
@@ -78,6 +80,15 @@ func TestScaleUp(t *testing.T) {
 			Class: v1.ProvisioningClassCheckCapacity,
 		})
 
+	anotherCheckCapacityCpuProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
+		provreqwrapper.TestProvReqOptions{
+			Name: "anotherCheckCapacityCpuProvReq",
+			CPU: "5m",
+			Memory: "5",
+			PodCount: int32(100),
+			Class: v1.ProvisioningClassCheckCapacity,
+		})
+
 	newCheckCapacityMemProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
 		provreqwrapper.TestProvReqOptions{
 			Name: "newCheckCapacityMemProvReq",
@@ -86,6 +97,23 @@ func TestScaleUp(t *testing.T) {
 			PodCount: int32(100),
 			Class: v1.ProvisioningClassCheckCapacity,
 		})
+	impossibleCheckCapacityReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
+		provreqwrapper.TestProvReqOptions{
+			Name: "impossibleCheckCapacityRequest",
+			CPU: "1m",
+			Memory: "1",
+			PodCount: int32(5001),
+			Class: v1.ProvisioningClassCheckCapacity,
+		})
+
+	anotherImpossibleCheckCapacityReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
+		provreqwrapper.TestProvReqOptions{
+			Name: "anotherImpossibleCheckCapacityRequest",
+			CPU: "1m",
+			Memory: "1",
+			PodCount: int32(5001),
+			Class: v1.ProvisioningClassCheckCapacity,
+		})
 
 	// Active atomic scale up requests.
 	atomicScaleUpProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
@@ -169,6 +197,10 @@ func TestScaleUp(t *testing.T) {
 		scaleUpResult status.ScaleUpResult
 		autoprovisioning bool
 		err bool
+		batchProcessing bool
+		maxBatchSize int
+		batchTimebox time.Duration
+		numProvisioned int
 	}{
 		{
 			name: "no ProvisioningRequests",
@@ -236,10 +268,129 @@ func TestScaleUp(t *testing.T) {
 			autoprovisioning: true,
 			scaleUpResult: status.ScaleUpSuccessful,
 		},
+		// Batch processing tests
+		{
+			name: "batch processing of check capacity requests with one request",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq},
+			provReqToScaleUp: newCheckCapacityCpuProvReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 3,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 1,
+		},
+		{
+			name: "batch processing of check capacity requests with less requests than max batch size",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
+			provReqToScaleUp: newCheckCapacityCpuProvReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 3,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 2,
+		},
+		{
+			name: "batch processing of check capacity requests with requests equal to max batch size",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
+			provReqToScaleUp: newCheckCapacityCpuProvReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 2,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 2,
+		},
+		{
+			name: "batch processing of check capacity requests with more requests than max batch size",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq, anotherCheckCapacityCpuProvReq},
+			provReqToScaleUp: newCheckCapacityCpuProvReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 2,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 2,
+		},
+		{
+			name: "batch processing of check capacity requests where cluster contains already provisioned requests",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, bookedCapacityProvReq, anotherCheckCapacityCpuProvReq},
+			provReqToScaleUp: newCheckCapacityCpuProvReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 2,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 3,
+		},
+		{
+			name: "batch processing of check capacity requests where timebox is exceeded",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
+			provReqToScaleUp: newCheckCapacityCpuProvReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 5,
+			batchTimebox: 1 * time.Nanosecond,
+			numProvisioned: 1,
+		},
+		{
+			name: "batch processing of check capacity requests where max batch size is invalid",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
+			provReqToScaleUp: newCheckCapacityCpuProvReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 0,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 1,
+		},
+		{
+			name: "batch processing of check capacity requests where best effort atomic scale-up request is also present in cluster",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, newCheckCapacityCpuProvReq, atomicScaleUpProvReq},
+			provReqToScaleUp: newCheckCapacityMemProvReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 2,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 2,
+		},
+		{
+			name: "process atomic scale-up requests where batch processing of check capacity requests is enabled",
+			provReqs: []*provreqwrapper.ProvisioningRequest{possibleAtomicScaleUpReq},
+			provReqToScaleUp: possibleAtomicScaleUpReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 3,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 1,
+		},
+		{
+			name: "process atomic scale-up requests where batch processing of check capacity requests is enabled and check capacity requests are present in cluster",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, newCheckCapacityCpuProvReq, atomicScaleUpProvReq},
+			provReqToScaleUp: atomicScaleUpProvReq,
+			scaleUpResult: status.ScaleUpNotNeeded,
+			batchProcessing: true,
+			maxBatchSize: 3,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 1,
+		},
+		{
+			name: "batch processing of check capacity requests where some requests' capacity is not available",
+			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, impossibleCheckCapacityReq, newCheckCapacityCpuProvReq},
+			provReqToScaleUp: newCheckCapacityMemProvReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+			batchProcessing: true,
+			maxBatchSize: 3,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 3,
+		},
+		{
+			name: "batch processing of check capacity requests where all requests' capacity is not available",
+			provReqs: []*provreqwrapper.ProvisioningRequest{impossibleCheckCapacityReq, anotherImpossibleCheckCapacityReq},
+			provReqToScaleUp: impossibleCheckCapacityReq,
+			scaleUpResult: status.ScaleUpNoOptionsAvailable,
+			batchProcessing: true,
+			maxBatchSize: 3,
+			batchTimebox: 5 * time.Minute,
+			numProvisioned: 2,
+		},
 	}
 	for _, tc := range testCases {
-		tc := tc
-		allNodes := allNodes
 		t.Run(tc.name, func(t *testing.T) {
 			t.Parallel()
 
@@ -252,7 +403,8 @@ func TestScaleUp(t *testing.T) {
 				}
 				return fmt.Errorf("unexpected scale-up of %s by %d", name, n)
 			}
-			orchestrator, nodeInfos := setupTest(t, allNodes, tc.provReqs, onScaleUpFunc, tc.autoprovisioning)
+			client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
+			orchestrator, nodeInfos := setupTest(t, client, allNodes, onScaleUpFunc, tc.autoprovisioning, tc.batchProcessing, tc.maxBatchSize, tc.batchTimebox)
 
 			st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nodeInfos, false)
 			if !tc.err {
@@ -263,6 +415,18 @@ func TestScaleUp(t *testing.T) {
 					t.Errorf("noScaleUpInfo: %#v", st.PodsRemainUnschedulable[0].RejectedNodeGroups)
 				}
 				assert.Equal(t, tc.scaleUpResult, st.Result)
+
+				// Stopgap solution to ensure that the fake client has time to update the status of the provisioned requests.
+				time.Sleep(1 * time.Millisecond)
+
+				provReqsAfterScaleUp, err := client.ProvisioningRequests()
+				assert.NoError(t, err)
+				assert.Equal(t, len(tc.provReqs), len(provReqsAfterScaleUp))
+
+				if tc.batchProcessing {
+					// Since batch processing returns an aggregated result, we check how many requests carry the Provisioned condition.
+					assert.Equal(t, tc.numProvisioned, NumProvisioningRequestsWithCondition(provReqsAfterScaleUp, v1.Provisioned))
+				}
 			} else {
 				assert.Error(t, err)
 			}
@@ -270,7 +434,7 @@ func TestScaleUp(t *testing.T) {
 	}
 }
 
-func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.ProvisioningRequest, onScaleUpFunc func(string, int) error, autoprovisioning bool) (*provReqOrchestrator, map[string]*schedulerframework.NodeInfo) {
+func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, nodes []*apiv1.Node, onScaleUpFunc func(string, int) error, autoprovisioning bool, batchProcessing bool, maxBatchSize int, batchTimebox time.Duration) (*provReqOrchestrator, map[string]*schedulerframework.NodeInfo) {
 	provider := testprovider.NewTestCloudProvider(onScaleUpFunc, nil)
 	if autoprovisioning {
 		machineTypes := []string{"large-machine"}
@@ -292,11 +456,18 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio
 
 	podLister := kube_util.NewTestPodLister(nil)
 	listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
-	autoscalingContext, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, listers, provider, nil, nil)
+
+	options := config.AutoscalingOptions{}
+	if batchProcessing {
+		options.CheckCapacityBatchProcessing = true
+		options.MaxBatchSize = maxBatchSize
+		options.BatchTimebox = batchTimebox
+	}
+
+	autoscalingContext, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
 	assert.NoError(t, err)
 
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, nodes, nil)
-	client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, prs...)
 	processors := NewTestProcessors(&autoscalingContext)
 	if autoprovisioning {
 		processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
@@ -307,15 +478,6 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio
 	nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingContext, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
 	assert.NoError(t, err)
 
-	options := config.AutoscalingOptions{
-		EstimatorName: estimator.BinpackingEstimatorName,
-		MaxCoresTotal: config.DefaultMaxClusterCores,
-		MaxMemoryTotal: config.DefaultMaxClusterMemory * units.GiB,
-		MinCoresTotal: 0,
-		MinMemoryTotal: 0,
-		NodeAutoprovisioningEnabled: autoprovisioning,
-		MaxAutoprovisionedNodeGroupCount: 10,
-	}
 	estimatorBuilder, _ := estimator.NewEstimatorBuilder(
 		estimator.BinpackingEstimatorName,
 		estimator.NewThresholdBasedEstimationLimiter(nil),
@@ -323,13 +485,34 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio
 		nil,
 	)
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingContext.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
+	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingContext.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingContext.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
 	clusterState.UpdateNodes(nodes, nodeInfos, now)
 
+	var injector *provreq.ProvisioningRequestPodsInjector
+	if batchProcessing {
+		injector = provreq.NewFakePodsInjector(client, clocktesting.NewFakePassiveClock(now))
+	}
+
 	orchestrator := &provReqOrchestrator{
 		client: client,
-		provisioningClasses: []ProvisioningClass{checkcapacity.New(client, nil), besteffortatomic.New(client)},
+		provisioningClasses: []ProvisioningClass{checkcapacity.New(client, injector), besteffortatomic.New(client)},
 	}
+
 	orchestrator.Initialize(&autoscalingContext, processors, clusterState, estimatorBuilder, taints.TaintConfig{})
 	return orchestrator, nodeInfos
 }
+
+func NumProvisioningRequestsWithCondition(prList []*provreqwrapper.ProvisioningRequest, conditionType string) int {
+	count := 0
+
+	for _, pr := range prList {
+		for _, c := range pr.Status.Conditions {
+			if c.Type == conditionType {
+				count++
+				break
+			}
+		}
+	}
+
+	return count
+}