From 98b6655730ae6b38aed0a2ea3c49e96f95752fa1 Mon Sep 17 00:00:00 2001 From: Hermawan Wijaya Date: Fri, 5 Jan 2024 17:12:03 +0700 Subject: [PATCH] feat: configurable sync worker job timeout --- compass.yaml.example | 3 +++ internal/workermanager/discovery_worker.go | 7 +++---- internal/workermanager/worker_manager.go | 9 +++++++++ 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/compass.yaml.example b/compass.yaml.example index 1a36eff4..24e21ea9 100644 --- a/compass.yaml.example +++ b/compass.yaml.example @@ -65,6 +65,9 @@ worker: username: compass password: compass_password job_manager_port: 8085 + sync_job_timeout: 15m + index_job_timeout: 5s + delete_job_timeout: 5s client: host: localhost:8081 diff --git a/internal/workermanager/discovery_worker.go b/internal/workermanager/discovery_worker.go index 8d9252b2..f3210893 100644 --- a/internal/workermanager/discovery_worker.go +++ b/internal/workermanager/discovery_worker.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "strings" - "time" "github.com/goto/compass/core/asset" "github.com/goto/compass/pkg/worker" @@ -41,7 +40,7 @@ func (m *Manager) indexAssetHandler() worker.JobHandler { Handle: m.IndexAsset, JobOpts: worker.JobOptions{ MaxAttempts: 3, - Timeout: 5 * time.Second, + Timeout: m.indexTimeout, BackoffStrategy: worker.DefaultExponentialBackoff, }, } @@ -52,7 +51,7 @@ func (m *Manager) syncAssetHandler() worker.JobHandler { Handle: m.SyncAssets, JobOpts: worker.JobOptions{ MaxAttempts: 1, - Timeout: 5 * time.Minute, + Timeout: m.syncTimeout, BackoffStrategy: worker.DefaultExponentialBackoff, }, } @@ -143,7 +142,7 @@ func (m *Manager) deleteAssetHandler() worker.JobHandler { Handle: m.DeleteAsset, JobOpts: worker.JobOptions{ MaxAttempts: 3, - Timeout: 5 * time.Second, + Timeout: m.deleteTimeout, BackoffStrategy: worker.DefaultExponentialBackoff, }, } diff --git a/internal/workermanager/worker_manager.go b/internal/workermanager/worker_manager.go index 0855b2dc..0cd15679 100644 --- a/internal/workermanager/worker_manager.go +++ b/internal/workermanager/worker_manager.go @@ -26,6 +26,9 @@ type Manager struct { discoveryRepo DiscoveryRepository assetRepo asset.Repository logger log.Logger + syncTimeout time.Duration + indexTimeout time.Duration + deleteTimeout time.Duration } //go:generate mockery --name=Worker -r --case underscore --with-expecter --structname Worker --filename worker_mock.go --output=./mocks @@ -44,6 +47,9 @@ type Config struct { ActivePollPercent float64 `mapstructure:"active_poll_percent" default:"20"` PGQ pgq.Config `mapstructure:"pgq"` JobManagerPort int `mapstructure:"job_manager_port"` + SyncJobTimeout time.Duration `mapstructure:"sync_job_timeout" default:"15m"` + IndexJobTimeout time.Duration `mapstructure:"index_job_timeout" default:"5s"` + DeleteJobTimeout time.Duration `mapstructure:"delete_job_timeout" default:"5s"` } type Deps struct { @@ -77,6 +83,9 @@ func New(ctx context.Context, deps Deps) (*Manager, error) { discoveryRepo: deps.DiscoveryRepo, assetRepo: deps.AssetRepo, logger: deps.Logger, + syncTimeout: cfg.SyncJobTimeout, + indexTimeout: cfg.IndexJobTimeout, + deleteTimeout: cfg.DeleteJobTimeout, }, nil }