From 8222a62258befb39565006ad711c84ecb07d85d9 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Tue, 8 Oct 2024 15:24:35 -0700 Subject: [PATCH 1/7] enforce having larger than one decision poller for sticky execution enabled workers --- evictiontest/workflow_cache_eviction_test.go | 3 +- internal/internal_poller_autoscaler.go | 3 +- internal/internal_worker.go | 4 +- internal/internal_worker_test.go | 3 +- internal/worker.go | 19 +++++- internal/worker_test.go | 68 ++++++++++++++++++++ worker/worker.go | 6 +- 7 files changed, 97 insertions(+), 9 deletions(-) create mode 100644 internal/worker_test.go diff --git a/evictiontest/workflow_cache_eviction_test.go b/evictiontest/workflow_cache_eviction_test.go index fe0f66ffa..4107935e6 100644 --- a/evictiontest/workflow_cache_eviction_test.go +++ b/evictiontest/workflow_cache_eviction_test.go @@ -174,11 +174,12 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() { // so if our worker puts *cacheSize* entries in the cache, it should evict exactly one s.service.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(mockResetStickyTaskList).Times(1) - workflowWorker := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{ + workflowWorker, err := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{ DisableActivityWorker: true, Logger: zaptest.NewLogger(s.T()), IsolationGroup: "zone-1", }) + s.Require().NoError(err) // this is an arbitrary workflow we use for this test // NOTE: a simple helloworld that doesn't execute an activity // won't work because the workflow will simply just complete diff --git a/internal/internal_poller_autoscaler.go b/internal/internal_poller_autoscaler.go index 333f03933..6fbd4ccbc 100644 --- a/internal/internal_poller_autoscaler.go +++ b/internal/internal_poller_autoscaler.go @@ -37,7 +37,8 @@ import ( const ( defaultPollerAutoScalerCooldown = time.Minute defaultPollerAutoScalerTargetUtilization = 0.6 - defaultMinConcurrentPollerSize = 1 + defaultMinConcurrentActivityPollerSize = 1 + defaultMinConcurrentDecisionPollerSize = 2 ) var ( diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 78911d0a4..69a57527a 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1286,10 +1286,10 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions { options.MaxConcurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize } if options.MinConcurrentActivityTaskPollers == 0 { - options.MinConcurrentActivityTaskPollers = defaultMinConcurrentPollerSize + options.MinConcurrentActivityTaskPollers = defaultMinConcurrentActivityPollerSize } if options.MinConcurrentDecisionTaskPollers == 0 { - options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentPollerSize + options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentDecisionPollerSize } if options.PollerAutoScalerCooldown == 0 { options.PollerAutoScalerCooldown = defaultPollerAutoScalerCooldown diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 8b02cf329..fd150761c 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -414,11 +414,12 @@ func createWorkerWithThrottle( workerOptions.EnableSessionWorker = true // Start Worker. - worker := NewWorker( + worker, err := NewWorker( service, domain, "testGroupName2", workerOptions) + require.NoError(t, err) return worker } diff --git a/internal/worker.go b/internal/worker.go index 1d2fbe4af..babce644e 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -23,6 +23,7 @@ package internal import ( "context" + "fmt" "time" "go.uber.org/cadence/internal/common/debug" @@ -108,7 +109,7 @@ type ( // optional: Sets the minimum number of goroutines that will concurrently poll the // cadence-server to retrieve decision tasks. If FeatureFlags.PollerAutoScalerEnabled is set to true, // changing this value will NOT affect the rate at which the worker is able to consume tasks from a task list. - // Default value is 1 + // Default value is 2 MinConcurrentDecisionTaskPollers int // optional: Sets the interval of poller autoscaling, between which poller autoscaler changes the poller count @@ -333,8 +334,11 @@ func NewWorker( domain string, taskList string, options WorkerOptions, -) *aggregatedWorker { - return newAggregatedWorker(service, domain, taskList, options) +) (*aggregatedWorker, error) { + if err := options.Validate(); err != nil { + return nil, fmt.Errorf("worker options validation error: %w", err) + } + return newAggregatedWorker(service, domain, taskList, options), nil } // ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it. @@ -383,3 +387,12 @@ func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName s r := NewWorkflowReplayer() return r.ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, lastEventID) } + +// Validate sanity validation of WorkerOptions +func (o WorkerOptions) Validate() error { + // decision task pollers must be >= 2 or unset if sticky tasklist is enabled https://github.com/uber-go/cadence-client/issues/1369 + if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1 || o.MinConcurrentDecisionTaskPollers == 1) { + return fmt.Errorf("DecisionTaskPollers must be >= 2 or use default value") + } + return nil +} diff --git a/internal/worker_test.go b/internal/worker_test.go new file mode 100644 index 000000000..950955efa --- /dev/null +++ b/internal/worker_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_NewWorker(t *testing.T) { + tests := []struct { + name string + options WorkerOptions + expectErr string + }{ + { + name: "happy with default value", + options: WorkerOptions{}, + expectErr: "", + }, + { + name: "happy with explicit decision task poller set to 1 if sticky task list is disabled", + options: WorkerOptions{ + MaxConcurrentDecisionTaskPollers: 1, + DisableStickyExecution: true, + }, + expectErr: "", + }, + { + name: "invalid worker with explicit decision task poller set to 1", + options: WorkerOptions{ + MaxConcurrentDecisionTaskPollers: 1, + }, + expectErr: "DecisionTaskPollers must be >= 2 or use default value", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w, err := NewWorker(nil, "test-domain", "test-tasklist", tt.options) + if tt.expectErr != "" { + assert.ErrorContains(t, err, tt.expectErr) + assert.Nil(t, w) + } else { + assert.NoError(t, err) + assert.NotNil(t, w) + } + }) + } +} diff --git a/worker/worker.go b/worker/worker.go index 13cdf3164..3778ab143 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -283,7 +283,11 @@ func New( taskList string, options Options, ) Worker { - return internal.NewWorker(service, domain, taskList, options) + w, err := internal.NewWorker(service, domain, taskList, options) + if err != nil { + panic(err) + } + return w } // NewWorkflowReplayer creates a WorkflowReplayer instance. From e05cf96b539b9db4e5b46c2f683ac49c8a4b6b7f Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Tue, 8 Oct 2024 15:41:45 -0700 Subject: [PATCH 2/7] fix test for augment --- internal/internal_worker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index fd150761c..3984f2275 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1389,7 +1389,7 @@ func Test_augmentWorkerOptions(t *testing.T) { MaxConcurrentDecisionTaskExecutionSize: 1000, WorkerDecisionTasksPerSecond: 100000, MaxConcurrentDecisionTaskPollers: 2, - MinConcurrentDecisionTaskPollers: 1, + MinConcurrentDecisionTaskPollers: 2, PollerAutoScalerCooldown: time.Minute, PollerAutoScalerTargetUtilization: 0.6, PollerAutoScalerDryRun: false, From df9aa89c391e7a7026d628304ae5e206a78575e6 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Wed, 9 Oct 2024 09:09:50 -0700 Subject: [PATCH 3/7] add new line to retrigger stucked buildkite --- evictiontest/workflow_cache_eviction_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/evictiontest/workflow_cache_eviction_test.go b/evictiontest/workflow_cache_eviction_test.go index 4107935e6..6c92762b9 100644 --- a/evictiontest/workflow_cache_eviction_test.go +++ b/evictiontest/workflow_cache_eviction_test.go @@ -180,6 +180,7 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() { IsolationGroup: "zone-1", }) s.Require().NoError(err) + // this is an arbitrary workflow we use for this test // NOTE: a simple helloworld that doesn't execute an activity // won't work because the workflow will simply just complete From 52d63fd5d2c800d215eed5839935bb2e12faa84b Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Wed, 9 Oct 2024 15:51:01 -0700 Subject: [PATCH 4/7] add worker.NewV2 interface --- internal/internal_worker.go | 8 ++++++-- internal/internal_worker_test.go | 6 ++++-- internal/internal_workers_test.go | 17 +++++++++++------ internal/worker.go | 5 +---- worker/worker.go | 20 ++++++++++++++++++++ 5 files changed, 42 insertions(+), 14 deletions(-) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 69a57527a..e765e3515 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1025,8 +1025,12 @@ func newAggregatedWorker( domain string, taskList string, options WorkerOptions, -) (worker *aggregatedWorker) { +) (worker *aggregatedWorker, err error) { wOptions := AugmentWorkerOptions(options) + if err := options.Validate(); err != nil { + return nil, fmt.Errorf("worker options validation error: %w", err) + } + ctx := wOptions.BackgroundActivityContext if ctx == nil { ctx = context.Background() @@ -1156,7 +1160,7 @@ func newAggregatedWorker( logger: logger, registry: registry, workerstats: workerParams.WorkerStats, - } + }, nil } // tagScope with one or multiple tags, like diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 3984f2275..65792bb45 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1076,7 +1076,8 @@ func TestActivityNilArgs(t *testing.T) { func TestWorkerOptionDefaults(t *testing.T) { domain := "worker-options-test" taskList := "worker-options-tl" - aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{}) + aggWorker, err := newAggregatedWorker(nil, domain, taskList, WorkerOptions{}) + require.NoError(t, err) decisionWorker := aggWorker.workflowWorker require.True(t, decisionWorker.executionParameters.Identity != "") require.NotNil(t, decisionWorker.executionParameters.Logger) @@ -1144,7 +1145,8 @@ func TestWorkerOptionNonDefaults(t *testing.T) { Tracer: opentracing.NoopTracer{}, } - aggWorker := newAggregatedWorker(nil, domain, taskList, options) + aggWorker, err := newAggregatedWorker(nil, domain, taskList, options) + require.NoError(t, err) decisionWorker := aggWorker.workflowWorker require.True(t, len(decisionWorker.executionParameters.ContextPropagators) > 0) diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 49d4041b2..8b6e08049 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -340,7 +340,8 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() { DisableActivityWorker: true, Identity: "test-worker-identity", } - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( longDecisionWorkflowFn, RegisterWorkflowOptions{Name: "long-running-decision-workflow-type"}, @@ -515,7 +516,8 @@ func (s *WorkersTestSuite) TestQueryTask_WorkflowCacheEvicted() { // See the mock function for the second PollForDecisionTask call above. MaxConcurrentDecisionTaskExecutionSize: 1, } - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( queryWorkflowFn, RegisterWorkflowOptions{Name: workflowType}, @@ -638,7 +640,8 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() { DisableActivityWorker: true, Identity: "test-worker-identity", } - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( longDecisionWorkflowFn, RegisterWorkflowOptions{Name: "multiple-local-activities-workflow-type"}, @@ -748,7 +751,8 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() { Logger: zaptest.NewLogger(s.T()), Identity: "test-worker-identity", } - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( workflowFn, RegisterWorkflowOptions{Name: workflowType}, @@ -859,14 +863,15 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() { return nil }).MinTimes(1) - worker := newAggregatedWorker(s.service, domain, taskList, options) + worker, err := newAggregatedWorker(s.service, domain, taskList, options) + s.Require().NoError(err) worker.RegisterWorkflowWithOptions( workflowFn, RegisterWorkflowOptions{Name: workflowType}, ) worker.RegisterActivityWithOptions(activitySleep, RegisterActivityOptions{Name: "activitySleep"}) s.NotNil(worker.locallyDispatchedActivityWorker) - err := worker.Start() + err = worker.Start() s.NoError(err, "worker failed to start") // wait for test to complete diff --git a/internal/worker.go b/internal/worker.go index babce644e..e246ab080 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -335,10 +335,7 @@ func NewWorker( taskList string, options WorkerOptions, ) (*aggregatedWorker, error) { - if err := options.Validate(); err != nil { - return nil, fmt.Errorf("worker options validation error: %w", err) - } - return newAggregatedWorker(service, domain, taskList, options), nil + return newAggregatedWorker(service, domain, taskList, options) } // ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it. diff --git a/worker/worker.go b/worker/worker.go index 3778ab143..44ecf380b 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -277,6 +277,8 @@ const ( // identifies group of workflow and activity implementations that are // hosted by a single worker process // options - configure any worker specific options like logger, metrics, identity +// +// DEPRCATED: use NewV2 instead since this implementation will panic on error func New( service workflowserviceclient.Interface, domain string, @@ -290,6 +292,24 @@ func New( return w } +// NewV2 returns an instance of worker for managing workflow and activity executions and an error. +// +// service - thrift connection to the cadence server +// domain - the name of the cadence domain +// taskList - is the task list name you use to identify your client worker, also +// identifies group of workflow and activity implementations that are +// hosted by a single worker process +// options - configure any worker specific options like logger, metrics, identity +// Returns an error if the worker cannot be created. +func NewV2( + service workflowserviceclient.Interface, + domain string, + taskList string, + options Options, +) (Worker, error) { + return internal.NewWorker(service, domain, taskList, options) +} + // NewWorkflowReplayer creates a WorkflowReplayer instance. func NewWorkflowReplayer() WorkflowReplayer { return internal.NewWorkflowReplayer() From 52aafbcc6c0c25e05524ea05d23477ba42221ebd Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 10 Oct 2024 09:08:44 -0700 Subject: [PATCH 5/7] go doc fix --- CHANGELOG.md | 11 +++++++---- worker/worker.go | 12 +----------- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dcd2bbe8..caa2b9e29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Added +- Added worker.NewV2 with validation on decision poller count (#1370) + ## [v1.2.10] - 2024-07-10 ### Added - Revert "Handle panics while polling for tasks (#1352)" (#1357) @@ -83,16 +86,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fixed in TestEnv workflow interceptor is not propagated correctly for child workflows #1289 ## [v1.0.2] - 2023-09-25 -### Added +### Added - Add a structured error for non-determinism failures -### Changed -- Do not log when automatic heart beating fails due to cancellations +### Changed +- Do not log when automatic heart beating fails due to cancellations ## [v1.0.1] - 2023-08-14 ### Added - Emit cadence worker's hardware utilization inside worker once per host by @timl3136 in #1260 -### Changed +### Changed - Updated supported Go version to 1.19 - Log when the automatic heartbeating fails - Updated golang.org/x/net and github.com/prometheus/client_golang diff --git a/worker/worker.go b/worker/worker.go index 44ecf380b..d1436ccdf 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -269,16 +269,7 @@ const ( ShadowModeContinuous = internal.ShadowModeContinuous ) -// New creates an instance of worker for managing workflow and activity executions. -// -// service - thrift connection to the cadence server -// domain - the name of the cadence domain -// taskList - is the task list name you use to identify your client worker, also -// identifies group of workflow and activity implementations that are -// hosted by a single worker process -// options - configure any worker specific options like logger, metrics, identity -// -// DEPRCATED: use NewV2 instead since this implementation will panic on error +// Deprecated: use NewV2 instead since this implementation will panic on error func New( service workflowserviceclient.Interface, domain string, @@ -300,7 +291,6 @@ func New( // identifies group of workflow and activity implementations that are // hosted by a single worker process // options - configure any worker specific options like logger, metrics, identity -// Returns an error if the worker cannot be created. func NewV2( service workflowserviceclient.Interface, domain string, From 49fb8e3bec8f80c7199f65285b33bc6c2e0a193f Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 10 Oct 2024 11:30:13 -0700 Subject: [PATCH 6/7] lint --- worker/worker.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index d1436ccdf..3dd3cee85 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -285,12 +285,12 @@ func New( // NewV2 returns an instance of worker for managing workflow and activity executions and an error. // -// service - thrift connection to the cadence server -// domain - the name of the cadence domain -// taskList - is the task list name you use to identify your client worker, also -// identifies group of workflow and activity implementations that are -// hosted by a single worker process -// options - configure any worker specific options like logger, metrics, identity +// service - thrift connection to the cadence server +// domain - the name of the cadence domain +// taskList - is the task list name you use to identify your client worker, also +// identifies group of workflow and activity implementations that are +// hosted by a single worker process +// options - configure any worker specific options like logger, metrics, identity func NewV2( service workflowserviceclient.Interface, domain string, From 9d1627c7dcf4bb8cff6f0c84b689cc8b165fe861 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 10 Oct 2024 13:00:26 -0700 Subject: [PATCH 7/7] fix unit test --- internal/internal_poller_autoscaler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/internal_poller_autoscaler_test.go b/internal/internal_poller_autoscaler_test.go index 48337284b..3e9757979 100644 --- a/internal/internal_poller_autoscaler_test.go +++ b/internal/internal_poller_autoscaler_test.go @@ -61,7 +61,7 @@ func Test_pollerAutoscaler(t *testing.T) { taskPoll: 0, unrelated: 0, initialPollerCount: 10, - minPollerCount: 1, + minPollerCount: 2, maxPollerCount: 10, targetMilliUsage: 500, cooldownTime: coolDownTime,