diff --git a/task/bq2bq/executor/bumblebee/transformation.py b/task/bq2bq/executor/bumblebee/transformation.py index 2a82018..7d9df90 100644 --- a/task/bq2bq/executor/bumblebee/transformation.py +++ b/task/bq2bq/executor/bumblebee/transformation.py @@ -70,7 +70,7 @@ def transform(self): bq_destination_table = self.bigquery_service.get_table(self.task_config.destination_table) if bq_destination_table.time_partitioning is None: task_queries = self.sql_query.split(OPTIMUS_QUERY_BREAK_MARKER) - transformation = TableTransformation(self.bigquery_service, + transformation = NonPartitionedTableTransformation(self.bigquery_service, self.task_config, task_queries[0], self.dstart, @@ -199,6 +199,42 @@ def execute(self): logger.info("finished") + +class NonPartitionedTableTransformation: + """ + Query transformation effects whole non partitioned table + """ + + def __init__(self, bigquery_service: BigqueryService, + task_config: TaskConfig, + task_query: str, + dstart: datetime, + dend: datetime, + dry_run: bool, + execution_time: datetime): + self.bigquery_service = bigquery_service + self.task_config = task_config + self.task_query = task_query + self.dry_run = dry_run + self.window = CustomWindow(dstart, dend) + self.execution_time = execution_time + + def transform(self): + loader = TableLoader(self.bigquery_service, self.task_config.destination_table, LoadMethod.APPEND, + self.task_config.allow_field_addition) + logger.info("create transformation for table") + + task = NonPartitionTransformation(self.task_config, + loader, + self.task_query, + self.window, + self.dry_run, + self.execution_time) + + self.bigquery_service.execute_query("truncate table `gtfndata-integration.playground.replace_table_target`;") + task.execute() + + class TableTransformation: """ Query transformation effects whole non partitioned table @@ -270,6 +306,38 @@ def transform(self): task.execute() +class NonPartitionTransformation: + def __init__(self, + task_config: TaskConfig, + loader: BaseLoader, + query: str, + window: Window, + dry_run: bool, + execution_time: datetime): + self.dry_run = dry_run + self.loader = loader + + destination_parameter = DestinationParameter(task_config.destination_table) + window_parameter = WindowParameter(window) + execution_parameter = ExecutionParameter(execution_time) + + self.query = Query(query).apply_parameter(window_parameter).apply_parameter( + execution_parameter).apply_parameter(destination_parameter) + + def execute(self): + logger.info("start transformation job") + self.query.print_with_logger(logger) + + result = None + if not self.dry_run: + result = self.loader.load(self.query) + + logger.info(result) + logger.info("finished") + + async def async_execute(self): + self.execute() + class PartitionTransformation: def __init__(self, task_config: TaskConfig, diff --git a/task/bq2bq/factory.go b/task/bq2bq/factory.go index fc7bf83..7dbd4d4 100644 --- a/task/bq2bq/factory.go +++ b/task/bq2bq/factory.go @@ -6,6 +6,7 @@ import ( "cloud.google.com/go/bigquery" "github.com/googleapis/google-cloud-go-testing/bigquery/bqiface" + "github.com/hashicorp/go-hclog" "golang.org/x/oauth2/google" "google.golang.org/api/drive/v2" "google.golang.org/api/option" @@ -35,6 +36,6 @@ func (fac *DefaultBQClientFactory) New(ctx context.Context, svcAccount string) ( type DefaultUpstreamExtractorFactory struct { } -func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client) (UpstreamExtractor, error) { - return upstream.NewExtractor(client) +func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error) { + return upstream.NewExtractor(client, logger) } diff --git a/task/bq2bq/main.go b/task/bq2bq/main.go index c5495b5..6a8c196 100644 --- a/task/bq2bq/main.go +++ b/task/bq2bq/main.go @@ -47,11 +47,11 @@ type ClientFactory interface { } type UpstreamExtractor interface { - ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error) + ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error) } type ExtractorFactory interface { - New(client bqiface.Client) (UpstreamExtractor, error) + New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error) } type BQ2BQ struct { @@ -224,10 +224,7 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat return response, fmt.Errorf("error extracting upstreams: %w", err) } - flattenedUpstreams := upstream.FlattenUpstreams(upstreams) - uniqueUpstreams := upstream.UniqueFilterResources(flattenedUpstreams) - - formattedUpstreams := b.formatUpstreams(uniqueUpstreams, func(r upstream.Resource) string { + formattedUpstreams := b.formatUpstreams(upstreams, func(r upstream.Resource) string { name := fmt.Sprintf("%s:%s.%s", r.Project, r.Dataset, r.Name) return fmt.Sprintf(plugin.DestinationURNFormat, selfTable.Type, name) }) @@ -237,7 +234,7 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat return response, nil } -func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error) { +func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error) { spanCtx, span := StartChildSpan(ctx, "extractUpstreams") defer span.End() @@ -247,7 +244,7 @@ func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string return nil, fmt.Errorf("error creating bigquery client: %w", err) } - extractor, err := b.ExtractorFac.New(client) + extractor, err := b.ExtractorFac.New(client, b.logger) if err != nil { return nil, fmt.Errorf("error initializing upstream extractor: %w", err) } @@ -263,6 +260,7 @@ func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string } b.logger.Error("error extracting upstreams", err) + return nil, err } return upstreams, nil diff --git a/task/bq2bq/main_test.go b/task/bq2bq/main_test.go index a881ea7..b879cc0 100644 --- a/task/bq2bq/main_test.go +++ b/task/bq2bq/main_test.go @@ -9,6 +9,7 @@ import ( "github.com/googleapis/google-cloud-go-testing/bigquery/bqiface" "github.com/goto/optimus/sdk/plugin" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -33,8 +34,8 @@ type extractorFactoryMock struct { mock.Mock } -func (e *extractorFactoryMock) New(client bqiface.Client) (UpstreamExtractor, error) { - args := e.Called(client) +func (e *extractorFactoryMock) New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error) { + args := e.Called(client, logger) r1, ok := args.Get(0).(UpstreamExtractor) if !ok { @@ -48,10 +49,10 @@ type extractorMock struct { mock.Mock } -func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error) { +func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error) { args := e.Called(ctx, query, resourcesToIgnore) - r1, ok := args.Get(0).([]*upstream.Upstream) + r1, ok := args.Get(0).([]upstream.Resource) if !ok { return nil, args.Error(1) } @@ -61,6 +62,7 @@ func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, reso func TestBQ2BQ(t *testing.T) { ctx := context.Background() + logger := hclog.NewNullLogger() t.Run("GetName", func(t *testing.T) { t.Run("should return name bq2bq", func(t *testing.T) { @@ -263,22 +265,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"` extractor := new(extractorMock) extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}). - Return([]*upstream.Upstream{ + Return([]upstream.Resource{ { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table1", - }, + Project: "proj", + Dataset: "dataset", + Name: "table1", }, }, nil) extractorFac := new(extractorFactoryMock) - extractorFac.On("New", client).Return(extractor, nil) + extractorFac.On("New", client, logger).Return(extractor, nil) b := &BQ2BQ{ ClientFac: bqClientFac, ExtractorFac: extractorFac, + logger: logger, } got, err := b.GenerateDependencies(ctx, data) if err != nil { @@ -335,45 +336,26 @@ Select * from table where ts > "2021-01-16T00:00:00Z"` extractor := new(extractorMock) extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}). - Return([]*upstream.Upstream{ - { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table1", - }, - Upstreams: []*upstream.Upstream{ - { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table2", - }, - }, - }, - }, - { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table2", - }, - }, - { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table1", - }, + Return([]upstream.Resource{ + { + Project: "proj", + Dataset: "dataset", + Name: "table1", + }, + { + Project: "proj", + Dataset: "dataset", + Name: "table2", }, }, nil) extractorFac := new(extractorFactoryMock) - extractorFac.On("New", client).Return(extractor, nil) + extractorFac.On("New", client, logger).Return(extractor, nil) b := &BQ2BQ{ ClientFac: bqClientFac, ExtractorFac: extractorFac, + logger: logger, } got, err := b.GenerateDependencies(ctx, data) if err != nil { @@ -430,14 +412,15 @@ Select * from table where ts > "2021-01-16T00:00:00Z"` extractor := new(extractorMock) extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}). - Return([]*upstream.Upstream{}, nil) + Return([]upstream.Resource{}, nil) extractorFac := new(extractorFactoryMock) - extractorFac.On("New", client).Return(extractor, nil) + extractorFac.On("New", client, logger).Return(extractor, nil) b := &BQ2BQ{ ClientFac: bqClientFac, ExtractorFac: extractorFac, + logger: logger, } got, err := b.GenerateDependencies(ctx, data) if err != nil { @@ -494,22 +477,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"` extractor := new(extractorMock) extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}). - Return([]*upstream.Upstream{ + Return([]upstream.Resource{ { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table1", - }, + Project: "proj", + Dataset: "dataset", + Name: "table1", }, }, nil) extractorFac := new(extractorFactoryMock) - extractorFac.On("New", client).Return(extractor, nil) + extractorFac.On("New", client, logger).Return(extractor, nil) b := &BQ2BQ{ ClientFac: bqClientFac, ExtractorFac: extractorFac, + logger: logger, } got, err := b.GenerateDependencies(ctx, data) if err != nil { @@ -566,22 +548,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"` extractor := new(extractorMock) extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}). - Return([]*upstream.Upstream{ + Return([]upstream.Resource{ { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table1", - }, + Project: "proj", + Dataset: "dataset", + Name: "table1", }, }, nil) extractorFac := new(extractorFactoryMock) - extractorFac.On("New", client).Return(extractor, nil) + extractorFac.On("New", client, logger).Return(extractor, nil) b := &BQ2BQ{ ClientFac: bqClientFac, ExtractorFac: extractorFac, + logger: logger, } got, err := b.GenerateDependencies(ctx, data) if err != nil { diff --git a/task/bq2bq/optimus-plugin-bq2bq.yaml b/task/bq2bq/optimus-plugin-bq2bq.yaml index 4c4456b..1b6e1cd 100644 --- a/task/bq2bq/optimus-plugin-bq2bq.yaml +++ b/task/bq2bq/optimus-plugin-bq2bq.yaml @@ -2,10 +2,6 @@ name: bq2bq description: BigQuery to BigQuery transformation task plugintype: task pluginversion: {{.version}} -asset_parsers: - bq: - - filepath: "./query.sql" -destination_urn_template: "bigquery://:." image: docker.io/gotocompany/optimus-task-bq2bq-executor:{{.version}} entrypoint: script: "python3 /opt/bumblebee/main.py" diff --git a/task/bq2bq/upstream/extractor.go b/task/bq2bq/upstream/extractor.go index aed9bea..e956fb0 100644 --- a/task/bq2bq/upstream/extractor.go +++ b/task/bq2bq/upstream/extractor.go @@ -8,102 +8,130 @@ import ( "sync" "github.com/googleapis/google-cloud-go-testing/bigquery/bqiface" + "github.com/hashicorp/go-hclog" ) type Extractor struct { mutex *sync.Mutex client bqiface.Client - schemaToUpstreams map[string][]*Upstream + urnToUpstreams map[string][]Resource + logger hclog.Logger } -func NewExtractor(client bqiface.Client) (*Extractor, error) { +func NewExtractor(client bqiface.Client, logger hclog.Logger) (*Extractor, error) { if client == nil { return nil, errors.New("client is nil") } + if logger == nil { + return nil, errors.New("logger is nil") + } + return &Extractor{ - mutex: &sync.Mutex{}, - client: client, - schemaToUpstreams: make(map[string][]*Upstream), + mutex: &sync.Mutex{}, + client: client, + urnToUpstreams: make(map[string][]Resource), + logger: logger, }, nil } -func (e *Extractor) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []Resource) ([]*Upstream, error) { +func (e *Extractor) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []Resource) ([]Resource, error) { ignoredResources := make(map[Resource]bool) for _, r := range resourcesToIgnore { ignoredResources[r] = true } - metResource := make(map[Resource]bool) - return e.extractUpstreamsFromQuery(ctx, query, ignoredResources, metResource, ParseTopLevelUpstreamsFromQuery) + encounteredResources := make(map[Resource]bool) + return e.extractUpstreamsFromQuery(ctx, query, ignoredResources, encounteredResources, ParseTopLevelUpstreamsFromQuery) } func (e *Extractor) extractUpstreamsFromQuery( ctx context.Context, query string, - ignoredResources, metResource map[Resource]bool, + ignoredResources, encounteredResources map[Resource]bool, parseFn QueryParser, -) ([]*Upstream, error) { - upstreamResources := parseFn(query) - - uniqueUpstreamResources := UniqueFilterResources(upstreamResources) +) ([]Resource, error) { + resources := parseFn(query) + uniqueResources := UniqueFilterResources(resources) + filteredResources := FilterResources(uniqueResources, func(r Resource) bool { return ignoredResources[r] }) - filteredUpstreamResources := FilterResources(uniqueUpstreamResources, func(r Resource) bool { return ignoredResources[r] }) - - resourceGroups := GroupResources(filteredUpstreamResources) - - var output []*Upstream + output := filteredResources var errorMessages []string + resourceGroups := GroupResources(filteredResources) for _, group := range resourceGroups { - schemas, err := ReadSchemasUnderGroup(ctx, e.client, group) + result, err := e.getUpstreamsFromGroup(ctx, group, ignoredResources, encounteredResources) if err != nil { errorMessages = append(errorMessages, err.Error()) } - nestedable, rest := splitNestedableFromRest(schemas) + output = append(output, result...) + } - restsNodes := convertSchemasToNodes(rest) - output = append(output, restsNodes...) + output = UniqueFilterResources(output) - nestedNodes, err := e.extractNestedNodes(ctx, nestedable, ignoredResources, metResource) - if err != nil { + if len(errorMessages) > 0 { + return output, fmt.Errorf("error reading upstream: [%s]", strings.Join(errorMessages, ", ")) + } + return output, nil +} + +func (e *Extractor) getUpstreamsFromGroup( + ctx context.Context, group *ResourceGroup, + ignoredResources, encounteredResources map[Resource]bool, +) ([]Resource, error) { + var output []Resource + var errorMessages []string + + schemas, err := ReadSchemasUnderGroup(ctx, e.client, group) + if err != nil { + if e.isIgnorableError(err) { + e.logger.Error("ignoring error when reading schema for [%s.%s]: %v", group.Project, group.Dataset, err) + } else { errorMessages = append(errorMessages, err.Error()) } + } + + nestedable, unnestedable := splitNestedableFromRest(schemas) - output = append(output, nestedNodes...) + unnestedableResources := convertSchemasToResources(unnestedable) + output = append(output, unnestedableResources...) + + nestedResources, err := e.extractNestedableUpstreams(ctx, nestedable, ignoredResources, encounteredResources) + if err != nil { + errorMessages = append(errorMessages, err.Error()) } + output = append(output, nestedResources...) if len(errorMessages) > 0 { - return output, fmt.Errorf("error reading upstream: [%s]", strings.Join(errorMessages, ", ")) + return output, fmt.Errorf("error getting upstream for [%s.%s]: %s", group.Project, group.Dataset, strings.Join(errorMessages, ", ")) } + return output, nil } -func (e *Extractor) extractNestedNodes( +func (e *Extractor) extractNestedableUpstreams( ctx context.Context, schemas []*Schema, - ignoredResources, metResource map[Resource]bool, -) ([]*Upstream, error) { - var output []*Upstream + ignoredResources, encounteredResources map[Resource]bool, +) ([]Resource, error) { + var output []Resource var errorMessages []string for _, sch := range schemas { - if metResource[sch.Resource] { - msg := fmt.Sprintf("circular reference is detected: [%s]", e.getCircularURNs(metResource)) + if encounteredResources[sch.Resource] { + msg := fmt.Sprintf("circular reference is detected: [%s]", e.getCircularURNs(encounteredResources)) errorMessages = append(errorMessages, msg) continue } - metResource[sch.Resource] = true + encounteredResources[sch.Resource] = true - nodes, err := e.getNodes(ctx, sch, ignoredResources, metResource) + upstreams, err := e.getUpstreams(ctx, sch, ignoredResources, encounteredResources) if err != nil { errorMessages = append(errorMessages, err.Error()) } - output = append(output, &Upstream{ - Resource: sch.Resource, - Upstreams: nodes, - }) + output = append(output, sch.Resource) + output = append(output, upstreams...) } if len(errorMessages) > 0 { @@ -112,34 +140,39 @@ func (e *Extractor) extractNestedNodes( return output, nil } -func (e *Extractor) getNodes( +func (e *Extractor) getUpstreams( ctx context.Context, schema *Schema, - ignoredResources, metResource map[Resource]bool, -) ([]*Upstream, error) { - key := schema.Resource.URN() + ignoredResources, encounteredResources map[Resource]bool, +) ([]Resource, error) { + urn := schema.Resource.URN() e.mutex.Lock() - existingNodes, ok := e.schemaToUpstreams[key] + existingUpstreams, ok := e.urnToUpstreams[urn] e.mutex.Unlock() if ok { - return existingNodes, nil + return existingUpstreams, nil } - nodes, err := e.extractUpstreamsFromQuery(ctx, schema.DDL, ignoredResources, metResource, ParseNestedUpsreamsFromDDL) + upstreams, err := e.extractUpstreamsFromQuery(ctx, schema.DDL, ignoredResources, encounteredResources, ParseNestedUpsreamsFromDDL) e.mutex.Lock() - e.schemaToUpstreams[key] = nodes + e.urnToUpstreams[urn] = upstreams e.mutex.Unlock() - return nodes, err + return upstreams, err } -func (*Extractor) getCircularURNs(metResource map[Resource]bool) string { +func (*Extractor) getCircularURNs(encounteredResources map[Resource]bool) string { var urns []string - for resource := range metResource { + for resource := range encounteredResources { urns = append(urns, resource.URN()) } return strings.Join(urns, ", ") } + +func (*Extractor) isIgnorableError(err error) bool { + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "access denied") || strings.Contains(msg, "user does not have permission") +} diff --git a/task/bq2bq/upstream/extractor_test.go b/task/bq2bq/upstream/extractor_test.go index f03e64a..a1c7387 100644 --- a/task/bq2bq/upstream/extractor_test.go +++ b/task/bq2bq/upstream/extractor_test.go @@ -2,10 +2,12 @@ package upstream_test import ( "context" + "errors" "testing" "cloud.google.com/go/bigquery" "github.com/googleapis/google-cloud-go-testing/bigquery/bqiface" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "google.golang.org/api/iterator" @@ -16,17 +18,29 @@ import ( func TestNewExtractor(t *testing.T) { t.Run("should return nil and error if client is nil", func(t *testing.T) { var client bqiface.Client + logger := hclog.NewNullLogger() - actualExtractor, actualError := upstream.NewExtractor(client) + actualExtractor, actualError := upstream.NewExtractor(client, logger) assert.Nil(t, actualExtractor) assert.EqualError(t, actualError, "client is nil") }) + t.Run("should return nil and error if logger is nil", func(t *testing.T) { + client := new(ClientMock) + var logger hclog.Logger + + actualExtractor, actualError := upstream.NewExtractor(client, logger) + + assert.Nil(t, actualExtractor) + assert.EqualError(t, actualError, "logger is nil") + }) + t.Run("should return extractor and nil if no error is encountered", func(t *testing.T) { client := new(ClientMock) + logger := hclog.NewNullLogger() - actualExtractor, actualError := upstream.NewExtractor(client) + actualExtractor, actualError := upstream.NewExtractor(client, logger) assert.NotNil(t, actualExtractor) assert.NoError(t, actualError) @@ -34,36 +48,34 @@ func TestNewExtractor(t *testing.T) { } func TestExtractor(t *testing.T) { + logger := hclog.NewNullLogger() + t.Run("ExtractUpstreams", func(t *testing.T) { t.Run("should pass the existing spec", func(t *testing.T) { testCases := []struct { Message string QueryRequest string - ExpectedUpstreams []*upstream.Upstream + ExpectedUpstreams []upstream.Resource }{ { Message: "should return upstreams and generate dependencies for select statements", QueryRequest: "Select * from proj.dataset.table1", - ExpectedUpstreams: []*upstream.Upstream{ + ExpectedUpstreams: []upstream.Resource{ { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table1", - }, + Project: "proj", + Dataset: "dataset", + Name: "table1", }, }, }, { Message: "should return unique upstreams and nil for select statements", QueryRequest: "Select * from proj.dataset.table1 t1 join proj.dataset.table1 t2 on t1.col1 = t2.col1", - ExpectedUpstreams: []*upstream.Upstream{ + ExpectedUpstreams: []upstream.Resource{ { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table1", - }, + Project: "proj", + Dataset: "dataset", + Name: "table1", }, }, }, @@ -75,13 +87,11 @@ func TestExtractor(t *testing.T) { { Message: "should return filtered upstreams for select statements with ignore statement for view", QueryRequest: "Select * from proj.dataset.table1 t1 join proj.dataset.table1 t2 on t1.col1 = t2.col1", - ExpectedUpstreams: []*upstream.Upstream{ + ExpectedUpstreams: []upstream.Resource{ { - Resource: upstream.Resource{ - Project: "proj", - Dataset: "dataset", - Name: "table1", - }, + Project: "proj", + Dataset: "dataset", + Name: "table1", }, }, }, @@ -99,7 +109,7 @@ func TestExtractor(t *testing.T) { }, } - extractor, err := upstream.NewExtractor(client) + extractor, err := upstream.NewExtractor(client, logger) assert.NotNil(t, extractor) assert.NoError(t, err) @@ -122,7 +132,7 @@ func TestExtractor(t *testing.T) { } }) - t.Run("should return upstreams with its nested ones if found any", func(t *testing.T) { + t.Run("should return unique upstreams with its nested ones if found any", func(t *testing.T) { client := new(ClientMock) query := new(QueryMock) rowIterator := new(RowIteratorMock) @@ -134,7 +144,7 @@ func TestExtractor(t *testing.T) { }, } - extractor, err := upstream.NewExtractor(client) + extractor, err := upstream.NewExtractor(client, logger) assert.NotNil(t, extractor) assert.NoError(t, err) @@ -160,35 +170,99 @@ func TestExtractor(t *testing.T) { }).Return(nil).Once() rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once() - expectedUpstreams := []*upstream.Upstream{ + expectedUpstreams := []upstream.Resource{ { - Resource: upstream.Resource{ - Project: "project_test_1", - Dataset: "dataset_test_1", - Name: "name_test_1", - }, + Project: "project_test_1", + Dataset: "dataset_test_1", + Name: "name_test_1", }, { - Resource: upstream.Resource{ - Project: "project_test_2", - Dataset: "dataset_test_2", - Name: "name_test_2", - }, - Upstreams: []*upstream.Upstream{ - { - Resource: upstream.Resource{ - Project: "project_test_3", - Dataset: "dataset_test_3", - Name: "name_test_3", - }, - }, - }, + Project: "project_test_2", + Dataset: "dataset_test_2", + Name: "name_test_2", + }, + { + Project: "project_test_3", + Dataset: "dataset_test_3", + Name: "name_test_3", }, } actualUpstreams, actualError := extractor.ExtractUpstreams(ctx, queryRequest, resourcestoIgnore) - assert.EqualValues(t, expectedUpstreams, actualUpstreams) + assert.ElementsMatch(t, expectedUpstreams, actualUpstreams) + assert.NoError(t, actualError) + }) + + t.Run("should return upstreams and nil if error being encountered is related to access denied", func(t *testing.T) { + client := new(ClientMock) + query := new(QueryMock) + resourcestoIgnore := []upstream.Resource{ + { + Project: "project_test_0", + Dataset: "dataset_test_0", + Name: "name_test_0", + }, + } + + extractor, err := upstream.NewExtractor(client, logger) + assert.NotNil(t, extractor) + assert.NoError(t, err) + + ctx := context.Background() + queryRequest := "select * from `project_test_1.dataset_test_1.name_test_1`" + + client.On("Query", mock.Anything).Return(query) + + query.On("Read", mock.Anything).Return(nil, errors.New("Access Denied")) + + expectedUpstreams := []upstream.Resource{ + { + Project: "project_test_1", + Dataset: "dataset_test_1", + Name: "name_test_1", + }, + } + + actualUpstreams, actualError := extractor.ExtractUpstreams(ctx, queryRequest, resourcestoIgnore) + + assert.ElementsMatch(t, expectedUpstreams, actualUpstreams) + assert.NoError(t, actualError) + }) + + t.Run("should return upstreams and nil if error being encountered is related to user does not have permission", func(t *testing.T) { + client := new(ClientMock) + query := new(QueryMock) + resourcestoIgnore := []upstream.Resource{ + { + Project: "project_test_0", + Dataset: "dataset_test_0", + Name: "name_test_0", + }, + } + + extractor, err := upstream.NewExtractor(client, logger) + assert.NotNil(t, extractor) + assert.NoError(t, err) + + ctx := context.Background() + queryRequest := "select * from `project_test_1.dataset_test_1.name_test_1`" + + client.On("Query", mock.Anything).Return(query) + + query.On("Read", mock.Anything).Return(nil, errors.New("User does not have permission")) + + expectedUpstreams := []upstream.Resource{ + { + Project: "project_test_1", + Dataset: "dataset_test_1", + Name: "name_test_1", + }, + } + + actualUpstreams, actualError := extractor.ExtractUpstreams(ctx, queryRequest, resourcestoIgnore) + + assert.ElementsMatch(t, expectedUpstreams, actualUpstreams) assert.NoError(t, actualError) }) @@ -198,7 +272,7 @@ func TestExtractor(t *testing.T) { rowIterator := new(RowIteratorMock) resourcestoIgnore := []upstream.Resource{} - extractor, err := upstream.NewExtractor(client) + extractor, err := upstream.NewExtractor(client, logger) assert.NotNil(t, extractor) assert.NoError(t, err) @@ -230,37 +304,28 @@ func TestExtractor(t *testing.T) { }).Return(nil).Once() rowIterator.On("Next", mock.Anything).Return(iterator.Done).Once() - expectedUpstreams := []*upstream.Upstream{ + expectedUpstreams := []upstream.Resource{ { - Resource: upstream.Resource{ - Project: "project_test_1", - Dataset: "dataset_test_1", - Name: "cyclic_test_1", - }, - Upstreams: []*upstream.Upstream{ - { - Resource: upstream.Resource{ - Project: "project_test_3", - Dataset: "dataset_test_3", - Name: "cyclic_test_3", - }, - Upstreams: []*upstream.Upstream{ - { - Resource: upstream.Resource{ - Project: "project_test_2", - Dataset: "dataset_test_2", - Name: "cyclic_test_2", - }, - }, - }, - }, - }, + + Project: "project_test_1", + Dataset: "dataset_test_1", + Name: "cyclic_test_1", + }, + { + Project: "project_test_2", + Dataset: "dataset_test_2", + Name: "cyclic_test_2", + }, + { + Project: "project_test_3", + Dataset: "dataset_test_3", + Name: "cyclic_test_3", }, } actualUpstreams, actualError := extractor.ExtractUpstreams(ctx, queryRequest, resourcestoIgnore) - assert.EqualValues(t, expectedUpstreams, actualUpstreams) + assert.ElementsMatch(t, expectedUpstreams, actualUpstreams) assert.ErrorContains(t, actualError, "circular reference is detected") }) }) diff --git a/task/bq2bq/upstream/schema.go b/task/bq2bq/upstream/schema.go index 19cdaf8..2be8d95 100644 --- a/task/bq2bq/upstream/schema.go +++ b/task/bq2bq/upstream/schema.go @@ -179,12 +179,11 @@ func splitNestedableFromRest(schemas []*Schema) (nestedable, rests []*Schema) { return nestedable, rests } -func convertSchemasToNodes(schemas []*Schema) []*Upstream { - output := make([]*Upstream, len(schemas)) +func convertSchemasToResources(schemas []*Schema) []Resource { + output := make([]Resource, len(schemas)) + for i, sch := range schemas { - output[i] = &Upstream{ - Resource: sch.Resource, - } + output[i] = sch.Resource } return output diff --git a/task/bq2bq/upstream/upstream.go b/task/bq2bq/upstream/upstream.go deleted file mode 100644 index 3306607..0000000 --- a/task/bq2bq/upstream/upstream.go +++ /dev/null @@ -1,22 +0,0 @@ -package upstream - -type Upstream struct { - Resource Resource - Upstreams []*Upstream -} - -func FlattenUpstreams(upstreams []*Upstream) []Resource { - var output []Resource - for _, u := range upstreams { - if u == nil { - continue - } - - output = append(output, u.Resource) - - nested := FlattenUpstreams(u.Upstreams) - output = append(output, nested...) - } - - return output -} diff --git a/task/bq2bq/upstream/upstream_test.go b/task/bq2bq/upstream/upstream_test.go deleted file mode 100644 index 150da78..0000000 --- a/task/bq2bq/upstream/upstream_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package upstream_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/goto/transformers/task/bq2bq/upstream" -) - -func TestFlattenUpstreams(t *testing.T) { - t.Run("should return flattened upstream in the form of resource", func(t *testing.T) { - upstreams := []*upstream.Upstream{ - { - Resource: upstream.Resource{ - Project: "project_test_1", - Dataset: "dataset_test_1", - Name: "name_test_1", - }, - }, - { - Resource: upstream.Resource{ - Project: "project_test_2", - Dataset: "dataset_test_2", - Name: "name_test_2", - }, - Upstreams: []*upstream.Upstream{ - { - Resource: upstream.Resource{ - Project: "project_test_3", - Dataset: "dataset_test_3", - Name: "name_test_3", - }, - }, - { - Resource: upstream.Resource{ - Project: "project_test_4", - Dataset: "dataset_test_4", - Name: "name_test_4", - }, - }, - nil, - }, - }, - } - - expectedResources := []upstream.Resource{ - { - Project: "project_test_1", - Dataset: "dataset_test_1", - Name: "name_test_1", - }, - { - Project: "project_test_2", - Dataset: "dataset_test_2", - Name: "name_test_2", - }, - { - Project: "project_test_3", - Dataset: "dataset_test_3", - Name: "name_test_3", - }, - { - Project: "project_test_4", - Dataset: "dataset_test_4", - Name: "name_test_4", - }, - } - - actualResources := upstream.FlattenUpstreams(upstreams) - - assert.Equal(t, expectedResources, actualResources) - }) -}