Skip to content

Commit

Permalink
Unit tests for activity task handler (#1378)
Browse files Browse the repository at this point in the history
* Unit tests for activity task handler
  • Loading branch information
3vilhamster authored Nov 1, 2024
1 parent 41c2a12 commit 5b97f10
Show file tree
Hide file tree
Showing 12 changed files with 1,015 additions and 321 deletions.
1 change: 1 addition & 0 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0
submodules: 'true'

- name: Set up Go
uses: actions/setup-go@v4
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang-jwt/jwt/v5 v5.2.0
github.com/golang/mock v1.5.0
github.com/jonboulle/clockwork v0.4.0
github.com/marusama/semaphore/v2 v2.5.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pborman/uuid v0.0.0-20160209185913-a97ce2ca70fa
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down
2 changes: 2 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"go.uber.org/cadence/.gen/go/shared"
)

//go:generate mockery --name ServiceInvoker --inpackage --with-expecter --case snake --filename service_invoker_mock.go --boilerplate-file ../LICENSE

type (
// RegistryActivityInfo
RegistryActivityInfo interface {
Expand Down
214 changes: 214 additions & 0 deletions internal/activity_task_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"context"
"fmt"
"strings"
"time"

"github.com/jonboulle/clockwork"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
s "go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal/common/debug"
"go.uber.org/cadence/internal/common/metrics"
)

type (
// activityTaskHandlerImpl is the implementation of ActivityTaskHandler
activityTaskHandlerImpl struct {
clock clockwork.Clock
taskListName string
identity string
service workflowserviceclient.Interface
metricsScope *metrics.TaggedScope
logger *zap.Logger
userContext context.Context
registry *registry
activityProvider activityProvider
dataConverter DataConverter
workerStopCh <-chan struct{}
contextPropagators []ContextPropagator
tracer opentracing.Tracer
featureFlags FeatureFlags
activityTracker debug.ActivityTracker
}
)

func newActivityTaskHandler(
service workflowserviceclient.Interface,
params workerExecutionParameters,
registry *registry,
) ActivityTaskHandler {
return newActivityTaskHandlerWithCustomProvider(service, params, registry, nil, clockwork.NewRealClock())
}

func newActivityTaskHandlerWithCustomProvider(
service workflowserviceclient.Interface,
params workerExecutionParameters,
registry *registry,
activityProvider activityProvider,
clock clockwork.Clock,
) ActivityTaskHandler {
if params.Tracer == nil {
params.Tracer = opentracing.NoopTracer{}
}
if params.WorkerStats.ActivityTracker == nil {
params.WorkerStats.ActivityTracker = debug.NewNoopActivityTracker()
}
return &activityTaskHandlerImpl{
clock: clock,
taskListName: params.TaskList,
identity: params.Identity,
service: service,
logger: params.Logger,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
userContext: params.UserContext,
registry: registry,
activityProvider: activityProvider,
dataConverter: params.DataConverter,
workerStopCh: params.WorkerStopChannel,
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
featureFlags: params.FeatureFlags,
activityTracker: params.WorkerStats.ActivityTracker,
}
}

// Execute executes an implementation of the activity.
func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivityTaskResponse) (result interface{}, err error) {
traceLog(func() {
ath.logger.Debug("Processing new activity task",
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
zap.String(tagActivityType, t.ActivityType.GetName()))
})

rootCtx := ath.userContext
if rootCtx == nil {
rootCtx = context.Background()
}
canCtx, cancel := context.WithCancel(rootCtx)
defer cancel()

workflowType := t.WorkflowType.GetName()
activityType := t.ActivityType.GetName()
invoker := newServiceInvoker(t.TaskToken, ath.identity, ath.service, cancel, t.GetHeartbeatTimeoutSeconds(), ath.workerStopCh, ath.featureFlags, ath.logger, workflowType, activityType)
defer func() {
_, activityCompleted := result.(*s.RespondActivityTaskCompletedRequest)
invoker.Close(!activityCompleted) // flush buffered heartbeat if activity was not successfully completed.
}()

metricsScope := getMetricsScopeForActivity(ath.metricsScope, workflowType, activityType)
ctx := WithActivityTask(canCtx, t, taskList, invoker, ath.logger, metricsScope, ath.dataConverter, ath.workerStopCh, ath.contextPropagators, ath.tracer)

activityImplementation := ath.getActivity(activityType)
if activityImplementation == nil {
// Couldn't find the activity implementation.
supported := strings.Join(ath.getRegisteredActivityNames(), ", ")
return nil, fmt.Errorf("unable to find activityType=%v. Supported types: [%v]", activityType, supported)
}

// panic handler
defer func() {
if p := recover(); p != nil {
topLine := fmt.Sprintf("activity for %s [panic]:", ath.taskListName)
st := getStackTraceRaw(topLine, 7, 0)
ath.logger.Error("Activity panic.",
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
zap.String(tagActivityType, activityType),
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
zap.String(tagPanicStack, st))
metricsScope.Counter(metrics.ActivityTaskPanicCounter).Inc(1)
panicErr := newPanicError(p, st)
result, err = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr, ath.dataConverter), nil
}
}()

// propagate context information into the activity context from the headers
for _, ctxProp := range ath.contextPropagators {
var err error
if ctx, err = ctxProp.Extract(ctx, NewHeaderReader(t.Header)); err != nil {
return nil, fmt.Errorf("unable to propagate context %w", err)
}
}

info := ctx.Value(activityEnvContextKey).(*activityEnvironment)
ctx, dlCancelFunc := context.WithDeadline(ctx, info.deadline)
defer dlCancelFunc()

ctx, span := createOpenTracingActivitySpan(ctx, ath.tracer, time.Now(), activityType, t.WorkflowExecution.GetWorkflowId(), t.WorkflowExecution.GetRunId())
defer span.Finish()

if activityImplementation.GetOptions().EnableAutoHeartbeat && t.HeartbeatTimeoutSeconds != nil && *t.HeartbeatTimeoutSeconds > 0 {
heartBeater := newHeartbeater(ath.workerStopCh, invoker, ath.logger, ath.clock, activityType, t.WorkflowExecution)
go heartBeater.Run(ctx, time.Duration(*t.HeartbeatTimeoutSeconds)*time.Second)
}
activityInfo := debug.ActivityInfo{
TaskList: ath.taskListName,
ActivityType: activityType,
}
defer ath.activityTracker.Start(activityInfo).Stop()
output, err := activityImplementation.Execute(ctx, t.Input)

dlCancelFunc()
if <-ctx.Done(); ctx.Err() == context.DeadlineExceeded {
ath.logger.Warn("Activity timeout.",
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
zap.String(tagActivityType, activityType),
)
return nil, ctx.Err()
}
if err != nil && err != ErrActivityResultPending {
ath.logger.Error("Activity error.",
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
zap.String(tagActivityType, activityType),
zap.Error(err),
)
}
return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, output, err, ath.dataConverter), nil
}

func (ath *activityTaskHandlerImpl) getActivity(name string) activity {
if ath.activityProvider != nil {
return ath.activityProvider(name)
}

if a, ok := ath.registry.GetActivity(name); ok {
return a
}

return nil
}

func (ath *activityTaskHandlerImpl) getRegisteredActivityNames() (activityNames []string) {
for _, a := range ath.registry.getRegisteredActivities() {
activityNames = append(activityNames, a.ActivityType().Name)
}
return
}
Loading

0 comments on commit 5b97f10

Please sign in to comment.