diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 74e411948..5aa3916a8 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -933,7 +933,10 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu } lar.task.attempt++ - w.laTunnel.sendTask(lar.task) + + if !w.laTunnel.sendTask(lar.task) { + lar.task.attempt-- + } }) return true } @@ -1012,13 +1015,18 @@ func (w *workflowExecutionContextImpl) CompleteDecisionTask(workflowTask *workfl if w.hasPendingLocalActivityWork() && w.laTunnel != nil { if len(eventHandler.unstartedLaTasks) > 0 { // start new local activity tasks + unstartedLaTasks := make(map[string]struct{}) for activityID := range eventHandler.unstartedLaTasks { task := eventHandler.pendingLaTasks[activityID] task.wc = w task.workflowTask = workflowTask - w.laTunnel.sendTask(task) + if !w.laTunnel.sendTask(task) { + unstartedLaTasks[activityID] = struct{}{} + task.wc = nil + task.workflowTask = nil + } } - eventHandler.unstartedLaTasks = make(map[string]struct{}) + eventHandler.unstartedLaTasks = unstartedLaTasks } // cannot complete decision task as there are pending local activities if waitLocalActivities { diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index c88683de9..076c89db0 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -884,18 +884,18 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() { } task := createWorkflowTask(testEvents, 0, "RetryLocalActivityWorkflow") + stopCh := make(chan struct{}) params := workerExecutionParameters{ - TaskList: testWorkflowTaskTasklist, - Identity: "test-id-1", - Logger: t.logger, - Tracer: opentracing.NoopTracer{}, + TaskList: testWorkflowTaskTasklist, + Identity: "test-id-1", + Logger: t.logger, + Tracer: opentracing.NoopTracer{}, + WorkerStopChannel: stopCh, } + defer close(stopCh) taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment()) - laTunnel := &localActivityTunnel{ - taskCh: make(chan *localActivityTask, 1000), - resultCh: make(chan interface{}), - } + laTunnel := newLocalActivityTunnel(params.WorkerStopChannel) taskHandlerImpl, ok := taskHandler.(*workflowTaskHandlerImpl) t.True(ok) taskHandlerImpl.laTunnel = laTunnel diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 2bb971450..c0af4cace 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -130,20 +130,34 @@ type ( localActivityTunnel struct { taskCh chan *localActivityTask resultCh chan interface{} + stopCh <-chan struct{} } ) -func (lat *localActivityTunnel) getTask(stopC <-chan struct{}) *localActivityTask { +func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel { + return &localActivityTunnel{ + taskCh: make(chan *localActivityTask, 1000), + resultCh: make(chan interface{}), + stopCh: stopCh, + } +} + +func (lat *localActivityTunnel) getTask() *localActivityTask { select { case task := <-lat.taskCh: return task - case <-stopC: + case <-lat.stopCh: return nil } } -func (lat *localActivityTunnel) sendTask(task *localActivityTask) { - lat.taskCh <- task +func (lat *localActivityTunnel) sendTask(task *localActivityTask) bool { + select { + case lat.taskCh <- task: + return true + case <-lat.stopCh: + return false + } } func isClientSideError(err error) bool { @@ -408,7 +422,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct } func (latp *localActivityTaskPoller) PollTask() (interface{}, error) { - return latp.laTunnel.getTask(latp.shutdownC), nil + return latp.laTunnel.getTask(), nil } func (latp *localActivityTaskPoller) ProcessTask(task interface{}) error { diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 7a7229453..86ef25933 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -308,10 +308,7 @@ func newWorkflowTaskWorkerInternal( ) // laTunnel is the glue that hookup 3 parts - laTunnel := &localActivityTunnel{ - taskCh: make(chan *localActivityTask, 1000), - resultCh: make(chan interface{}), - } + laTunnel := newLocalActivityTunnel(params.WorkerStopChannel) // 1) workflow handler will send local activity task to laTunnel if handlerImpl, ok := taskHandler.(*workflowTaskHandlerImpl); ok {