Add backoff mechanism for ProvReq retry #7182

Merged (5 commits) on Sep 23, 2024
6 changes: 6 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -295,6 +295,12 @@ type AutoscalingOptions struct {
ProvisioningRequestEnabled bool
// AsyncNodeGroupsEnabled tells if CA creates/deletes node groups asynchronously.
AsyncNodeGroupsEnabled bool
// ProvisioningRequestInitialBackoffTime is the initial time for a ProvisioningRequest to be considered again by CA after a failed ScaleUp request.
ProvisioningRequestInitialBackoffTime time.Duration
// ProvisioningRequestMaxBackoffTime is the max time for a ProvisioningRequest to be considered again by CA after a failed ScaleUp request.
ProvisioningRequestMaxBackoffTime time.Duration
// ProvisioningRequestMaxBackoffCacheSize is the max size of the ProvisioningRequest cache stored for retry backoff.
ProvisioningRequestMaxBackoffCacheSize int
}

// KubeClientOptions specify options for kube client
18 changes: 12 additions & 6 deletions cluster-autoscaler/main.go
@@ -269,11 +269,14 @@ var (
"--max-graceful-termination-sec flag should not be set when this flag is set. Not setting this flag will use unordered evictor by default."+
"Priority evictor reuses the concepts of drain logic in kubelet(https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature)."+
"Eg. flag usage: '10000:20,1000:100,0:60'")
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
provisioningRequestInitialBackoffTime = flag.Duration("provisioning-request-initial-backoff-time", 1*time.Minute, "Initial backoff time for ProvisioningRequest retry after failed ScaleUp.")
provisioningRequestMaxBackoffTime = flag.Duration("provisioning-request-max-backoff-time", 10*time.Minute, "Max backoff time for ProvisioningRequest retry after failed ScaleUp.")
provisioningRequestMaxBackoffCacheSize = flag.Int("provisioning-request-max-backoff-cache-size", 1000, "Max size of the ProvisioningRequest cache used for the retry backoff mechanism.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
)

func isFlagPassed(name string) bool {
@@ -446,6 +449,9 @@ func createAutoscalingOptions() config.AutoscalingOptions {
BypassedSchedulers: scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers),
ProvisioningRequestEnabled: *provisioningRequestsEnabled,
AsyncNodeGroupsEnabled: *asyncNodeGroupsEnabled,
ProvisioningRequestInitialBackoffTime: *provisioningRequestInitialBackoffTime,
ProvisioningRequestMaxBackoffTime: *provisioningRequestMaxBackoffTime,
ProvisioningRequestMaxBackoffCacheSize: *provisioningRequestMaxBackoffCacheSize,
}
}

@@ -524,7 +530,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
if err != nil {
return nil, err
}
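Reviewer note (illustration, not part of the diff): with the defaults registered above (1m initial backoff, 10m max backoff, cache size 1000), a ProvisioningRequest whose ScaleUp keeps failing is reconsidered after roughly 1m, 2m, 4m, 8m, and then every 10m, following the min(2*retryTime, maxBackoffTime) rule added in injector.go below. A minimal standalone sketch of that schedule (variable names are local to the sketch):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults taken from the new flags in main.go.
	initialBackoff := 1 * time.Minute // --provisioning-request-initial-backoff-time
	maxBackoff := 10 * time.Minute    // --provisioning-request-max-backoff-time

	retry := initialBackoff
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("failed attempt %d: retried after %v\n", attempt, retry)
		// Same doubling-with-cap rule as the injector; builtin min needs Go 1.21+.
		retry = min(2*retry, maxBackoff)
	}
	// Prints 1m0s, 2m0s, 4m0s, 8m0s, 10m0s, 10m0s.
}
```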
49 changes: 34 additions & 15 deletions cluster-autoscaler/processors/provreq/injector.go
@@ -22,7 +22,7 @@ import (
apiv1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
@@ -33,16 +33,16 @@ import (
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

const (
defaultRetryTime = 10 * time.Minute
"k8s.io/utils/lru"
)

// ProvisioningRequestPodsInjector creates in-memory pods from a ProvisioningRequest and injects them into the unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
client *provreqclient.ProvisioningRequestClient
clock clock.PassiveClock
initialRetryTime time.Duration
maxBackoffTime time.Duration
backoffDuration *lru.Cache
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
}

// IsAvailableForProvisioning checks if the provisioning request is in the correct state for processing and provisioning has not been attempted recently.
@@ -53,7 +53,17 @@ func (p *ProvisioningRequestPodsInjector) IsAvailableForProvisioning(pr *provreq
}
provisioned := apimeta.FindStatusCondition(conditions, v1.Provisioned)
if provisioned != nil {
if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) {
if provisioned.Status != metav1.ConditionFalse {
return false
}
val, found := p.backoffDuration.Get(key(pr))
retryTime, ok := val.(time.Duration)
if !found || !ok {
retryTime = p.initialRetryTime
}
if provisioned.LastTransitionTime.Add(retryTime).Before(p.clock.Now()) {
p.backoffDuration.Remove(key(pr))
p.backoffDuration.Add(key(pr), min(2*retryTime, p.maxBackoffTime))
return true
}
return false
@@ -87,29 +97,31 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
if err != nil {
return nil, err
}

for _, pr := range provReqs {
if !isSupportedClass(pr) {
continue
}
conditions := pr.Status.Conditions
if apimeta.IsStatusConditionTrue(conditions, v1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1.Provisioned) {
p.backoffDuration.Remove(key(pr))
continue
}

//TODO(yaroslava): support exponential backoff
// Inject pods if ProvReq wasn't scaled up before or it has Provisioned == False condition more than defaultRetryTime
if !p.IsAvailableForProvisioning(pr) {
continue
}

podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr)
provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
continue
}

if err := p.MarkAsAccepted(pr); err != nil {
continue
}
return podsFromProvReq, nil
return provreqpods, nil
}
return nil, nil
}
@@ -122,6 +134,9 @@ func (p *ProvisioningRequestPodsInjector) Process(
podsFromProvReq, err := p.GetPodsFromNextRequest(
func(pr *provreqwrapper.ProvisioningRequest) bool {
_, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]
if !found {
klog.Warningf("Provisioning Class %s is not supported for ProvReq %s/%s", pr.Spec.ProvisioningClassName, pr.Namespace, pr.Name)
}
return found
})

@@ -136,10 +151,14 @@ func (p *ProvisioningRequestPodsInjector) CleanUp() {}
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (pods.PodListProcessor, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
return &ProvisioningRequestPodsInjector{initialRetryTime: initialBackoffTime, maxBackoffTime: maxBackoffTime, backoffDuration: lru.New(maxCacheSize), client: client, clock: clock.RealClock{}}, nil
}

func key(pr *provreqwrapper.ProvisioningRequest) string {
return string(pr.UID)
}
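For readers skimming the injector diff above: the backoff state is a small LRU cache keyed by ProvisioningRequest UID that holds the next retry delay; each time a backed-off ProvReq becomes eligible again the delay is doubled up to the max, and the entry is dropped once the ProvReq reports Provisioned=True or Failed=True. A self-contained sketch of that bookkeeping, with illustrative names but the same policy as the diff:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/utils/lru"
)

// backoffTracker is a hypothetical helper mirroring the cache the injector keeps.
type backoffTracker struct {
	initial time.Duration
	max     time.Duration
	cache   *lru.Cache // key: ProvisioningRequest UID, value: time.Duration
}

// retryAfter returns the delay currently applied to this UID and schedules the
// next, doubled and capped delay, as IsAvailableForProvisioning does.
func (b *backoffTracker) retryAfter(uid string) time.Duration {
	wait := b.initial
	if val, found := b.cache.Get(uid); found {
		if d, ok := val.(time.Duration); ok {
			wait = d
		}
	}
	b.cache.Add(uid, min(2*wait, b.max)) // builtin min needs Go 1.21+
	return wait
}

// clear drops the entry once the ProvReq is Provisioned or Failed, as
// GetPodsFromNextRequest does.
func (b *backoffTracker) clear(uid string) { b.cache.Remove(uid) }

func main() {
	b := &backoffTracker{initial: time.Minute, max: 10 * time.Minute, cache: lru.New(1000)}
	for i := 0; i < 5; i++ {
		fmt.Println(b.retryAfter("uid-1")) // 1m0s, 2m0s, 4m0s, 8m0s, 10m0s
	}
	b.clear("uid-1")
}
```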
17 changes: 12 additions & 5 deletions cluster-autoscaler/processors/provreq/injector_test.go
@@ -23,15 +23,16 @@ import (

apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
clock "k8s.io/utils/clock/testing"
"k8s.io/utils/lru"
)

func TestProvisioningRequestPodsInjector(t *testing.T) {
now := time.Now()
minAgo := now.Add(-1 * time.Minute)
minAgo := now.Add(-1 * time.Minute).Add(-1 * time.Second)
hourAgo := now.Add(-1 * time.Hour)

accepted := metav1.Condition{
@@ -104,11 +105,15 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
},
{
name: "Provisioned=True, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq},
},
{
name: "Provisioned=False, ProvReq is backed off, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedRecentlyProvReqB},
},
{
name: "Provisioned=Unknown, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{unknownProvisionedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
provReqs: []*provreqwrapper.ProvisioningRequest{unknownProvisionedProvReqB, failedProvReq},
},
{
name: "ProvisionedClass is unknown, no pods are injected",
@@ -124,7 +129,9 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
}
for _, tc := range testCases {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
backoffTime := lru.New(100)
backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client}
getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
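A side note on the test timing above (illustrative reasoning, assuming, as the name suggests, that notProvisionedRecentlyProvReqB carries a Provisioned=False condition whose LastTransitionTime is minAgo): minAgo is pushed one second past a full minute so that the 1m initial backoff has strictly elapsed, while the pre-seeded 2m cache entry keeps that ProvReq backed off and no pods are injected for it.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	minAgo := now.Add(-1 * time.Minute).Add(-1 * time.Second)

	// With a 1m backoff the transition must be strictly more than 1m old to pass
	// LastTransitionTime.Add(retryTime).Before(now).
	fmt.Println(minAgo.Add(1 * time.Minute).Before(now)) // true: eligible for retry

	// The test pre-seeds a 2m backoff for this ProvReq, so it is still inside
	// its backoff window.
	fmt.Println(minAgo.Add(2 * time.Minute).Before(now)) // false: still backed off
}
```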