Skip to content

Commit

Permalink
Implement unit tests for batch processing of check capacity class
Browse files Browse the repository at this point in the history
  • Loading branch information
Duke0404 committed Sep 16, 2024
1 parent d7e6883 commit 11703e9
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 18 deletions.
27 changes: 27 additions & 0 deletions cluster-autoscaler/processors/provreq/testutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/utils/clock/testing"
)

// NewFakePodsInjector builds a ProvisioningRequestPodsInjector for use in tests.
// The returned injector has its client and clock fields set to the supplied
// values; every other field is left at its zero value.
func NewFakePodsInjector(client *provreqclient.ProvisioningRequestClient, clock *testing.FakePassiveClock) *ProvisioningRequestPodsInjector {
	injector := new(ProvisioningRequestPodsInjector)
	injector.client = client
	injector.clock = clock
	return injector
}
219 changes: 201 additions & 18 deletions cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
Expand All @@ -44,8 +45,9 @@ import (
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
"k8s.io/client-go/kubernetes/fake"
clocktesting "k8s.io/utils/clock/testing"

schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

Expand Down Expand Up @@ -78,6 +80,15 @@ func TestScaleUp(t *testing.T) {
Class: v1.ProvisioningClassCheckCapacity,
})

anotherCheckCapacityCpuProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
provreqwrapper.TestProvReqOptions{
Name: "anotherCheckCapacityCpuProvReq",
CPU: "5m",
Memory: "5",
PodCount: int32(100),
Class: v1.ProvisioningClassCheckCapacity,
})

newCheckCapacityMemProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
provreqwrapper.TestProvReqOptions{
Name: "newCheckCapacityMemProvReq",
Expand All @@ -86,6 +97,23 @@ func TestScaleUp(t *testing.T) {
PodCount: int32(100),
Class: v1.ProvisioningClassCheckCapacity,
})
impossibleCheckCapacityReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
provreqwrapper.TestProvReqOptions{
Name: "impossibleCheckCapacityRequest",
CPU: "1m",
Memory: "1",
PodCount: int32(5001),
Class: v1.ProvisioningClassCheckCapacity,
})

anotherImpossibleCheckCapacityReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
provreqwrapper.TestProvReqOptions{
Name: "anotherImpossibleCheckCapacityRequest",
CPU: "1m",
Memory: "1",
PodCount: int32(5001),
Class: v1.ProvisioningClassCheckCapacity,
})

// Active atomic scale up requests.
atomicScaleUpProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
Expand Down Expand Up @@ -169,6 +197,10 @@ func TestScaleUp(t *testing.T) {
scaleUpResult status.ScaleUpResult
autoprovisioning bool
err bool
batchProcessing bool
maxBatchSize int
batchTimebox time.Duration
numProvisioned int
}{
{
name: "no ProvisioningRequests",
Expand Down Expand Up @@ -236,10 +268,129 @@ func TestScaleUp(t *testing.T) {
autoprovisioning: true,
scaleUpResult: status.ScaleUpSuccessful,
},
// Batch processing tests
{
name: "batch processing of check capacity requests with one request",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 1,
},
{
name: "batch processing of check capacity requests with less requests than max batch size",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
{
name: "batch processing of check capacity requests with requests equal to max batch size",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 2,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
{
name: "batch processing of check capacity requests with more requests than max batch size",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq, anotherCheckCapacityCpuProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 2,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
{
name: "batch processing of check capacity requests where cluster contains already provisioned requests",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, bookedCapacityProvReq, anotherCheckCapacityCpuProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 2,
batchTimebox: 5 * time.Minute,
numProvisioned: 3,
},
{
name: "batch processing of check capacity requests where timebox is exceeded",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 5,
batchTimebox: 1 * time.Nanosecond,
numProvisioned: 1,
},
{
name: "batch processing of check capacity requests where max batch size is invalid",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 0,
batchTimebox: 5 * time.Minute,
numProvisioned: 1,
},
{
name: "batch processing of check capacity requests where best effort atomic scale-up request is also present in cluster",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, newCheckCapacityCpuProvReq, atomicScaleUpProvReq},
provReqToScaleUp: newCheckCapacityMemProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 2,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
{
name: "process atomic scale-up requests where batch processing of check capacity requests is enabled",
provReqs: []*provreqwrapper.ProvisioningRequest{possibleAtomicScaleUpReq},
provReqToScaleUp: possibleAtomicScaleUpReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 1,
},
{
name: "process atomic scale-up requests where batch processing of check capacity requests is enabled and check capacity requests are present in cluster",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, newCheckCapacityCpuProvReq, atomicScaleUpProvReq},
provReqToScaleUp: atomicScaleUpProvReq,
scaleUpResult: status.ScaleUpNotNeeded,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 1,
},
{
name: "batch processing of check capacity requests where some requests' capacity is not available",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, impossibleCheckCapacityReq, newCheckCapacityCpuProvReq},
provReqToScaleUp: newCheckCapacityMemProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 3,
},
{
name: "batch processing of check capacity requests where all requests' capacity is not available",
provReqs: []*provreqwrapper.ProvisioningRequest{impossibleCheckCapacityReq, anotherImpossibleCheckCapacityReq},
provReqToScaleUp: impossibleCheckCapacityReq,
scaleUpResult: status.ScaleUpNoOptionsAvailable,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
}
for _, tc := range testCases {
tc := tc
allNodes := allNodes
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

Expand All @@ -252,7 +403,8 @@ func TestScaleUp(t *testing.T) {
}
return fmt.Errorf("unexpected scale-up of %s by %d", name, n)
}
orchestrator, nodeInfos := setupTest(t, allNodes, tc.provReqs, onScaleUpFunc, tc.autoprovisioning)
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
orchestrator, nodeInfos := setupTest(t, client, allNodes, onScaleUpFunc, tc.autoprovisioning, tc.batchProcessing, tc.maxBatchSize, tc.batchTimebox)

st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nodeInfos, false)
if !tc.err {
Expand All @@ -263,14 +415,26 @@ func TestScaleUp(t *testing.T) {
t.Errorf("noScaleUpInfo: %#v", st.PodsRemainUnschedulable[0].RejectedNodeGroups)
}
assert.Equal(t, tc.scaleUpResult, st.Result)

// Stopgap solution to ensure that fake client has time to update the status of the provisioned requests.
time.Sleep(1 * time.Millisecond)

provReqsAfterScaleUp, err := client.ProvisioningRequests()
assert.NoError(t, err)
assert.Equal(t, len(tc.provReqs), len(provReqsAfterScaleUp))

if tc.batchProcessing {
// Since batch processing returns aggregated result, we need to check the number of provisioned requests which have the provisioned condition.
assert.Equal(t, tc.numProvisioned, NumProvisioningRequestsWithCondition(provReqsAfterScaleUp, v1.Provisioned))
}
} else {
assert.Error(t, err)
}
})
}
}

