Skip to content

Commit

Permalink
use worker type in logger and scope for autoscaler
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Jan 7, 2025
1 parent 69f7e3f commit 9bc9785
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ func createPollRetryPolicy() backoff.RetryPolicy {

func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
ctx, cancel := context.WithCancel(context.Background())
logger = logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType})
metricsScope = tagScope(metricsScope, tagWorkerType, options.workerType)

concurrency := &worker.ConcurrencyLimit{
PollerPermit: worker.NewResizablePermit(options.pollerCount),
Expand All @@ -192,8 +194,8 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
logger: logger,
metricsScope: metricsScope,
concurrency: concurrency,
concurrencyAutoScaler: concurrencyAS,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
Expand Down

0 comments on commit 9bc9785

Please sign in to comment.