From 9bc9785af961b7ee743cdeb61977f5c04c241610 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Tue, 7 Jan 2025 15:25:15 -0800 Subject: [PATCH] use worker type in logger and scope for autoscaler --- internal/internal_worker_base.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 9b7b9908d..e1fb4e74b 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -168,6 +168,8 @@ func createPollRetryPolicy() backoff.RetryPolicy { func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker { ctx, cancel := context.WithCancel(context.Background()) + logger = logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}) + metricsScope = tagScope(metricsScope, tagWorkerType, options.workerType) concurrency := &worker.ConcurrencyLimit{ PollerPermit: worker.NewResizablePermit(options.pollerCount), @@ -192,8 +194,8 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t shutdownCh: make(chan struct{}), taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), - logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), - metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType), + logger: logger, + metricsScope: metricsScope, concurrency: concurrency, concurrencyAutoScaler: concurrencyAS, taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.