func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.ProvisioningRequest, onScaleUpFunc func(string, int) error, autoprovisioning bool) (*provReqOrchestrator, map[string]*schedulerframework.NodeInfo) {
func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, nodes []*apiv1.Node, onScaleUpFunc func(string, int) error, autoprovisioning bool, batchProcessing bool, maxBatchSize int, batchTimebox time.Duration) (*provReqOrchestrator, map[string]*schedulerframework.NodeInfo) {
provider := testprovider.NewTestCloudProvider(onScaleUpFunc, nil)
if autoprovisioning {
machineTypes := []string{"large-machine"}
Expand All @@ -292,11 +456,18 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio

podLister := kube_util.NewTestPodLister(nil)
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
autoscalingContext, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, listers, provider, nil, nil)

options := config.AutoscalingOptions{}
if batchProcessing {
options.CheckCapacityBatchProcessing = true
options.MaxBatchSize = maxBatchSize
options.BatchTimebox = batchTimebox
}

autoscalingContext, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)

clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, nodes, nil)
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, prs...)
processors := NewTestProcessors(&autoscalingContext)
if autoprovisioning {
processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
Expand All @@ -307,29 +478,41 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingContext, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)

options := config.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory * units.GiB,
MinCoresTotal: 0,
MinMemoryTotal: 0,
NodeAutoprovisioningEnabled: autoprovisioning,
MaxAutoprovisionedNodeGroupCount: 10,
}
estimatorBuilder, _ := estimator.NewEstimatorBuilder(
estimator.BinpackingEstimatorName,
estimator.NewThresholdBasedEstimationLimiter(nil),
estimator.NewDecreasingPodOrderer(),
nil,
)

clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingContext.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingContext.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingContext.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
clusterState.UpdateNodes(nodes, nodeInfos, now)

var injector *provreq.ProvisioningRequestPodsInjector
if batchProcessing {
injector = provreq.NewFakePodsInjector(client, clocktesting.NewFakePassiveClock(now))
}

orchestrator := &provReqOrchestrator{
client: client,
provisioningClasses: []ProvisioningClass{checkcapacity.New(client, nil), besteffortatomic.New(client)},
provisioningClasses: []ProvisioningClass{checkcapacity.New(client, injector), besteffortatomic.New(client)},
}

orchestrator.Initialize(&autoscalingContext, processors, clusterState, estimatorBuilder, taints.TaintConfig{})
return orchestrator, nodeInfos
}

// NumProvisioningRequestsWithCondition returns how many requests in prList
// carry at least one status condition whose Type equals conditionType.
// Each request is counted at most once, and only the condition's Type is
// compared; its Status value is not inspected.
func NumProvisioningRequestsWithCondition(prList []*provreqwrapper.ProvisioningRequest, conditionType string) int {
	matched := 0
	for i := range prList {
		conditions := prList[i].Status.Conditions
		for j := range conditions {
			if conditions[j].Type != conditionType {
				continue
			}
			// First matching condition is enough; move on to the next request.
			matched++
			break
		}
	}
	return matched
}

0 comments on commit 11703e9

Please sign in to comment.