From bb08dba775236e90ad2bd0e6f0b4f01104aca29b Mon Sep 17 00:00:00 2001 From: Hermawan Wijaya Date: Wed, 6 Dec 2023 16:31:34 +0700 Subject: [PATCH] feat: add batching logic --- core/asset/service.go | 4 +- .../elasticsearch/discovery_repository.go | 7 +++ .../discovery_repository_test.go | 48 +++++++++++++++++++ internal/workermanager/discovery_worker.go | 42 ++++++++++++++-- internal/workermanager/in_situ_worker.go | 38 ++++++++++++++- 5 files changed, 133 insertions(+), 6 deletions(-) diff --git a/core/asset/service.go b/core/asset/service.go index 8a8ad088..189ee719 100644 --- a/core/asset/service.go +++ b/core/asset/service.go @@ -232,7 +232,9 @@ func (s *Service) SuggestAssets(ctx context.Context, cfg SearchConfig) (suggesti func (s *Service) SyncAssets(ctx context.Context, services []string) error { for _, service := range services { - s.worker.EnqueueSyncAssetJob(ctx, service) + if err := s.worker.EnqueueSyncAssetJob(ctx, service); err != nil { + return err + } } return nil } diff --git a/internal/store/elasticsearch/discovery_repository.go b/internal/store/elasticsearch/discovery_repository.go index a0025d6e..35ed44ed 100644 --- a/internal/store/elasticsearch/discovery_repository.go +++ b/internal/store/elasticsearch/discovery_repository.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "net/http" "net/url" "strings" "time" @@ -224,6 +225,12 @@ func createUpsertBody(ast asset.Asset) (io.Reader, error) { } func (repo *DiscoveryRepository) clone(ctx context.Context, indexName, clonedIndexName string) error { + indexExistsFn := repo.cli.client.Indices.Exists + resp, _ := indexExistsFn([]string{clonedIndexName}) + if resp.StatusCode == http.StatusOK { + return nil // skip clone when backup already created + } + cloneFn := repo.cli.client.Indices.Clone resp, err := cloneFn(indexName, clonedIndexName, cloneFn.WithContext(ctx)) if err != nil { diff --git a/internal/store/elasticsearch/discovery_repository_test.go b/internal/store/elasticsearch/discovery_repository_test.go index cb230a2d..d777130f 100644 --- a/internal/store/elasticsearch/discovery_repository_test.go +++ b/internal/store/elasticsearch/discovery_repository_test.go @@ -390,3 +390,51 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) { assert.NoError(t, err) }) } + +func TestDiscoveryRepository_SyncAssets(t *testing.T) { + t.Run("should return success", func(t *testing.T) { + var ( + ctx = context.Background() + indexName = "bigquery-test" + ) + + ast1 := asset.Asset{ + ID: "id1", + Type: asset.TypeTable, + Service: indexName, + URN: "urn1", + } + ast2 := asset.Asset{ + ID: "id2", + Type: asset.TypeTable, + Service: indexName, + URN: "urn2", + } + ast3 := asset.Asset{ + ID: "id3", + Type: asset.TypeTable, + Service: indexName, + URN: "urn3", + } + + assets := []asset.Asset{ast1, ast2, ast3} + + cli, err := esTestServer.NewClient() + require.NoError(t, err) + + _, err = cli.Indices.Create(indexName) + require.NoError(t, err) + + esClient, err := store.NewClient( + log.NewNoop(), + store.Config{}, + store.WithClient(cli), + ) + require.NoError(t, err) + + repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"}) + + err = repo.SyncAssets(ctx, indexName, assets) + require.NoError(t, err) + }) +} diff --git a/internal/workermanager/discovery_worker.go b/internal/workermanager/discovery_worker.go index 30a3bc12..3712e593 100644 --- a/internal/workermanager/discovery_worker.go +++ b/internal/workermanager/discovery_worker.go @@ -50,8 +50,8 @@ func (m *Manager) syncAssetHandler() worker.JobHandler { return worker.JobHandler{ Handle: m.SyncAssets, JobOpts: worker.JobOptions{ - MaxAttempts: 3, - Timeout: 5 * time.Second, + MaxAttempts: 1, + Timeout: 5 * time.Minute, BackoffStrategy: worker.DefaultExponentialBackoff, }, } @@ -72,6 +72,7 @@ func (m *Manager) IndexAsset(ctx context.Context, job worker.JobSpec) error { } func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error { + const batchSize = 1000 service := string(job.Payload) jobs, err := m.jobRepo.GetSyncJobsByService(ctx, service) @@ -82,19 +83,52 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error { if len(jobs) > 1 { for _, job := range jobs { if job.RunAt.Before(job.RunAt) { - return nil // mark job as done if there's earlier job with same service + return nil // mark job as done if there's earlier job with same service to prevent race conditions } } } assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ Services: []string{service}, + Size: batchSize, + SortBy: "name", }) if err != nil { return fmt.Errorf("sync asset: get assets: %w", err) } - return m.discoveryRepo.SyncAssets(ctx, service, assets) + if err := m.discoveryRepo.SyncAssets(ctx, service, assets); err != nil { + return err + } + + if len(assets) == batchSize { // do remaining upsert after first batch completed + it := 1 + + for { + assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ + Services: []string{service}, + Size: batchSize, + Offset: it * batchSize, + SortBy: "name", + }) + if err != nil { + return fmt.Errorf("sync asset: get assets: %w", err) + } + + for _, ast := range assets { + if err := m.discoveryRepo.Upsert(ctx, ast); err != nil { + return err + } + } + + if len(assets) != batchSize { + break + } + it++ + } + + } + return nil } func (m *Manager) EnqueueDeleteAssetJob(ctx context.Context, urn string) error { diff --git a/internal/workermanager/in_situ_worker.go b/internal/workermanager/in_situ_worker.go index 1b3a0d60..13ba6564 100644 --- a/internal/workermanager/in_situ_worker.go +++ b/internal/workermanager/in_situ_worker.go @@ -39,6 +39,7 @@ func (m *InSituWorker) EnqueueDeleteAssetJob(ctx context.Context, urn string) er } func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) error { + const batchSize = 1000 jobs, err := m.jobRepo.GetSyncJobsByService(ctx, service) if err != nil { return fmt.Errorf("sync asset: get sync jobs by service: %w", err) @@ -56,14 +57,49 @@ func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) _ = m.jobRepo.Delete(ctx, jobID) }() + // need to limit this assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ Services: []string{service}, + Size: batchSize, + SortBy: "name", }) if err != nil { return fmt.Errorf("sync asset: get assets: %w", err) } - return m.discoveryRepo.SyncAssets(ctx, service, assets) + if err := m.discoveryRepo.SyncAssets(ctx, service, assets); err != nil { + return err + } + + if len(assets) == batchSize { // do remaining upsert after first batch completed + it := 1 + + for { + assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ + Services: []string{service}, + Size: batchSize, + Offset: it * batchSize, + SortBy: "name", + }) + if err != nil { + return fmt.Errorf("sync asset: get assets: %w", err) + } + + for _, ast := range assets { + if err := m.discoveryRepo.Upsert(ctx, ast); err != nil { + return err + } + } + + if len(assets) != batchSize { + break + } + it++ + } + + } + + return nil } func (*InSituWorker) Close() error { return nil }