diff --git a/pkg/apply/event/event.go b/pkg/apply/event/event.go index 12de4751..85c70fba 100644 --- a/pkg/apply/event/event.go +++ b/pkg/apply/event/event.go @@ -21,6 +21,7 @@ const ( StatusType PruneType DeleteType + WaitType ) // Event is the type of the objects that will be returned through @@ -58,6 +59,9 @@ type Event struct { // DeleteEvent contains information about object that have been // deleted. DeleteEvent DeleteEvent + + // WaitEvent contains information about any errors encountered in a WaitTask. + WaitEvent WaitEvent } type InitEvent struct { @@ -85,6 +89,11 @@ type ErrorEvent struct { Err error } +type WaitEvent struct { + GroupName string + Error error +} + //go:generate stringer -type=ActionGroupEventType type ActionGroupEventType int diff --git a/pkg/apply/event/type_string.go b/pkg/apply/event/type_string.go index b37aa8b6..df9ac414 100644 --- a/pkg/apply/event/type_string.go +++ b/pkg/apply/event/type_string.go @@ -15,11 +15,12 @@ func _() { _ = x[StatusType-4] _ = x[PruneType-5] _ = x[DeleteType-6] + _ = x[WaitType-7] } -const _Type_name = "InitTypeErrorTypeActionGroupTypeApplyTypeStatusTypePruneTypeDeleteType" +const _Type_name = "InitTypeErrorTypeActionGroupTypeApplyTypeStatusTypePruneTypeDeleteTypeWaitType" -var _Type_index = [...]uint8{0, 8, 17, 32, 41, 51, 60, 70} +var _Type_index = [...]uint8{0, 8, 17, 32, 41, 51, 60, 70, 78} func (i Type) String() string { if i < 0 || i >= Type(len(_Type_index)-1) { diff --git a/pkg/apply/taskrunner/runner.go b/pkg/apply/taskrunner/runner.go index b7c4a69d..35f0f937 100644 --- a/pkg/apply/taskrunner/runner.go +++ b/pkg/apply/taskrunner/runner.go @@ -219,8 +219,9 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task, } } // A message on the taskChannel means that the current task - // has either completed or failed. If it has failed, we return - // the error. If the abort flag is true, which means something + // has either completed or failed. + // If it has failed, we return the error. + // If the abort flag is true, which means something // else has gone wrong and we are waiting for the current task to // finish, we exit. // If everything is ok, we fetch and start the next task. @@ -235,7 +236,6 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task, }, } if msg.Err != nil { - b.amendTimeoutError(taskContext, msg.Err) return msg.Err } if abort { @@ -258,24 +258,6 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task, } } -func (b *baseRunner) amendTimeoutError(taskContext *TaskContext, err error) { - if timeoutErr, ok := err.(*TimeoutError); ok { - var timedOutResources []TimedOutResource - for _, id := range timeoutErr.Identifiers { - result := taskContext.ResourceCache().Get(id) - if timeoutErr.Condition.Meets(result.Status) { - continue - } - timedOutResources = append(timedOutResources, TimedOutResource{ - Identifier: id, - Status: result.Status, - Message: result.StatusMessage, - }) - } - timeoutErr.TimedOutResources = timedOutResources - } -} - // completeIfWaitTask checks if the current task is a wait task. If so, // we invoke the complete function to complete it. func completeIfWaitTask(currentTask Task, taskContext *TaskContext) { diff --git a/pkg/apply/taskrunner/runner_test.go b/pkg/apply/taskrunner/runner_test.go index 8a24a009..b2e5ef1b 100644 --- a/pkg/apply/taskrunner/runner_test.go +++ b/pkg/apply/taskrunner/runner_test.go @@ -45,9 +45,8 @@ func TestBaseRunner(t *testing.T) { statusEventsDelay time.Duration statusEvents []pollevent.Event expectedEventTypes []event.Type - expectedError error expectedTimedOutResources []TimedOutResource - expectedErrorMsg string + expectedTimeoutErrorMsg string }{ "wait task runs until condition is met": { tasks: []Task{ @@ -112,9 +111,11 @@ func TestBaseRunner(t *testing.T) { }, }, expectedEventTypes: []event.Type{ + event.ActionGroupType, event.StatusType, + event.WaitType, + event.ActionGroupType, }, - expectedError: &TimeoutError{}, expectedTimedOutResources: []TimedOutResource{ { Identifier: depID, @@ -122,7 +123,7 @@ func TestBaseRunner(t *testing.T) { Message: "resource not cached", }, }, - expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent", + expectedTimeoutErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent", }, "wait task times out eventually (InProgress)": { tasks: []Task{ @@ -147,16 +148,19 @@ func TestBaseRunner(t *testing.T) { }, }, expectedEventTypes: []event.Type{ + event.ActionGroupType, + event.StatusType, event.StatusType, + event.WaitType, + event.ActionGroupType, }, - expectedError: &TimeoutError{}, expectedTimedOutResources: []TimedOutResource{ { Identifier: depID, Status: status.InProgressStatus, }, }, - expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent", + expectedTimeoutErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent", }, "tasks run in order": { tasks: []Task{ @@ -244,18 +248,13 @@ func TestBaseRunner(t *testing.T) { close(eventChannel) wg.Wait() - if tc.expectedError != nil { - assert.IsType(t, tc.expectedError, err) - if timeoutError, ok := err.(*TimeoutError); ok { - assert.ElementsMatch(t, tc.expectedTimedOutResources, - timeoutError.TimedOutResources) - assert.Equal(t, timeoutError.Error(), tc.expectedErrorMsg) - } - return - } else if err != nil { + if err != nil { t.Errorf("expected no error, but got %v", err) } + for _, event := range events { + t.Log(event) + } if want, got := len(tc.expectedEventTypes), len(events); want != got { t.Errorf("expected %d events, but got %d", want, got) } @@ -265,6 +264,14 @@ func TestBaseRunner(t *testing.T) { t.Errorf("expected event type %s, but got %s", want, got) } + if e.Type == event.WaitType { + err := e.WaitEvent.Error + if timeoutError, ok := err.(*TimeoutError); ok { + assert.ElementsMatch(t, tc.expectedTimedOutResources, + timeoutError.TimedOutResources) + assert.Equal(t, timeoutError.Error(), tc.expectedTimeoutErrorMsg) + } + } } }) } diff --git a/pkg/apply/taskrunner/task.go b/pkg/apply/taskrunner/task.go index e4737a4f..a48a707c 100644 --- a/pkg/apply/taskrunner/task.go +++ b/pkg/apply/taskrunner/task.go @@ -99,7 +99,7 @@ func (w *WaitTask) Start(taskContext *TaskContext) { // setTimer creates the timer with the timeout value taken from // the WaitTask struct. Once the timer expires, it will send -// a message on the TaskChannel provided in the taskContext. +// a message on the EventChannel provided in the taskContext. func (w *WaitTask) setTimer(taskContext *TaskContext) { timer := time.NewTimer(w.Timeout) go func() { @@ -108,16 +108,23 @@ func (w *WaitTask) setTimer(taskContext *TaskContext) { // Timeout is cancelled. <-timer.C select { - // We only send the taskResult if no one has gotten + // We only send the TimeoutError to the eventChannel if no one has gotten // to the token first. case <-w.token: - taskContext.TaskChannel() <- TaskResult{ - Err: &TimeoutError{ - Identifiers: w.Ids, - Timeout: w.Timeout, - Condition: w.Condition, + err := &TimeoutError{ + Identifiers: w.Ids, + Timeout: w.Timeout, + Condition: w.Condition, + } + amendTimeoutError(taskContext, err) + taskContext.EventChannel() <- event.Event{ + Type: event.WaitType, + WaitEvent: event.WaitEvent{ + GroupName: w.Name(), + Error: err, }, } + taskContext.TaskChannel() <- TaskResult{} default: return } @@ -127,6 +134,24 @@ func (w *WaitTask) setTimer(taskContext *TaskContext) { } } +func amendTimeoutError(taskContext *TaskContext, err error) { + if timeoutErr, ok := err.(*TimeoutError); ok { + var timedOutResources []TimedOutResource + for _, id := range timeoutErr.Identifiers { + result := taskContext.ResourceCache().Get(id) + if timeoutErr.Condition.Meets(result.Status) { + continue + } + timedOutResources = append(timedOutResources, TimedOutResource{ + Identifier: id, + Status: result.Status, + Message: result.StatusMessage, + }) + } + timeoutErr.TimedOutResources = timedOutResources + } +} + // checkCondition checks whether the condition set in the task // is currently met given the status of resources in the cache. func (w *WaitTask) checkCondition(taskContext *TaskContext) bool { diff --git a/pkg/apply/taskrunner/task_test.go b/pkg/apply/taskrunner/task_test.go index 0c267ffd..3504ea8d 100644 --- a/pkg/apply/taskrunner/task_test.go +++ b/pkg/apply/taskrunner/task_test.go @@ -15,7 +15,8 @@ import ( ) func TestWaitTask_TimeoutTriggered(t *testing.T) { - task := NewWaitTask("wait", object.ObjMetadataSet{}, AllCurrent, + taskName := "wait" + task := NewWaitTask(taskName, object.ObjMetadataSet{}, AllCurrent, 2*time.Second, testutil.NewFakeRESTMapper()) eventChannel := make(chan event.Event) @@ -28,9 +29,16 @@ func TestWaitTask_TimeoutTriggered(t *testing.T) { timer := time.NewTimer(3 * time.Second) select { - case res := <-taskContext.TaskChannel(): - if _, ok := IsTimeoutError(res.Err); !ok { - t.Errorf("expected timeout error, but got %v", res.Err) + case e := <-taskContext.EventChannel(): + if e.Type != event.WaitType { + t.Errorf("expected a WaitType event, but got a %v event", e.Type) + } + if e.WaitEvent.GroupName != taskName { + t.Errorf("expected WaitEvent.GroupName = %q, but got %q", taskName, e.WaitEvent.GroupName) + } + err := e.WaitEvent.Error + if _, ok := IsTimeoutError(err); !ok { + t.Errorf("expected timeout error, but got %v", err) } return case <-timer.C: diff --git a/pkg/testutil/events.go b/pkg/testutil/events.go index ed5d738b..2379588c 100644 --- a/pkg/testutil/events.go +++ b/pkg/testutil/events.go @@ -23,6 +23,7 @@ type ExpEvent struct { StatusEvent *ExpStatusEvent PruneEvent *ExpPruneEvent DeleteEvent *ExpDeleteEvent + WaitEvent *ExpWaitEvent } type ExpInitEvent struct { @@ -67,6 +68,11 @@ type ExpDeleteEvent struct { Error error } +type ExpWaitEvent struct { + GroupName string + Error error +} + func VerifyEvents(expEvents []ExpEvent, events []event.Event) error { if len(expEvents) == 0 && len(events) == 0 { return nil @@ -234,6 +240,24 @@ func isMatch(ee ExpEvent, e event.Event) bool { return de.Error != nil } return de.Error == nil + + case event.WaitType: + wee := ee.WaitEvent + if wee == nil { + return true + } + we := e.WaitEvent + + if wee.GroupName != "" { + if wee.GroupName != we.GroupName { + return false + } + } + + if wee.Error != nil { + return cmp.Equal(wee.Error, we.Error, cmpopts.EquateErrors()) + } + return we.Error == nil } return true } @@ -317,6 +341,15 @@ func EventToExpEvent(e event.Event) ExpEvent { Error: e.DeleteEvent.Error, }, } + + case event.WaitType: + return ExpEvent{ + EventType: event.WaitType, + WaitEvent: &ExpWaitEvent{ + GroupName: e.WaitEvent.GroupName, + Error: e.WaitEvent.Error, + }, + } } return ExpEvent{} }