From e6109d33f9ad1e99aa35d75785a1198d82129495 Mon Sep 17 00:00:00 2001 From: Hermawan Wijaya Date: Wed, 6 Dec 2023 18:40:57 +0700 Subject: [PATCH] feat: add batching and cleanup mechanism --- core/asset/discovery.go | 2 +- core/asset/mocks/discovery_repository.go | 43 ++++++----- .../elasticsearch/discovery_repository.go | 72 +++++++++++-------- .../discovery_repository_test.go | 27 ++----- internal/store/elasticsearch/es.go | 16 +++-- internal/store/elasticsearch/es_test.go | 2 +- internal/store/elasticsearch/schema.go | 4 +- internal/workermanager/discovery_worker.go | 56 ++++++--------- internal/workermanager/in_situ_worker.go | 53 ++++++-------- .../mocks/discovery_repository_mock.go | 43 ++++++----- 10 files changed, 159 insertions(+), 159 deletions(-) diff --git a/core/asset/discovery.go b/core/asset/discovery.go index 8b1fa915..a1c49848 100644 --- a/core/asset/discovery.go +++ b/core/asset/discovery.go @@ -12,7 +12,7 @@ type DiscoveryRepository interface { Search(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error) Suggest(ctx context.Context, cfg SearchConfig) (suggestions []string, err error) GroupAssets(ctx context.Context, cfg GroupConfig) (results []GroupResult, err error) - SyncAssets(ctx context.Context, indexName string, assets []Asset) error + SyncAssets(ctx context.Context, indexName string) (cleanup func() error, err error) } // GroupConfig represents a group query along diff --git a/core/asset/mocks/discovery_repository.go b/core/asset/mocks/discovery_repository.go index d622e81b..f9a56ca3 100644 --- a/core/asset/mocks/discovery_repository.go +++ b/core/asset/mocks/discovery_repository.go @@ -274,18 +274,30 @@ func (_c *DiscoveryRepository_Suggest_Call) RunAndReturn(run func(context.Contex return _c } -// SyncAssets provides a mock function with given fields: ctx, indexName, assets -func (_m *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string, assets []asset.Asset) error { - ret := _m.Called(ctx, indexName, assets) +// SyncAssets provides a mock function with given fields: ctx, indexName +func (_m *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string) (func() error, error) { + ret := _m.Called(ctx, indexName) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, []asset.Asset) error); ok { - r0 = rf(ctx, indexName, assets) + var r0 func() error + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (func() error, error)); ok { + return rf(ctx, indexName) + } + if rf, ok := ret.Get(0).(func(context.Context, string) func() error); ok { + r0 = rf(ctx, indexName) } else { - r0 = ret.Error(0) + if ret.Get(0) != nil { + r0 = ret.Get(0).(func() error) + } } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, indexName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // DiscoveryRepository_SyncAssets_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncAssets' @@ -296,24 +308,23 @@ type DiscoveryRepository_SyncAssets_Call struct { // SyncAssets is a helper method to define mock.On call // - ctx context.Context // - indexName string -// - assets []asset.Asset -func (_e *DiscoveryRepository_Expecter) SyncAssets(ctx interface{}, indexName interface{}, assets interface{}) *DiscoveryRepository_SyncAssets_Call { - return &DiscoveryRepository_SyncAssets_Call{Call: _e.mock.On("SyncAssets", ctx, indexName, assets)} +func (_e *DiscoveryRepository_Expecter) SyncAssets(ctx interface{}, indexName interface{}) *DiscoveryRepository_SyncAssets_Call { + return &DiscoveryRepository_SyncAssets_Call{Call: _e.mock.On("SyncAssets", ctx, indexName)} } -func (_c *DiscoveryRepository_SyncAssets_Call) Run(run func(ctx context.Context, indexName string, assets []asset.Asset)) *DiscoveryRepository_SyncAssets_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) Run(run func(ctx context.Context, indexName string)) *DiscoveryRepository_SyncAssets_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].([]asset.Asset)) + run(args[0].(context.Context), args[1].(string)) }) return _c } -func (_c *DiscoveryRepository_SyncAssets_Call) Return(_a0 error) *DiscoveryRepository_SyncAssets_Call { - _c.Call.Return(_a0) +func (_c *DiscoveryRepository_SyncAssets_Call) Return(cleanup func() error, err error) *DiscoveryRepository_SyncAssets_Call { + _c.Call.Return(cleanup, err) return _c } -func (_c *DiscoveryRepository_SyncAssets_Call) RunAndReturn(run func(context.Context, string, []asset.Asset) error) *DiscoveryRepository_SyncAssets_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) RunAndReturn(run func(context.Context, string) (func() error, error)) *DiscoveryRepository_SyncAssets_Call { _c.Call.Return(run) return _c } diff --git a/internal/store/elasticsearch/discovery_repository.go b/internal/store/elasticsearch/discovery_repository.go index 35ed44ed..5850b832 100644 --- a/internal/store/elasticsearch/discovery_repository.go +++ b/internal/store/elasticsearch/discovery_repository.go @@ -34,79 +34,91 @@ func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout time. } } -func (repo *DiscoveryRepository) Upsert(ctx context.Context, ast asset.Asset) error { - if ast.ID == "" { - return asset.ErrEmptyID - } - if !ast.Type.IsValid() { - return asset.ErrUnknownType - } - - idxExists, err := repo.cli.indexExists(ctx, "Upsert", ast.Service) +func (repo *DiscoveryRepository) createIndex(ctx context.Context, discoveryOp, indexName, alias string) error { + idxExists, err := repo.cli.indexExists(ctx, discoveryOp, indexName) if err != nil { return asset.DiscoveryError{ Op: "IndexExists", - ID: ast.ID, - Index: ast.Service, + Index: indexName, Err: err, } } if !idxExists { - if err := repo.cli.CreateIdx(ctx, "Upsert", ast.Service); err != nil { + if err := repo.cli.CreateIdx(ctx, discoveryOp, indexName, alias); err != nil { return asset.DiscoveryError{ Op: "CreateIndex", - ID: ast.ID, - Index: ast.Service, + Index: indexName, Err: err, } } } + return nil +} + +func (repo *DiscoveryRepository) Upsert(ctx context.Context, ast asset.Asset) error { + if ast.ID == "" { + return asset.ErrEmptyID + } + if !ast.Type.IsValid() { + return asset.ErrUnknownType + } + + if err := repo.createIndex(ctx, "Upsert", ast.Service, defaultSearchIndex); err != nil { + return err + } + return repo.indexAsset(ctx, ast) } -func (repo *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string, asts []asset.Asset) error { +func (repo *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string) (func() error, error) { backupIndexName := fmt.Sprintf("%+v-bak", indexName) err := repo.updateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":true}}`) if err != nil { - return err + return nil, err } err = repo.clone(ctx, indexName, backupIndexName) if err != nil { - return err + return nil, err } err = repo.updateAlias(ctx, backupIndexName, "universe") if err != nil { - return err + return nil, err } err = repo.deleteByIndexName(ctx, indexName) if err != nil { - return err + return nil, err } - for _, ast := range asts { - err = repo.Upsert(ctx, ast) + err = repo.createIndex(ctx, "SyncAssets", indexName, "") + if err != nil { + return nil, err + } + + cleanup := func() error { + err = repo.updateAlias(ctx, indexName, "universe") if err != nil { return err } - } - err = repo.deleteByIndexName(ctx, backupIndexName) - if err != nil { - return err - } + err = repo.deleteByIndexName(ctx, backupIndexName) + if err != nil { + return err + } - err = repo.updateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":false}}`) - if err != nil { - return err + err = repo.updateIndexSettings(ctx, indexName, `{"settings":{"index.blocks.write":false}}`) + if err != nil { + return err + } + return nil } - return nil + return cleanup, err } func (repo *DiscoveryRepository) DeleteByID(ctx context.Context, assetID string) error { diff --git a/internal/store/elasticsearch/discovery_repository_test.go b/internal/store/elasticsearch/discovery_repository_test.go index d777130f..a21ad3e7 100644 --- a/internal/store/elasticsearch/discovery_repository_test.go +++ b/internal/store/elasticsearch/discovery_repository_test.go @@ -398,27 +398,6 @@ func TestDiscoveryRepository_SyncAssets(t *testing.T) { indexName = "bigquery-test" ) - ast1 := asset.Asset{ - ID: "id1", - Type: asset.TypeTable, - Service: indexName, - URN: "urn1", - } - ast2 := asset.Asset{ - ID: "id2", - Type: asset.TypeTable, - Service: indexName, - URN: "urn2", - } - ast3 := asset.Asset{ - ID: "id3", - Type: asset.TypeTable, - Service: indexName, - URN: "urn3", - } - - assets := []asset.Asset{ast1, ast2, ast3} - cli, err := esTestServer.NewClient() require.NoError(t, err) @@ -434,7 +413,11 @@ func TestDiscoveryRepository_SyncAssets(t *testing.T) { repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"}) - err = repo.SyncAssets(ctx, indexName, assets) + _, err = repo.SyncAssets(ctx, indexName) require.NoError(t, err) + + alias := cli.Indices.GetAlias + resp, _ := alias(alias.WithIndex(indexName)) + require.NotEmpty(t, resp) }) } diff --git a/internal/store/elasticsearch/es.go b/internal/store/elasticsearch/es.go index 11aa821b..5014d905 100644 --- a/internal/store/elasticsearch/es.go +++ b/internal/store/elasticsearch/es.go @@ -170,7 +170,7 @@ func (c *Client) Init() (string, error) { return fmt.Sprintf("%q (server version %s)", info.ClusterName, info.Version.Number), nil } -func (c *Client) CreateIdx(ctx context.Context, discoveryOp, indexName string) (err error) { +func (c *Client) CreateIdx(ctx context.Context, discoveryOp, indexName, alias string) (err error) { defer func(start time.Time) { const op = "create_index" c.instrumentOp(ctx, instrumentParams{ @@ -181,7 +181,7 @@ func (c *Client) CreateIdx(ctx context.Context, discoveryOp, indexName string) ( }) }(time.Now()) - indexSettings := buildTypeIndexSettings() + indexSettings := buildTypeIndexSettings(alias) res, err := c.client.Indices.Create( indexName, c.client.Indices.Create.WithBody(strings.NewReader(indexSettings)), @@ -207,8 +207,16 @@ func (c *Client) CreateIdx(ctx context.Context, discoveryOp, indexName string) ( return nil } -func buildTypeIndexSettings() string { - return fmt.Sprintf(indexSettingsTemplate, serviceIndexMapping, defaultSearchIndex) +func buildTypeIndexSettings(alias string) string { + var aliasObj string + + if len(alias) > 0 { + aliasObj = fmt.Sprintf(`"aliases": { + %q: {} + },`, alias) + } + + return fmt.Sprintf(indexSettingsTemplate, serviceIndexMapping, aliasObj) } // checks for the existence of an index diff --git a/internal/store/elasticsearch/es_test.go b/internal/store/elasticsearch/es_test.go index 4290f905..201a643c 100644 --- a/internal/store/elasticsearch/es_test.go +++ b/internal/store/elasticsearch/es_test.go @@ -186,7 +186,7 @@ func TestElasticsearch(t *testing.T) { require.NoError(t, err) _, err = esClient.Init() assert.NoError(t, err) - err = esClient.CreateIdx(ctx, "", testCase.Service) + err = esClient.CreateIdx(ctx, "", testCase.Service, "universe") if testCase.ShouldFail { assert.Error(t, err) return diff --git a/internal/store/elasticsearch/schema.go b/internal/store/elasticsearch/schema.go index ccb3ea7f..313ea93d 100644 --- a/internal/store/elasticsearch/schema.go +++ b/internal/store/elasticsearch/schema.go @@ -5,9 +5,7 @@ package elasticsearch // and sets up the camelcase analyzer var indexSettingsTemplate = `{ "mappings": %s, - "aliases": { - %q: {} - }, + %s "settings": { "similarity": { "my_bm25_without_length_normalization": { diff --git a/internal/workermanager/discovery_worker.go b/internal/workermanager/discovery_worker.go index 3712e593..229be5fb 100644 --- a/internal/workermanager/discovery_worker.go +++ b/internal/workermanager/discovery_worker.go @@ -15,7 +15,7 @@ import ( type DiscoveryRepository interface { Upsert(context.Context, asset.Asset) error DeleteByURN(ctx context.Context, assetURN string) error - SyncAssets(ctx context.Context, indexName string, assets []asset.Asset) error + SyncAssets(ctx context.Context, indexName string) (cleanup func() error, err error) } func (m *Manager) EnqueueIndexAssetJob(ctx context.Context, ast asset.Asset) error { @@ -88,47 +88,37 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error { } } - assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ - Services: []string{service}, - Size: batchSize, - SortBy: "name", - }) + cleanup, err := m.discoveryRepo.SyncAssets(ctx, service) if err != nil { - return fmt.Errorf("sync asset: get assets: %w", err) - } - - if err := m.discoveryRepo.SyncAssets(ctx, service, assets); err != nil { return err } - if len(assets) == batchSize { // do remaining upsert after first batch completed - it := 1 - - for { - assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ - Services: []string{service}, - Size: batchSize, - Offset: it * batchSize, - SortBy: "name", - }) - if err != nil { - return fmt.Errorf("sync asset: get assets: %w", err) - } - - for _, ast := range assets { - if err := m.discoveryRepo.Upsert(ctx, ast); err != nil { - return err - } - } + it := 0 + + for { + assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ + Services: []string{service}, + Size: batchSize, + Offset: it * batchSize, + SortBy: "name", + }) + if err != nil { + return fmt.Errorf("sync asset: get assets: %w", err) + } - if len(assets) != batchSize { - break + for _, ast := range assets { + if err := m.discoveryRepo.Upsert(ctx, ast); err != nil { + return err } - it++ } + if len(assets) != batchSize { + break + } + it++ } - return nil + + return cleanup() } func (m *Manager) EnqueueDeleteAssetJob(ctx context.Context, urn string) error { diff --git a/internal/workermanager/in_situ_worker.go b/internal/workermanager/in_situ_worker.go index 13ba6564..42d301b1 100644 --- a/internal/workermanager/in_situ_worker.go +++ b/internal/workermanager/in_situ_worker.go @@ -57,49 +57,36 @@ func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) _ = m.jobRepo.Delete(ctx, jobID) }() - // need to limit this - assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ - Services: []string{service}, - Size: batchSize, - SortBy: "name", - }) + cleanup, err := m.discoveryRepo.SyncAssets(ctx, service) if err != nil { - return fmt.Errorf("sync asset: get assets: %w", err) - } - - if err := m.discoveryRepo.SyncAssets(ctx, service, assets); err != nil { return err } - if len(assets) == batchSize { // do remaining upsert after first batch completed - it := 1 - - for { - assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ - Services: []string{service}, - Size: batchSize, - Offset: it * batchSize, - SortBy: "name", - }) - if err != nil { - return fmt.Errorf("sync asset: get assets: %w", err) - } - - for _, ast := range assets { - if err := m.discoveryRepo.Upsert(ctx, ast); err != nil { - return err - } - } + it := 0 + for { + assets, err := m.assetRepo.GetAll(ctx, asset.Filter{ + Services: []string{service}, + Size: batchSize, + Offset: it * batchSize, + SortBy: "name", + }) + if err != nil { + return fmt.Errorf("sync asset: get assets: %w", err) + } - if len(assets) != batchSize { - break + for _, ast := range assets { + if err := m.discoveryRepo.Upsert(ctx, ast); err != nil { + return err } - it++ } + if len(assets) != batchSize { + break + } + it++ } - return nil + return cleanup() } func (*InSituWorker) Close() error { return nil } diff --git a/internal/workermanager/mocks/discovery_repository_mock.go b/internal/workermanager/mocks/discovery_repository_mock.go index f4cc1e93..4fa2bb79 100644 --- a/internal/workermanager/mocks/discovery_repository_mock.go +++ b/internal/workermanager/mocks/discovery_repository_mock.go @@ -66,18 +66,30 @@ func (_c *DiscoveryRepository_DeleteByURN_Call) RunAndReturn(run func(context.Co return _c } -// SyncAssets provides a mock function with given fields: ctx, indexName, assets -func (_m *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string, assets []asset.Asset) error { - ret := _m.Called(ctx, indexName, assets) +// SyncAssets provides a mock function with given fields: ctx, indexName +func (_m *DiscoveryRepository) SyncAssets(ctx context.Context, indexName string) (func() error, error) { + ret := _m.Called(ctx, indexName) + + var r0 func() error + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (func() error, error)); ok { + return rf(ctx, indexName) + } + if rf, ok := ret.Get(0).(func(context.Context, string) func() error); ok { + r0 = rf(ctx, indexName) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(func() error) + } + } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, []asset.Asset) error); ok { - r0 = rf(ctx, indexName, assets) + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, indexName) } else { - r0 = ret.Error(0) + r1 = ret.Error(1) } - return r0 + return r0, r1 } // DiscoveryRepository_SyncAssets_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncAssets' @@ -88,24 +100,23 @@ type DiscoveryRepository_SyncAssets_Call struct { // SyncAssets is a helper method to define mock.On call // - ctx context.Context // - indexName string -// - assets []asset.Asset -func (_e *DiscoveryRepository_Expecter) SyncAssets(ctx interface{}, indexName interface{}, assets interface{}) *DiscoveryRepository_SyncAssets_Call { - return &DiscoveryRepository_SyncAssets_Call{Call: _e.mock.On("SyncAssets", ctx, indexName, assets)} +func (_e *DiscoveryRepository_Expecter) SyncAssets(ctx interface{}, indexName interface{}) *DiscoveryRepository_SyncAssets_Call { + return &DiscoveryRepository_SyncAssets_Call{Call: _e.mock.On("SyncAssets", ctx, indexName)} } -func (_c *DiscoveryRepository_SyncAssets_Call) Run(run func(ctx context.Context, indexName string, assets []asset.Asset)) *DiscoveryRepository_SyncAssets_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) Run(run func(ctx context.Context, indexName string)) *DiscoveryRepository_SyncAssets_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].([]asset.Asset)) + run(args[0].(context.Context), args[1].(string)) }) return _c } -func (_c *DiscoveryRepository_SyncAssets_Call) Return(_a0 error) *DiscoveryRepository_SyncAssets_Call { - _c.Call.Return(_a0) +func (_c *DiscoveryRepository_SyncAssets_Call) Return(cleanup func() error, err error) *DiscoveryRepository_SyncAssets_Call { + _c.Call.Return(cleanup, err) return _c } -func (_c *DiscoveryRepository_SyncAssets_Call) RunAndReturn(run func(context.Context, string, []asset.Asset) error) *DiscoveryRepository_SyncAssets_Call { +func (_c *DiscoveryRepository_SyncAssets_Call) RunAndReturn(run func(context.Context, string) (func() error, error)) *DiscoveryRepository_SyncAssets_Call { _c.Call.Return(run) return _c }