use informers for pod events instead of Listing
hakuna-matatah committed Nov 16, 2023
1 parent 69ceb1b commit c3bd24d
Showing 1 changed file with 74 additions and 37 deletions.
111 changes: 74 additions & 37 deletions clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go
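In short: scheduler events were previously collected with a one-shot List at gather time, and since events have a 1h TTL by default, measurements running longer than that produced incomplete scheduling data (see the TODO removed below). This change instead watches scheduler events through a dedicated informer and work queue for the lifetime of the measurement, recording each pod's schedule timestamp as the events arrive.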
@@ -61,6 +61,7 @@ func createPodStartupLatencyMeasurement() measurement.Measurement {
     podStartupEntries: measurementutil.NewObjectTransitionTimes(podStartupLatencyMeasurementName),
     podMetadata:       measurementutil.NewPodsMetadata(podStartupLatencyMeasurementName),
     eventQueue:        workqueue.New(),
+    schedEventQueue:   workqueue.New(),
   }
 }

@@ -70,12 +71,14 @@ type eventData struct {
 }

 type podStartupLatencyMeasurement struct {
   selector          *util.ObjectSelector
   isRunning         bool
   stopCh            chan struct{}
+  stopSchedCh       chan struct{}
   // This queue can potentially grow indefinitely, because we put all changes here.
   // Usually it's not a recommended pattern, but we need it for measuring PodStartupLatency.
   eventQueue        *workqueue.Type
+  schedEventQueue   *workqueue.Type
   podStartupEntries *measurementutil.ObjectTransitionTimes
   podMetadata       *measurementutil.PodsMetadata
   threshold         time.Duration
@@ -91,7 +94,7 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
   if err != nil {
     return nil, err
   }
-
+  schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName)
   switch action {
   case "start":
     if err := p.selector.Parse(config.Params); err != nil {
@@ -101,9 +104,8 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
     if err != nil {
       return nil, err
     }
-    return nil, p.start(config.ClusterFramework.GetClientSets().GetClient())
+    return nil, p.start(config.ClusterFramework.GetClientSets().GetClient(), schedulerName)
   case "gather":
-    schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName)
     if err != nil {
       return nil, err
     }
@@ -124,7 +126,7 @@ func (p *podStartupLatencyMeasurement) String() string {
   return podStartupLatencyMeasurementName + ": " + p.selector.String()
 }

-func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
+func (p *podStartupLatencyMeasurement) start(c clientset.Interface, schedulerName string) error {
   if p.isRunning {
     klog.V(2).Infof("%s: pod startup latency measurement already running", p)
     return nil
@@ -146,6 +148,29 @@ func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
     p.addEvent,
   )
   go p.processEvents()
+
+  selector := fields.Set{
+    "involvedObject.kind": "Pod",
+    "source":              schedulerName,
+  }.AsSelector().String()
+
+  p.stopSchedCh = make(chan struct{})
+
+  e := informer.NewInformer(
+    &cache.ListWatch{
+      ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+        options.FieldSelector = selector
+        return c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), options)
+      },
+      WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+        options.FieldSelector = selector
+        return c.CoreV1().Events(p.selector.Namespace).Watch(context.TODO(), options)
+      },
+    },
+    p.addSchedEvent,
+  )
+  go p.processSchedEvents()
+  go e.Run(p.stopSchedCh)
   return informer.StartAndSync(i, p.stopCh, informerSyncTimeout)
 }
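For reference, the field selector built in start() above is serialized to a single string and evaluated server-side, so the informer only receives Pod events emitted by the scheduler. A minimal, self-contained sketch of that construction (using "default-scheduler" as an illustrative schedulerName value):

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/fields"
    )

    func main() {
        // Same construction as in start(); only scheduler-sourced Pod
        // events match this selector on the server side.
        sel := fields.Set{
            "involvedObject.kind": "Pod",
            "source":              "default-scheduler",
        }.AsSelector().String()
        // Prints the two requirements joined by a comma, e.g.
        // involvedObject.kind=Pod,source=default-scheduler
        // (term order is not guaranteed).
        fmt.Println(sel)
    }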

@@ -154,11 +179,36 @@ func (p *podStartupLatencyMeasurement) addEvent(_, obj interface{}) {
   p.eventQueue.Add(event)
 }

+func (p *podStartupLatencyMeasurement) addSchedEvent(_, obj interface{}) {
+  event := &eventData{obj: obj, recvTime: time.Now()}
+  p.schedEventQueue.Add(event)
+}
+
 func (p *podStartupLatencyMeasurement) processEvents() {
   for p.processNextWorkItem() {
   }
 }

+func (p *podStartupLatencyMeasurement) processSchedEvents() {
+  for p.processNextSchedWorkItem() {
+  }
+}
+
+func (p *podStartupLatencyMeasurement) processNextSchedWorkItem() bool {
+  item, quit := p.schedEventQueue.Get()
+  if quit {
+    return false
+  }
+  defer p.schedEventQueue.Done(item)
+  event, ok := item.(*eventData)
+  if !ok {
+    klog.Warningf("Couldn't convert work item to eventData: %v", item)
+    return true
+  }
+  p.processSchedEvent(event)
+  return true
+}
+
 func (p *podStartupLatencyMeasurement) processNextWorkItem() bool {
   item, quit := p.eventQueue.Get()
   if quit {
@@ -179,7 +229,9 @@ func (p *podStartupLatencyMeasurement) stop() {
   if p.isRunning {
     p.isRunning = false
     close(p.stopCh)
+    close(p.stopSchedCh)
     p.eventQueue.ShutDown()
+    p.schedEventQueue.ShutDown()
   }
 }
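The scheduler event queue follows the same producer/consumer contract as the existing eventQueue: informer callbacks Add items, a dedicated goroutine drains them, and ShutDown unblocks the final Get so the loop exits cleanly. A small sketch of that contract, assuming only k8s.io/client-go/util/workqueue (processItem is a hypothetical handler, not part of this change):

    package main

    import (
        "fmt"

        "k8s.io/client-go/util/workqueue"
    )

    // processItem is a stand-in for processSchedEvent/processEvent.
    func processItem(item interface{}) {
        fmt.Println("processing:", item)
    }

    func main() {
        q := workqueue.New()
        done := make(chan struct{})
        go func() {
            defer close(done)
            for {
                // Get blocks until an item is available; after ShutDown,
                // it returns quit == true once the queue is drained.
                item, quit := q.Get()
                if quit {
                    return
                }
                processItem(item)
                q.Done(item) // mark the item finished, as the measurement does
            }
        }()
        q.Add("scheduler-event")
        q.ShutDown()
        <-done
    }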

@@ -230,10 +282,6 @@ func (p *podStartupLatencyMeasurement) gather(c clientset.Interface, identifier

   p.stop()

-  if err := p.gatherScheduleTimes(c, schedulerName); err != nil {
-    return nil, err
-  }
-
   checks := []podStartupLatencyCheck{
     {
       namePrefix: "",
@@ -270,33 +318,23 @@ func (p *podStartupLatencyMeasurement) gather(c clientset.Interface, identifier
   return summaries, err
 }

-// TODO(#2006): gatherScheduleTimes is currently listing events at the end of the test.
-//
-// Given that events by default have 1h TTL, for measurements across longer periods
-// it just returns incomplete results.
-// Given that we don't need 100% accuracy, we should switch to a mechanism that is similar
-// to the one that slo-monitor is using (added in #1477).
-func (p *podStartupLatencyMeasurement) gatherScheduleTimes(c clientset.Interface, schedulerName string) error {
-  selector := fields.Set{
-    "involvedObject.kind": "Pod",
-    "source":              schedulerName,
-  }.AsSelector().String()
-  options := metav1.ListOptions{FieldSelector: selector}
-  schedEvents, err := c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), options)
-  if err != nil {
-    return err
-  }
-  for _, event := range schedEvents.Items {
-    key := createMetaNamespaceKey(event.InvolvedObject.Namespace, event.InvolvedObject.Name)
-    if _, exists := p.podStartupEntries.Get(key, createPhase); exists {
-      if !event.EventTime.IsZero() {
-        p.podStartupEntries.Set(key, schedulePhase, event.EventTime.Time)
-      } else {
-        p.podStartupEntries.Set(key, schedulePhase, event.FirstTimestamp.Time)
-      }
-    }
-  }
-  return nil
-}
+func (p *podStartupLatencyMeasurement) processSchedEvent(event *eventData) {
+  obj := event.obj
+  if obj == nil {
+    return
+  }
+  e, ok := obj.(*corev1.Event)
+  if !ok {
+    return
+  }
+  key := createMetaNamespaceKey(e.InvolvedObject.Namespace, e.InvolvedObject.Name)
+
+  if !e.EventTime.IsZero() {
+    p.podStartupEntries.Set(key, schedulePhase, e.EventTime.Time)
+  } else {
+    p.podStartupEntries.Set(key, schedulePhase, e.FirstTimestamp.Time)
+  }
+}
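processSchedEvent prefers EventTime and falls back to FirstTimestamp because newer event records carry a microsecond-precision EventTime while events produced through the legacy path only set FirstTimestamp. The same fallback, factored into a hedged helper sketch (assuming k8s.io/api/core/v1; the function name is illustrative, not part of this commit):

    import (
        "time"

        corev1 "k8s.io/api/core/v1"
    )

    // scheduleTime returns the best-available timestamp for a scheduler
    // event, mirroring the fallback in processSchedEvent above.
    func scheduleTime(e *corev1.Event) time.Time {
        if !e.EventTime.IsZero() {
            return e.EventTime.Time
        }
        return e.FirstTimestamp.Time
    }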

func (p *podStartupLatencyMeasurement) processEvent(event *eventData) {
Expand All @@ -311,7 +349,6 @@ func (p *podStartupLatencyMeasurement) processEvent(event *eventData) {

key := createMetaNamespaceKey(pod.Namespace, pod.Name)
p.podMetadata.SetStateless(key, isPodStateless(pod))

if pod.Status.Phase == corev1.PodRunning {
if _, found := p.podStartupEntries.Get(key, createPhase); !found {
p.podStartupEntries.Set(key, watchPhase, recvTime)