Skip to content

Commit

Permalink
Fix nits and add rationale behind using injector for recordng LastPro…
Browse files Browse the repository at this point in the history
…visioningRequestProcessTime
  • Loading branch information
Duke0404 committed Sep 16, 2024
1 parent d3bb89d commit 53115f8
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
14 changes: 9 additions & 5 deletions cluster-autoscaler/loop/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,24 @@ type scalingTimesGetter interface {
LastScaleDownDeleteTime() time.Time
}

type provisioningRequestProcessTimeGetter interface {
LastProvisioningRequestProcessedTime() time.Time
// provisioningRequestProcessingTimesGetter exposes recent provisioning request processing activity regardless of wether the
// ProvisioningRequest was marked as accepted or failed. This is because a ProvisioningRequest being processed indicates that
// there are other ProvisioningRequests that require processing regardless of the outcome of the current one. Thus, the next iteration
// should be started immediately.
type provisioningRequestProcessingTimesGetter interface {
LastProvisioningRequestProcessTime() time.Time
}

// LoopTrigger object implements criteria used to start new autoscaling iteration
type LoopTrigger struct {
podObserver *UnschedulablePodObserver
scanInterval time.Duration
scalingTimesGetter scalingTimesGetter
provisioningRequestProcessTimeGetter provisioningRequestProcessTimeGetter
provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter
}

// NewLoopTrigger creates a LoopTrigger object
func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration, provisioningRequestProcessTimeGetter provisioningRequestProcessTimeGetter) *LoopTrigger {
func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter, scanInterval time.Duration) *LoopTrigger {
return &LoopTrigger{
podObserver: podObserver,
scanInterval: scanInterval,
Expand All @@ -74,7 +78,7 @@ func (t *LoopTrigger) Wait(lastRun time.Time) {
// immediately if the previous one was productive.
if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) ||
!t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessedTime().Before(lastRun) {
!t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessTime().Before(lastRun) {
select {
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
Expand Down
13 changes: 8 additions & 5 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
}()
}

func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter, autoscalingOptions config.AutoscalingOptions, restConfig *rest.Config, injector pods.PodListProcessor) (core.Autoscaler, error) {
func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter, autoscalingOptions config.AutoscalingOptions, restConfig *rest.Config, ProvisioningRequestInjector pods.PodListProcessor) (core.Autoscaler, error) {
kubeClient := kube_util.CreateKubeClient(autoscalingOptions.KubeClientOpts)

// Informer transform to trim ManagedFields for memory efficiency.
Expand Down Expand Up @@ -519,7 +519,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(ProvisioningRequestInjector)
podListProcessor.AddProcessor(provreqProcesor)
}

Expand Down Expand Up @@ -607,12 +607,12 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho

restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)

injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
ProvisioningRequestInjector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
if err != nil {
klog.Fatalf("Failed to create provisioning request pods injector: %v", err)
}

autoscaler, err := buildAutoscaler(debuggingSnapshotter, autoscalingOptions, restConfig, injector)
autoscaler, err := buildAutoscaler(debuggingSnapshotter, autoscalingOptions, restConfig, ProvisioningRequestInjector)
if err != nil {
klog.Fatalf("Failed to create autoscaler: %v", err)
}
Expand All @@ -633,7 +633,10 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
defer cancel()
if *frequentLoopsEnabled {
podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval, injector)
// A ProvisioningRequestPodsInjector is used as provisioningRequestProcessingTimesGetter here to obtain the last time a
// ProvisioningRequest was processed. This is because the ProvisioningRequestPodsInjector in addition to injecting pods
// also marks the ProvisioningRequest as accepted or failed.
trigger := loop.NewLoopTrigger(podObserver, autoscaler, ProvisioningRequestInjector, *scanInterval)
lastRun := time.Now()
for {
trigger.Wait(lastRun)
Expand Down
5 changes: 2 additions & 3 deletions cluster-autoscaler/processors/provreq/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
// "k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
Expand Down Expand Up @@ -147,7 +146,7 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (*ProvisioningR
return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
}

// LastProvisioningRequestProcessedTime returns the time when the last provisioning request was processed.
func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessedTime() time.Time {
// LastProvisioningRequestProcessTime returns the time when the last provisioning request was processed.
func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessTime() time.Time {
return p.lastProvisioningRequestProcessedTime
}

0 comments on commit 53115f8

Please sign in to comment.