Skip to content

Commit

Permalink
populate task for empty polls instead
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Jan 16, 2025
1 parent 9783bf3 commit d675887
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 21 deletions.
4 changes: 2 additions & 2 deletions internal/internal_poller_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ func (m *pollerUsageEstimator) CollectUsage(data interface{}) error {
func isTaskEmpty(task interface{}) (bool, error) {
switch t := task.(type) {
case *workflowTask:
return t == nil || t.task == nil, nil
return t == nil || isEmptyDecisionTask(t.task), nil
case *activityTask:
return t == nil || t.task == nil, nil
return t == nil || isEmptyActivityTask(t.task), nil
case *localActivityTask:
return t == nil || t.workflowTask == nil, nil
default:
Expand Down
4 changes: 1 addition & 3 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type (
// workflowTask wraps a decision task.
workflowTask struct {
task *s.PollForDecisionTaskResponse
autoConfigHint *s.AutoConfigHint
historyIterator HistoryIterator
doneCh chan struct{}
laResultCh chan *localActivityResult
Expand All @@ -84,7 +83,6 @@ type (
// activityTask wraps a activity task.
activityTask struct {
task *s.PollForActivityTaskResponse
autoConfigHint *s.AutoConfigHint
pollStartTime time.Time
}

Expand Down Expand Up @@ -746,7 +744,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
workflowTask *workflowTask,
heartbeatFunc decisionHeartbeatFunc,
) (completeRequest interface{}, errRet error) {
if workflowTask == nil || workflowTask.task == nil {
if workflowTask == nil || isEmptyDecisionTask(workflowTask.task) {
return nil, errors.New("nil workflow task provided")
}
task := workflowTask.task
Expand Down
11 changes: 3 additions & 8 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,9 +848,7 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
if response == nil || len(response.TaskToken) == 0 {
wtp.metricsScope.Counter(metrics.DecisionPollNoTaskCounter).Inc(1)
wtp.updateBacklog(request.TaskList.GetKind(), 0)
return &workflowTask{
autoConfigHint: response.GetAutoConfigHint(),
}, nil
return &workflowTask{task: response}, nil
}

wtp.updateBacklog(request.TaskList.GetKind(), response.GetBacklogCountHint())
Expand Down Expand Up @@ -910,7 +908,6 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *s.PollForDecisionTaskRes
task := &workflowTask{
task: response,
historyIterator: historyIterator,
autoConfigHint: response.GetAutoConfigHint(),
}
return task
}
Expand Down Expand Up @@ -1119,9 +1116,7 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
return nil, err
}
if response == nil || len(response.TaskToken) == 0 {
return &activityTask{
autoConfigHint: response.GetAutoConfigHint(),
}, nil
return &activityTask{task: response}, nil
}

workflowType := response.WorkflowType.GetName()
Expand All @@ -1133,7 +1128,7 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
scheduledToStartLatency := time.Duration(response.GetStartedTimestamp() - response.GetScheduledTimestampOfThisAttempt())
metricsScope.Timer(metrics.ActivityScheduledToStartLatency).Record(scheduledToStartLatency)

return &activityTask{task: response, pollStartTime: startTime, autoConfigHint: response.GetAutoConfigHint()}, nil
return &activityTask{task: response, pollStartTime: startTime}, nil
}

// PollTask polls a new task
Expand Down
10 changes: 2 additions & 8 deletions internal/internal_task_pollers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,13 @@ func TestWorkflowTaskPoller(t *testing.T) {
task,
&workflowTask{
task: task,
autoConfigHint: task.AutoConfigHint,
},
},
{
"success with empty task",
emptyTask,
&workflowTask{
task: nil,
autoConfigHint: task.AutoConfigHint,
task: emptyTask,
},
},
} {
Expand All @@ -107,7 +105,6 @@ func TestWorkflowTaskPoller(t *testing.T) {
resultTask, ok := result.(*workflowTask)
assert.True(t, ok)
assert.Equal(t, tt.expected.task, resultTask.task)
assert.Equal(t, tt.expected.autoConfigHint, resultTask.autoConfigHint)
})
}
})
Expand Down Expand Up @@ -139,15 +136,13 @@ func TestActivityTaskPoller(t *testing.T) {
task,
&activityTask{
task: task,
autoConfigHint: task.AutoConfigHint,
},
},
{
"success with empty task",
emptyTask,
&activityTask{
task: nil,
autoConfigHint: task.AutoConfigHint,
task: emptyTask,
},
},
} {
Expand All @@ -159,7 +154,6 @@ func TestActivityTaskPoller(t *testing.T) {
resultTask, ok := result.(*activityTask)
assert.True(t, ok)
assert.Equal(t, tt.expected.task, resultTask.task)
assert.Equal(t, tt.expected.autoConfigHint, resultTask.autoConfigHint)
})
}
})
Expand Down
8 changes: 8 additions & 0 deletions internal/internal_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,11 @@ func getLengthOfStringPointer(s *string) int {
}
return len(*s)
}

func isEmptyDecisionTask(r *s.PollForDecisionTaskResponse) bool {
return r == nil || len(r.TaskToken) == 0
}

func isEmptyActivityTask(r *s.PollForActivityTaskResponse) bool {
return r == nil || len(r.TaskToken) == 0
}

0 comments on commit d675887

Please sign in to comment.