diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index 9d567347a130..f6709ccdfde6 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -295,6 +295,12 @@ type AutoscalingOptions struct {
 	ProvisioningRequestEnabled bool
 	// AsyncNodeGroupsEnabled tells if CA creates/deletes node groups asynchronously.
 	AsyncNodeGroupsEnabled bool
+	// ProvisioningRequestInitialBackoffTime is the initial backoff time before a ProvisioningRequest is considered again by CA after a failed ScaleUp request.
+	ProvisioningRequestInitialBackoffTime time.Duration
+	// ProvisioningRequestMaxBackoffTime is the maximum backoff time before a ProvisioningRequest is considered again by CA after a failed ScaleUp request.
+	ProvisioningRequestMaxBackoffTime time.Duration
+	// ProvisioningRequestMaxBackoffCacheSize is the maximum size of the ProvisioningRequest cache that stores retry backoff times.
+	ProvisioningRequestMaxBackoffCacheSize int
 }
 
 // KubeClientOptions specify options for kube client
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index dbfd0cd1bcd8..be9c8bf38a13 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -269,11 +269,14 @@ var (
 		"--max-graceful-termination-sec flag should not be set when this flag is set. Not setting this flag will use unordered evictor by default."+
 		"Priority evictor reuses the concepts of drain logic in kubelet(https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature)."+
 		"Eg. flag usage: '10000:20,1000:100,0:60'")
-	provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
-	frequentLoopsEnabled        = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
-	asyncNodeGroupsEnabled      = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
-	proactiveScaleupEnabled     = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
-	podInjectionLimit           = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
+	provisioningRequestsEnabled            = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
+	provisioningRequestInitialBackoffTime  = flag.Duration("provisioning-request-initial-backoff-time", 1*time.Minute, "Initial backoff time for ProvisioningRequest retry after a failed ScaleUp.")
+	provisioningRequestMaxBackoffTime      = flag.Duration("provisioning-request-max-backoff-time", 10*time.Minute, "Max backoff time for ProvisioningRequest retry after a failed ScaleUp.")
+	provisioningRequestMaxBackoffCacheSize = flag.Int("provisioning-request-max-backoff-cache-size", 1000, "Max size of the ProvisioningRequest cache used for the retry backoff mechanism.")
+	frequentLoopsEnabled                   = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
+	asyncNodeGroupsEnabled                 = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
+	proactiveScaleupEnabled                = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
+	podInjectionLimit                      = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
 )
 
 func isFlagPassed(name string) bool {
@@ -446,6 +449,9 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		BypassedSchedulers:                     scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers),
 		ProvisioningRequestEnabled:             *provisioningRequestsEnabled,
 		AsyncNodeGroupsEnabled:                 *asyncNodeGroupsEnabled,
+		ProvisioningRequestInitialBackoffTime:  *provisioningRequestInitialBackoffTime,
+		ProvisioningRequestMaxBackoffTime:      *provisioningRequestMaxBackoffTime,
+		ProvisioningRequestMaxBackoffCacheSize: *provisioningRequestMaxBackoffCacheSize,
 	}
 }
 
@@ -524,7 +530,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 		return nil, err
 	}
 	opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
-	injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
+	injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
 	if err != nil {
 		return nil, err
 	}
diff --git a/cluster-autoscaler/processors/provreq/injector.go b/cluster-autoscaler/processors/provreq/injector.go
index 538563d24cbe..5fc88380bb0a 100644
--- a/cluster-autoscaler/processors/provreq/injector.go
+++ b/cluster-autoscaler/processors/provreq/injector.go
@@ -22,7 +22,7 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
+	v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
@@ -33,16 +33,16 @@ import (
 	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/clock"
-)
-
-const (
-	defaultRetryTime = 10 * time.Minute
+	"k8s.io/utils/lru"
 )
 
 // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
 type ProvisioningRequestPodsInjector struct {
-	client *provreqclient.ProvisioningRequestClient
-	clock  clock.PassiveClock
+	initialRetryTime time.Duration
+	maxBackoffTime   time.Duration
+	backoffDuration  *lru.Cache
+	clock            clock.PassiveClock
+	client           *provreqclient.ProvisioningRequestClient
 }
 
 // IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -53,7 +53,17 @@ func (p *ProvisioningRequestPodsInjector) IsAvailableForProvisioning(pr *provreq
 	}
 	provisioned := apimeta.FindStatusCondition(conditions, v1.Provisioned)
 	if provisioned != nil {
-		if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) {
+		if provisioned.Status != metav1.ConditionFalse {
+			return false
+		}
+		val, found := p.backoffDuration.Get(key(pr))
+		retryTime, ok := val.(time.Duration)
+		if !found || !ok {
+			retryTime = p.initialRetryTime
+		}
+		if provisioned.LastTransitionTime.Add(retryTime).Before(p.clock.Now()) {
+			p.backoffDuration.Remove(key(pr))
+			p.backoffDuration.Add(key(pr), min(2*retryTime, p.maxBackoffTime))
 			return true
 		}
 		return false
@@ -87,29 +97,31 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
 	if err != nil {
 		return nil, err
 	}
-
 	for _, pr := range provReqs {
 		if !isSupportedClass(pr) {
 			continue
 		}
+		conditions := pr.Status.Conditions
+		if apimeta.IsStatusConditionTrue(conditions, v1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1.Provisioned) {
+			p.backoffDuration.Remove(key(pr))
+			continue
+		}
 
-		//TODO(yaroslava): support exponential backoff
-		// Inject pods if ProvReq wasn't scaled up before or it has Provisioned == False condition more than defaultRetryTime
+		// Inject pods if the ProvReq wasn't scaled up before, or if its Provisioned condition has been False for longer than the current backoff time.
 		if !p.IsAvailableForProvisioning(pr) {
 			continue
 		}
 
-		podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr)
+		provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
 		if err != nil {
 			klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
 			p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
 			continue
 		}
-
 		if err := p.MarkAsAccepted(pr); err != nil {
 			continue
 		}
-		return podsFromProvReq, nil
+		return provreqpods, nil
 	}
 	return nil, nil
 }
@@ -122,6 +134,9 @@ func (p *ProvisioningRequestPodsInjector) Process(
 	podsFromProvReq, err := p.GetPodsFromNextRequest(
 		func(pr *provreqwrapper.ProvisioningRequest) bool {
 			_, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]
+			if !found {
+				klog.Warningf("Provisioning Class %s is not supported for ProvReq %s/%s", pr.Spec.ProvisioningClassName, pr.Namespace, pr.Name)
+			}
 			return found
 		})
 
@@ -136,10 +151,14 @@ func (p *ProvisioningRequestPodsInjector) Process(
 func (p *ProvisioningRequestPodsInjector) CleanUp() {}
 
 // NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
-func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
+func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (pods.PodListProcessor, error) {
 	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
 	if err != nil {
 		return nil, err
 	}
-	return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
+	return &ProvisioningRequestPodsInjector{initialRetryTime: initialBackoffTime, maxBackoffTime: maxBackoffTime, backoffDuration: lru.New(maxCacheSize), client: client, clock: clock.RealClock{}}, nil
+}
+
+func key(pr *provreqwrapper.ProvisioningRequest) string {
+	return string(pr.UID)
 }
diff --git a/cluster-autoscaler/processors/provreq/injector_test.go b/cluster-autoscaler/processors/provreq/injector_test.go
index 533c2b979bdf..37e4e720168d 100644
--- a/cluster-autoscaler/processors/provreq/injector_test.go
+++ b/cluster-autoscaler/processors/provreq/injector_test.go
@@ -23,15 +23,16 @@ import (
 
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
+	v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
 	clock "k8s.io/utils/clock/testing"
+	"k8s.io/utils/lru"
 )
 
 func TestProvisioningRequestPodsInjector(t *testing.T) {
 	now := time.Now()
-	minAgo := now.Add(-1 * time.Minute)
+	minAgo := now.Add(-1 * time.Minute).Add(-1 * time.Second)
 	hourAgo := now.Add(-1 * time.Hour)
 
 	accepted := metav1.Condition{
@@ -104,11 +105,15 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
 		},
 		{
 			name:     "Provisioned=True, no pods are injected",
-			provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
+			provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq},
+		},
+		{
+			name:     "Provisioned=False, ProvReq is backed off, no pods are injected",
+			provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedRecentlyProvReqB},
 		},
 		{
 			name:     "Provisioned=Unknown, no pods are injected",
-			provReqs: []*provreqwrapper.ProvisioningRequest{unknownProvisionedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
+			provReqs: []*provreqwrapper.ProvisioningRequest{unknownProvisionedProvReqB, failedProvReq},
 		},
 		{
 			name:     "ProvisionedClass is unknown, no pods are injected",
@@ -124,7 +129,9 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
-		injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
+		backoffTime := lru.New(100)
+		backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
+		injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client}
 		getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
 		if err != nil {
 			t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
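
For reviewers who want to see the retry policy in isolation, below is a minimal, standalone Go sketch of the exponential backoff this patch implements in `IsAvailableForProvisioning`. The names (`backoffPolicy`, `ShouldRetry`, `Forget`) are hypothetical, and a plain map stands in for the `lru.Cache` keyed by ProvisioningRequest UID; otherwise the logic mirrors the patch: start at the initial backoff, double after each permitted retry via `min(2*wait, max)`, and drop the entry once the request is Provisioned or Failed.

```go
package main

import (
	"fmt"
	"time"
)

// backoffPolicy is a hypothetical stand-in for the injector's backoff state.
// The real patch keys an lru.Cache by ProvisioningRequest UID; a plain map
// keeps this sketch self-contained.
type backoffPolicy struct {
	initial time.Duration
	max     time.Duration
	next    map[string]time.Duration // UID -> wait before the next retry
}

// ShouldRetry reports whether a request whose Provisioned condition turned
// False at lastTransition may be retried at time now. On a permitted retry
// the stored wait doubles, capped at p.max, matching the patch's
// min(2*retryTime, maxBackoffTime) update.
func (p *backoffPolicy) ShouldRetry(uid string, lastTransition, now time.Time) bool {
	wait, ok := p.next[uid]
	if !ok {
		wait = p.initial // unseen requests start at the initial backoff
	}
	if lastTransition.Add(wait).Before(now) {
		p.next[uid] = min(2*wait, p.max) // min is a Go 1.21+ builtin, as used by the patch
		return true
	}
	return false
}

// Forget drops the backoff entry once the request is Provisioned or Failed,
// as the patch does with backoffDuration.Remove(key(pr)).
func (p *backoffPolicy) Forget(uid string) { delete(p.next, uid) }

func main() {
	p := &backoffPolicy{initial: time.Minute, max: 10 * time.Minute, next: map[string]time.Duration{}}
	start := time.Now()
	last := start
	// Simulate repeated failed ScaleUps, polling every 30s for 40 minutes.
	for now := start; now.Before(start.Add(40 * time.Minute)); now = now.Add(30 * time.Second) {
		if p.ShouldRetry("uid-1", last, now) {
			fmt.Printf("retry at +%v\n", now.Sub(start).Round(time.Second))
			last = now // the Provisioned condition transitions again on the new failure
		}
	}
	p.Forget("uid-1") // once terminal, drop state so a future request starts fresh
}
```

With the defaults declared above (`--provisioning-request-initial-backoff-time=1m`, `--provisioning-request-max-backoff-time=10m`), a repeatedly failing ProvisioningRequest is retried after roughly 1m, 2m, 4m, 8m, and then every 10m. The LRU bound (`--provisioning-request-max-backoff-cache-size`, default 1000) only caps how many requests have backoff state remembered; an evicted request falls back to the initial backoff on its next failure.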