Revert "Add support for frequent loops when provisioningrequest is encountered in last iteration" #7410

Merged · 1 commit · Oct 18, 2024
58 changes: 15 additions & 43 deletions cluster-autoscaler/loop/trigger.go
@@ -43,29 +43,19 @@ type scalingTimesGetter interface {
LastScaleDownDeleteTime() time.Time
}

// provisioningRequestProcessingTimesGetter exposes recent provisioning request processing activity regardless of wether the
// ProvisioningRequest was marked as accepted or failed. This is because a ProvisioningRequest being processed indicates that
// there are other ProvisioningRequests that require processing regardless of the outcome of the current one. Thus, the next iteration
// should be started immediately.
type provisioningRequestProcessingTimesGetter interface {
LastProvisioningRequestProcessTime() time.Time
}

// LoopTrigger object implements criteria used to start new autoscaling iteration
type LoopTrigger struct {
podObserver *UnschedulablePodObserver
scanInterval time.Duration
scalingTimesGetter scalingTimesGetter
provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter
podObserver *UnschedulablePodObserver
scanInterval time.Duration
scalingTimesGetter scalingTimesGetter
}

// NewLoopTrigger creates a LoopTrigger object
func NewLoopTrigger(scalingTimesGetter scalingTimesGetter, provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter, podObserver *UnschedulablePodObserver, scanInterval time.Duration) *LoopTrigger {
func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
return &LoopTrigger{
podObserver: podObserver,
scanInterval: scanInterval,
scalingTimesGetter: scalingTimesGetter,
provisioningRequestProcessTimeGetter: provisioningRequestProcessTimeGetter,
podObserver: podObserver,
scanInterval: scanInterval,
scalingTimesGetter: scalingTimesGetter,
}
}

@@ -76,18 +66,14 @@ func (t *LoopTrigger) Wait(lastRun time.Time) {

// To improve scale-up throughput, Cluster Autoscaler starts new iteration
// immediately if the previous one was productive.
if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) {
t.logTriggerReason("Autoscaler loop triggered immediately after a scale up")
return
}

if !t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
t.logTriggerReason("Autoscaler loop triggered immediately after a scale down")
return
}

if t.provisioningRequestWasProcessed(lastRun) {
t.logTriggerReason("Autoscaler loop triggered immediately after a provisioning request was processed")
if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
select {
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
default:
klog.Infof("Autoscaler loop triggered immediately after a productive iteration")
}
return
}

@@ -132,20 +118,6 @@ func StartPodObserver(ctx context.Context, kubeClient kube_client.Interface) *Un
}
}

// logTriggerReason logs a message if the next iteration was not triggered by unschedulable pods appearing, else it logs a message that the next iteration was triggered by unschedulable pods appearing
func (t *LoopTrigger) logTriggerReason(message string) {
select {
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
default:
klog.Infof(message)
}
}

func (t *LoopTrigger) provisioningRequestWasProcessed(lastRun time.Time) bool {
return t.provisioningRequestProcessTimeGetter != nil && !t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessTime().Before(lastRun)
}

// isRecentUnschedulablePod checks if the object is an unschedulable pod observed recently.
func isRecentUnschedulablePod(obj any) bool {
pod, ok := obj.(*apiv1.Pod)
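For reference, the contract the restored trigger depends on is just the two-method scalingTimesGetter shown in the first hunk. The fake below is a minimal sketch (not part of this PR) of how that interface can be satisfied, for example in a test inside the loop package; in production the core.Autoscaler handed to NewLoopTrigger plays this role, as the main.go wiring below shows.

```go
// fakeScalingTimes is an illustrative stand-in for scalingTimesGetter.
type fakeScalingTimes struct {
	lastScaleUp         time.Time
	lastScaleDownDelete time.Time
}

func (f *fakeScalingTimes) LastScaleUpTime() time.Time         { return f.lastScaleUp }
func (f *fakeScalingTimes) LastScaleDownDeleteTime() time.Time { return f.lastScaleDownDelete }
```

Reporting a scale-up or scale-down-delete time that is not before lastRun makes Wait return immediately, which is the "productive iteration" fast path restored above.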
32 changes: 13 additions & 19 deletions cluster-autoscaler/main.go
@@ -468,7 +468,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
}()
}

func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) {
func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
// Create basic config from flags.
autoscalingOptions := createAutoscalingOptions()

@@ -487,7 +487,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot

predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(informerFactory, autoscalingOptions.SchedulerConfig)
if err != nil {
return nil, nil, err
return nil, err
}
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)
@@ -508,14 +508,13 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker, scheduling.ScheduleAnywhere)

var ProvisioningRequestInjector *provreq.ProvisioningRequestPodsInjector
if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
client, err := provreqclient.NewProvisioningRequestClient(restConfig)
if err != nil {
return nil, nil, err
return nil, err
}
provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
checkcapacity.New(client),
@@ -526,11 +525,11 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
ProvisioningRequestInjector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
if err != nil {
return nil, nil, err
return nil, err
}
podListProcessor.AddProcessor(ProvisioningRequestInjector)
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(provreqProcesor)
}

@@ -595,30 +594,21 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
// Create autoscaler.
autoscaler, err := core.NewAutoscaler(opts, informerFactory)
if err != nil {
return nil, nil, err
return nil, err
}

// Start informers. This must come after fully constructing the autoscaler because
// additional informers might have been registered in the factory during NewAutoscaler.
stop := make(chan struct{})
informerFactory.Start(stop)

podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(autoscalingOptions.KubeClientOpts))

// A ProvisioningRequestPodsInjector is used as provisioningRequestProcessingTimesGetter here to obtain the last time a
// ProvisioningRequest was processed. This is because the ProvisioningRequestPodsInjector in addition to injecting pods
// also marks the ProvisioningRequest as accepted or failed.
trigger := loop.NewLoopTrigger(autoscaler, ProvisioningRequestInjector, podObserver, *scanInterval)

return autoscaler, trigger, nil
return autoscaler, nil
}

func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
metrics.RegisterAll(*emitPerNodeGroupMetrics)
context, cancel := ctx.WithCancel(ctx.Background())
defer cancel()

autoscaler, trigger, err := buildAutoscaler(context, debuggingSnapshotter)
autoscaler, err := buildAutoscaler(debuggingSnapshotter)
if err != nil {
klog.Fatalf("Failed to create autoscaler: %v", err)
}
@@ -635,7 +625,11 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
}

// Autoscale ad infinitum.
context, cancel := ctx.WithCancel(ctx.Background())
defer cancel()
if *frequentLoopsEnabled {
podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
lastRun := time.Now()
for {
trigger.Wait(lastRun)
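With the revert, buildAutoscaler no longer returns a LoopTrigger; the pod observer and trigger are built inside run() only when *frequentLoopsEnabled is set. A condensed sketch of that restored wiring follows; the loop body is truncated in the hunk above, so runIteration is a hypothetical placeholder for the real per-iteration call, while the construction lines mirror the diff.

```go
if *frequentLoopsEnabled {
	podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
	trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
	lastRun := time.Now()
	for {
		// Wait returns early after a productive iteration or when an
		// unschedulable pod appears; otherwise it waits out scanInterval.
		trigger.Wait(lastRun)
		lastRun = time.Now()
		runIteration(autoscaler, lastRun) // hypothetical placeholder
	}
}
```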
26 changes: 9 additions & 17 deletions cluster-autoscaler/processors/provreq/injector.go
@@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -37,12 +38,11 @@

// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
initialRetryTime time.Duration
maxBackoffTime time.Duration
backoffDuration *lru.Cache
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
lastProvisioningRequestProcessTime time.Time
initialRetryTime time.Duration
maxBackoffTime time.Duration
backoffDuration *lru.Cache
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
}

// IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -78,7 +78,6 @@ func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.Prov
klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
return err
}
p.lastProvisioningRequestProcessTime = p.clock.Now()
return nil
}

@@ -88,7 +87,6 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
}
p.lastProvisioningRequestProcessTime = p.clock.Now()
}

// GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it.
Expand All @@ -114,7 +112,7 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
continue
}

podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr)
provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
@@ -123,8 +121,7 @@
if err := p.MarkAsAccepted(pr); err != nil {
continue
}

return podsFromProvReq, nil
return provreqpods, nil
}
return nil, nil
}
@@ -154,7 +151,7 @@ func (p *ProvisioningRequestPodsInjector) Process(
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (*ProvisioningRequestPodsInjector, error) {
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (pods.PodListProcessor, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
@@ -165,8 +162,3 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffT
func key(pr *provreqwrapper.ProvisioningRequest) string {
return string(pr.UID)
}

// LastProvisioningRequestProcessTime returns the time when the last provisioning request was processed.
func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessTime() time.Time {
return p.lastProvisioningRequestProcessTime
}
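The injector keeps its throttling state (initialRetryTime, maxBackoffTime, the backoffDuration LRU cache and a PassiveClock); only the last-processing-time bookkeeping is removed. The body of IsAvailableForProvisioning is outside this diff, so the sketch below only illustrates the kind of capped exponential backoff those fields suggest; shouldRetry, its parameters and the doubling policy are assumptions, not the actual implementation.

```go
// shouldRetry is an illustrative, assumed helper: it gates a retry on a per-request
// backoff stored in an LRU cache, doubling the wait on each attempt up to a cap.
func shouldRetry(backoff *lru.Cache, clk clock.PassiveClock, pr *provreqwrapper.ProvisioningRequest,
	lastAttempt time.Time, initial, maxWait time.Duration) bool {
	wait := initial
	if cached, found := backoff.Get(key(pr)); found {
		wait = cached.(time.Duration)
	}
	if clk.Now().Before(lastAttempt.Add(wait)) {
		return false // still inside this request's backoff window
	}
	next := 2 * wait
	if next > maxWait {
		next = maxWait
	}
	backoff.Add(key(pr), next)
	return true
}
```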
2 changes: 1 addition & 1 deletion cluster-autoscaler/processors/provreq/injector_test.go
@@ -131,7 +131,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
backoffTime := lru.New(100)
backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now}
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client}
getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)