From 9ec72b051cadc669147b4982e6003833c1d88e39 Mon Sep 17 00:00:00 2001
From: Reeba Qureshi <64488642+reeba212@users.noreply.github.com>
Date: Mon, 9 Sep 2024 19:58:04 +0530
Subject: [PATCH] [yaml] Adding Spanner IO Providers for Beam YAML (#31987)

* Add Spanner IO providers to YAML SDK

* add handling logic for more datatypes

* delete examples

* minor changes

* minor change

* add integration test

* add docs

* minor change

* minor changes

1. Removed serializability from ErrorHandling.java
2. Removed double map definitions from MutationUtils.java
3. Added checkNotNull in spanner write provider
4. Modified some variables in spanner wrapper
5. Changed instance id in integration tests

* Update spanner_wrapper.py

import retry

* minor changes

1. replace checkNotNull with checkArgument in spanner read provider
2. use the correct table name (tmp_table) in integration test

* minor changes

1. Added serializable to error handling
2. Corrected validation methods in spanner read
3. Added retry import and removed default project name in spanner wrapper
4. Corrected instance and database names in spanner integration test
5. Corrected table name in query

* formatting

* Update SpannerWriteSchemaTransformProvider.java

* correct lint failures

* correct lint failures

* correct lint failures

* Update SpannerReadSchemaTransformProvider.java

* correct lint failures

* Update SpannerWriteSchemaTransformProvider.java

* spanner version update
---
 .../transforms/providers/ErrorHandling.java   |   3 +-
 .../sdk/io/gcp/spanner/MutationUtils.java     |  60 +++++
 .../SpannerReadSchemaTransformProvider.java   | 235 ++++++++++++++++++
 .../SpannerWriteSchemaTransformProvider.java  | 160 +++++++++---
 .../apache_beam/io/gcp/spanner_wrapper.py     |  76 ++++++
 .../apache_beam/yaml/integration_tests.py     |  17 ++
 sdks/python/apache_beam/yaml/standard_io.yaml |  27 ++
 .../apache_beam/yaml/tests/spanner.yaml       |  95 +++++++
 sdks/python/setup.py                          |   2 +-
 9 files changed, 645 insertions(+), 30 deletions(-)
 create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java
 create mode 100644 sdks/python/apache_beam/io/gcp/spanner_wrapper.py
 create mode 100644 sdks/python/apache_beam/yaml/tests/spanner.yaml

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java
index 7fa29708c9ff..053521dbfb39 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java
@@ -18,13 +18,14 @@
 package org.apache.beam.sdk.schemas.transforms.providers;
 
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.values.Row;
 
 @AutoValue
-public abstract class ErrorHandling {
+public abstract class ErrorHandling implements Serializable {
   @SchemaFieldDescription("The name of the output PCollection containing failed writes.")
   public abstract String getOutput();
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
index c0654b2cb05f..5a106a34b0c6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
@@ -26,8 +26,11 @@
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.Key;
 import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Value;
 import java.math.BigDecimal;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -351,4 +354,61 @@ private static void addIterableToMutationBuilder(
               beamIterableType.getTypeName()));
     }
   }
+
+  public static Row createRowFromMutation(Schema schema, Mutation mutation) {
+    Map<String, Object> mutationHashMap = new HashMap<>();
+    mutation
+        .asMap()
+        .forEach(
+            (column, value) -> mutationHashMap.put(column, convertValueToBeamFieldType(value)));
+    return Row.withSchema(schema).withFieldValues(mutationHashMap).build();
+  }
+
+  public static Object convertValueToBeamFieldType(Value value) {
+    switch (value.getType().getCode()) {
+      case BOOL:
+        return value.getBool();
+      case BYTES:
+        return value.getBytes();
+      case DATE:
+        return value.getDate();
+      case INT64:
+        return value.getInt64();
+      case FLOAT64:
+        return value.getFloat64();
+      case NUMERIC:
+        return value.getNumeric();
+      case TIMESTAMP:
+        return value.getTimestamp();
+      case STRING:
+        return value.getString();
+      case JSON:
+        return value.getJson();
+      case ARRAY:
+        switch (value.getType().getArrayElementType().getCode()) {
+          case BOOL:
+            return value.getBoolArray();
+          case BYTES:
+            return value.getBytesArray();
+          case DATE:
+            return value.getDateArray();
+          case INT64:
+            return value.getInt64Array();
+          case FLOAT64:
+            return value.getFloat64Array();
+          case NUMERIC:
+            return value.getNumericArray();
+          case TIMESTAMP:
+            return value.getTimestampArray();
+          case STRING:
+            return value.getStringArray();
+          case JSON:
+            return value.getJsonArray();
+          default:
+            return value.toString();
+        }
+      default:
+        return value.toString();
+    }
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java
new file mode 100644
index 000000000000..9820bb39d09d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.spanner.Struct;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+/**
+ * A provider for reading from Cloud Spanner using a Schema Transform Provider.
+ *
+ * <p>This provider enables reading from Cloud Spanner using a specified SQL query or by directly
+ * accessing a table and its columns. It supports configuration through the {@link
+ * SpannerReadSchemaTransformConfiguration} class, allowing users to specify project, instance,
+ * database, table, query, and columns.
+ *
+ * <p>The transformation leverages the {@link SpannerIO} to perform the read operation and maps the
+ * results to Beam rows, preserving the schema.
+ *
+ * <p>Example usage in a YAML pipeline using a query:
+ *
+ * <pre>{@code
+ * pipeline:
+ *   transforms:
+ *     - type: ReadFromSpanner
+ *       name: ReadShipments
+ *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         query: 'SELECT * FROM shipments'
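+ *         # The query may be any SQL SELECT supported by Cloud Spanner, including joins and filters.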
+ * }</pre>
+ *
+ * <p>Example usage in a YAML pipeline using a table and columns:
+ *
+ * <pre>{@code
+ * pipeline:
+ *   transforms:
+ *     - type: ReadFromSpanner
+ *       name: ReadShipments
+ *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         table: 'shipments'
+ *         columns: ['customer_id', 'customer_name']
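+ *         # Table reads require both 'table' and 'columns'; only the listed columns are returned.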
+ * }</pre>
+ */
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+    private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+      SpannerIO.Read read =
+          SpannerIO.readWithSchema()
+              .withProjectId(configuration.getProjectId())
+              .withInstanceId(configuration.getInstanceId())
+              .withDatabaseId(configuration.getDatabaseId());
+
+      if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+        read = read.withQuery(configuration.getQuery());
+      } else {
+        read = read.withTable(configuration.getTableId()).withColumns(configuration.getColumns());
+      }
+      PCollection<Struct> spannerRows = input.getPipeline().apply(read);
+      Schema schema = spannerRows.getSchema();
+      PCollection<Row> rows =
+          spannerRows.apply(
+              MapElements.into(TypeDescriptor.of(Row.class))
+                  .via((Struct struct) -> StructUtils.structToBeamRow(struct, schema)));
+
+      return PCollectionRowTuple.of("output", rows.setRowSchema(schema));
+    }
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:spanner_read:v1";
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    return Collections.singletonList("output");
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class SpannerReadSchemaTransformConfiguration implements Serializable {
+    @AutoValue.Builder
+    @Nullable
+    public abstract static class Builder {
+      public abstract Builder setProjectId(String projectId);
+
+      public abstract Builder setInstanceId(String instanceId);
+
+      public abstract Builder setDatabaseId(String databaseId);
+
+      public abstract Builder setTableId(String tableId);
+
+      public abstract Builder setQuery(String query);
+
+      public abstract Builder setColumns(List<String> columns);
+
+      public abstract SpannerReadSchemaTransformConfiguration build();
+    }
+
+    public void validate() {
+      String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: ";
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getInstanceId()),
+          invalidConfigMessage + "Instance ID must be specified.");
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getDatabaseId()),
+          invalidConfigMessage + "Database ID must be specified.");
+      if (Strings.isNullOrEmpty(this.getQuery())) {
+        checkArgument(
+            !Strings.isNullOrEmpty(this.getTableId()),
+            invalidConfigMessage + "Table name must be specified for table read.");
+        checkArgument(
+            this.getColumns() != null && !this.getColumns().isEmpty(),
+            invalidConfigMessage + "Columns must be specified for table read.");
+      } else {
+        checkArgument(
+            !Strings.isNullOrEmpty(this.getQuery()),
+            invalidConfigMessage + "Query must be specified for query read.");
+        checkArgument(
+            Strings.isNullOrEmpty(this.getTableId()),
+            invalidConfigMessage + "Table name should not be specified when using a query.");
+        checkArgument(
+            this.getColumns() == null || this.getColumns().isEmpty(),
+            invalidConfigMessage + "Columns should not be specified when using a query.");
+      }
+    }
+
+    public static Builder builder() {
+      return new AutoValue_SpannerReadSchemaTransformProvider_SpannerReadSchemaTransformConfiguration
+          .Builder();
+    }
+
+    @SchemaFieldDescription("Specifies the GCP project ID.")
+    @Nullable
+    public abstract String getProjectId();
+
+    @SchemaFieldDescription("Specifies the Cloud Spanner instance.")
+    public abstract String getInstanceId();
+
+    @SchemaFieldDescription("Specifies the Cloud Spanner database.")
+    public abstract String getDatabaseId();
+
+    @SchemaFieldDescription("Specifies the Cloud Spanner table.")
+    @Nullable
+    public abstract String getTableId();
+
+    @SchemaFieldDescription("Specifies the SQL query to execute.")
+    @Nullable
+    public abstract String getQuery();
+
+    @SchemaFieldDescription("Specifies the columns to read from the table.")
+    @Nullable
+    public abstract List<String> getColumns();
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<SpannerReadSchemaTransformConfiguration>
+      configurationClass() {
+    return SpannerReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      SpannerReadSchemaTransformConfiguration configuration) {
+    return new SpannerSchemaTransformRead(configuration);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
index 6c8a2541a88b..f50755d18155 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner;
 
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.spanner.Mutation;
@@ -26,6 +28,9 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.FailureMode;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -35,20 +40,73 @@
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+/**
+ * A provider for writing to Cloud Spanner using the Schema Transform Provider.
+ *
+ * <p>This provider enables writing to Cloud Spanner with support for error handling during the
+ * write process. Configuration is managed through the {@link
+ * SpannerWriteSchemaTransformConfiguration} class, allowing users to specify project, instance,
+ * database, table, and error handling strategies.
+ *
+ * <p>The transformation uses the {@link SpannerIO} to perform the write operation and provides
+ * options to handle failed mutations, either by throwing an error or by passing the failed
+ * mutation further along the pipeline to be handled accordingly.
+ *
+ * <p>Example usage in a YAML pipeline without error handling:
+ *
+ * <pre>{@code
+ * pipeline:
+ *   transforms:
+ *     - type: WriteToSpanner
+ *       name: WriteShipments
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         table_id: 'shipments'
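+ *         # With no error_handling block, a failed write fails the whole pipeline (FAIL_FAST).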
+ *
+ * }</pre>
+ *
+ * <p>Example usage in a YAML pipeline using error handling:
+ *
+ * <pre>{@code
+ * pipeline:
+ *   transforms:
+ *     - type: WriteToSpanner
+ *       name: WriteShipments
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         table_id: 'shipments'
+ *         error_handling:
+ *           output: 'errors'
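+ *           # Failed mutations are emitted on this output instead of failing the pipeline.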
+ *
+ *     - type: WriteToJson
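+ *       # The error output rows carry 'error_message' and 'failed_row' fields.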
+ *       input: WriteShipments.errors
+ *       config:
+ *          path: errors.json
+ *
+ * }</pre>
+ */
 @AutoService(SchemaTransformProvider.class)
 public class SpannerWriteSchemaTransformProvider
     extends TypedSchemaTransformProvider<
         SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> {
@@ -98,31 +156,45 @@ public void finish(FinishBundleContext c) {
       }
     }
 
+    private static class NoOutputDoFn<T> extends DoFn<T, Row> {
+      @ProcessElement
+      public void process(ProcessContext c) {}
+    }
+
     @Override
     public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
+      boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
       SpannerWriteResult result =
           input
               .get("input")
               .apply(
-                  MapElements.via(
-                      new SimpleFunction<Row, Mutation>(
-                          row ->
+                  MapElements.into(TypeDescriptor.of(Mutation.class))
+                      .via(
+                          (Row row) ->
                               MutationUtils.createMutationFromBeamRows(
                                   Mutation.newInsertOrUpdateBuilder(configuration.getTableId()),
-                                  Objects.requireNonNull(row))) {}))
+                                  Objects.requireNonNull(row))))
               .apply(
                   SpannerIO.write()
+                      .withProjectId(configuration.getProjectId())
                       .withDatabaseId(configuration.getDatabaseId())
                       .withInstanceId(configuration.getInstanceId())
-                      .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
-      Schema failureSchema =
-          Schema.builder()
-              .addStringField("operation")
-              .addStringField("instanceId")
-              .addStringField("databaseId")
-              .addStringField("tableId")
-              .addStringField("mutationData")
-              .build();
+                      .withFailureMode(
+                          handleErrors ? FailureMode.REPORT_FAILURES : FailureMode.FAIL_FAST));
+
+      PCollection<Row> postWrite =
+          result
+              .getFailedMutations()
+              .apply("post-write", ParDo.of(new NoOutputDoFn<>()))
+              .setRowSchema(Schema.of());
+
+      if (!handleErrors) {
+        return PCollectionRowTuple.of("post-write", postWrite);
+      }
+
+      Schema inputSchema = input.get("input").getSchema();
+      Schema failureSchema = ErrorHandling.errorSchema(inputSchema);
+
       PCollection<Row> failures =
           result
               .getFailedMutations()
@@ -130,26 +202,30 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
                   FlatMapElements.into(TypeDescriptors.rows())
                       .via(
                           mtg ->
-                              Objects.requireNonNull(mtg).attached().stream()
+                              StreamSupport.stream(Objects.requireNonNull(mtg).spliterator(), false)
                                   .map(
                                       mutation ->
                                           Row.withSchema(failureSchema)
-                                              .addValue(mutation.getOperation().toString())
-                                              .addValue(configuration.getInstanceId())
-                                              .addValue(configuration.getDatabaseId())
-                                              .addValue(mutation.getTable())
-                                              // TODO(pabloem): Figure out how to represent
-                                              // mutation
-                                              // contents in DLQ
-                                              .addValue(
-                                                  Iterators.toString(
-                                                      mutation.getValues().iterator()))
+                                              .withFieldValue(
+                                                  "error_message",
+                                                  String.format(
+                                                      "%s operation failed at instance: %s, database: %s, table: %s",
+                                                      mutation.getOperation(),
+                                                      configuration.getInstanceId(),
+                                                      configuration.getDatabaseId(),
+                                                      mutation.getTable()))
+                                              .withFieldValue(
+                                                  "failed_row",
+                                                  MutationUtils.createRowFromMutation(
+                                                      inputSchema, mutation))
                                               .build())
                                   .collect(Collectors.toList())))
              .setRowSchema(failureSchema)
              .apply("error-count", ParDo.of(new ElementCounterFn("Spanner-write-error-counter")))
              .setRowSchema(failureSchema);
-      return PCollectionRowTuple.of("failures", failures).and("errors", failures);
+
+      return PCollectionRowTuple.of("post-write", postWrite)
+          .and(configuration.getErrorHandling().getOutput(), failures);
     }
   }
@@ -167,13 +243,17 @@
   @Override
   public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
       outputCollectionNames() {
-    return Arrays.asList("failures", "errors");
+    return Arrays.asList("post-write", "errors");
   }
 
   @AutoValue
   @DefaultSchema(AutoValueSchema.class)
   public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable {
 
+    @SchemaFieldDescription("Specifies the GCP project.")
+    @Nullable
+    public abstract String getProjectId();
+
     @SchemaFieldDescription("Specifies the Cloud Spanner instance.")
     public abstract String getInstanceId();
 
@@ -183,6 +263,10 @@
     @SchemaFieldDescription("Specifies the Cloud Spanner table.")
     public abstract String getTableId();
 
+    @SchemaFieldDescription("Specifies how to handle errors.")
+    @Nullable
+    public abstract ErrorHandling getErrorHandling();
+
     public static Builder builder() {
       return new AutoValue_SpannerWriteSchemaTransformProvider_SpannerWriteSchemaTransformConfiguration
           .Builder();
@@ -190,13 +274,33 @@ public static Builder builder() {
 
     @AutoValue.Builder
     public abstract static class Builder {
+      public abstract Builder setProjectId(String projectId);
+
       public abstract Builder setInstanceId(String instanceId);
 
       public abstract Builder setDatabaseId(String databaseId);
 
       public abstract Builder setTableId(String tableId);
 
+      public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+
       public abstract SpannerWriteSchemaTransformConfiguration build();
     }
+
+    public void validate() {
+      String invalidConfigMessage = "Invalid Spanner Write configuration: ";
+
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getInstanceId()),
+          invalidConfigMessage + "Instance ID for a Spanner Write must be specified.");
+
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getDatabaseId()),
+          invalidConfigMessage + "Database ID for a Spanner Write must be specified.");
+
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getTableId()),
+          invalidConfigMessage + "Table ID for a Spanner Write must be specified.");
+    }
   }
 }
diff --git a/sdks/python/apache_beam/io/gcp/spanner_wrapper.py b/sdks/python/apache_beam/io/gcp/spanner_wrapper.py
new file mode 100644
index 000000000000..0cd7bec95b0f
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/spanner_wrapper.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import uuid
+
+from apache_beam.utils import retry
+
+try:
+  from google.cloud import spanner
+
+except ImportError:
+  spanner = None
+
+_LOGGER = logging.getLogger(__name__)
+MAX_RETRIES = 3
+
+
+class SpannerWrapper(object):
+  TEMP_DATABASE_PREFIX = 'temp-'
+
+  def __init__(self, project_id, temp_database_id=None):
+    self._spanner_client = spanner.Client(project=project_id)
+    self._spanner_instance = self._spanner_client.instance("beam-test")
+    self._test_database = None
+
+    if temp_database_id and temp_database_id.startswith(
+        self.TEMP_DATABASE_PREFIX):
+      raise ValueError(
+          'User provided temp database ID cannot start with %r' %
+          self.TEMP_DATABASE_PREFIX)
+
+    if temp_database_id is not None:
+      self._test_database = temp_database_id
+    else:
+      self._test_database = self._get_temp_database()
+
+  def _get_temp_database(self):
+    uniq_id = uuid.uuid4().hex[:10]
+    return f'{self.TEMP_DATABASE_PREFIX}{uniq_id}'
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _create_database(self):
+    _LOGGER.info('Creating test database: %s' % self._test_database)
+    instance = self._spanner_instance
+    database = instance.database(
+        self._test_database,
+        ddl_statements=[
+            '''CREATE TABLE tmp_table (
+            UserId    STRING(256) NOT NULL,
+            Key       STRING(1024)
+        ) PRIMARY KEY (UserId)'''
+        ])
+    operation = database.create()
+    _LOGGER.info('Creating database: Done! %s' % str(operation.result()))
+
+  def _delete_database(self):
+    if self._spanner_instance:
+      database = self._spanner_instance.database(self._test_database)
+      database.drop()
diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py
index bd38ab82c845..af1be7b1e8e5 100644
--- a/sdks/python/apache_beam/yaml/integration_tests.py
+++ b/sdks/python/apache_beam/yaml/integration_tests.py
@@ -33,6 +33,7 @@
 from apache_beam.io import filesystems
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.spanner_wrapper import SpannerWrapper
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.utils import python_callable
 from apache_beam.yaml import yaml_provider
@@ -46,6 +47,22 @@ def gcs_temp_dir(bucket):
   filesystems.FileSystems.delete([gcs_tempdir])
 
 
+@contextlib.contextmanager
+def temp_spanner_table(project, prefix='temp_spanner_db_'):
+  spanner_client = SpannerWrapper(project)
+  spanner_client._create_database()
+  instance = "beam-test"
+  database = spanner_client._test_database
+  table = 'tmp_table'
+  columns = ['UserId', 'Key']
+  logging.info("Created Spanner database: %s", database)
+  try:
+    yield [f'{project}', f'{instance}', f'{database}', f'{table}', columns]
+  finally:
+    logging.info("Deleting Spanner database: %s", database)
+    spanner_client._delete_database()
+
+
 @contextlib.contextmanager
 def temp_bigquery_table(project, prefix='yaml_bq_it_'):
   bigquery_client = BigQueryWrapper()
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 579df2b82209..4de36b3dc9e0 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -257,3 +257,30 @@
     'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
   config:
     gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
+
+- type: renaming
+  transforms:
+    'ReadFromSpanner': 'ReadFromSpanner'
+    'WriteToSpanner': 'WriteToSpanner'
+  config:
+    mappings:
+      'ReadFromSpanner':
+        project: 'project_id'
+        instance: 'instance_id'
+        database: 'database_id'
+        table: 'table_id'
+        query: 'query'
+        columns: 'columns'
+      'WriteToSpanner':
+        project: 'project_id'
+        instance: 'instance_id'
+        database: 'database_id'
+        table: 'table_id'
+        error_handling: 'error_handling'
+    underlying_provider:
+      type: beamJar
+      transforms:
+        'ReadFromSpanner': 'beam:schematransform:org.apache.beam:spanner_read:v1'
+        'WriteToSpanner': 'beam:schematransform:org.apache.beam:spanner_write:v1'
+      config:
+        gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
\ No newline at end of file
diff --git a/sdks/python/apache_beam/yaml/tests/spanner.yaml b/sdks/python/apache_beam/yaml/tests/spanner.yaml
new file mode 100644
index 000000000000..d4345441825a
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/spanner.yaml
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: SPANNER_TABLE
+    type: "apache_beam.yaml.integration_tests.temp_spanner_table"
+    config:
+      project: "apache-beam-testing"
+  - name: TEMP_DIR
+    type: "apache_beam.yaml.integration_tests.gcs_temp_dir"
+    config:
+      bucket: "gs://temp-storage-for-end-to-end-tests/temp-it"
+
+pipelines:
+  - pipeline:
+      transforms:
+        - type: Create
+          name: Row
+          config:
+            elements:
+              - UserId: "01"
+                Key: "Apple"
+
+        - type: WriteToSpanner
+          name: Write
+          input: Row
+          config:
+            project: "{SPANNER_TABLE[0]}"
+            instance: "{SPANNER_TABLE[1]}"
+            database: "{SPANNER_TABLE[2]}"
+            table: "{SPANNER_TABLE[3]}"
+            error_handling:
+              output: my_error_output
+
+        - type: LogForTesting
+          input: Write.my_error_output
+
+  - pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromSpanner
+          config:
+            project: "{SPANNER_TABLE[0]}"
+            instance: "{SPANNER_TABLE[1]}"
+            database: "{SPANNER_TABLE[2]}"
+            query: 'SELECT * FROM tmp_table where UserId = "01"'
+
+        - type: AssertEqual
+          config:
+            elements:
+              - UserId: "01"
+                Key: "Apple"
+
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - UserId: "02"
+                Key: "Mango"
+
+        - type: WriteToSpanner
+          config:
+            project: "{SPANNER_TABLE[0]}"
+            instance: "{SPANNER_TABLE[1]}"
+            database: "{SPANNER_TABLE[2]}"
+            table: "{SPANNER_TABLE[3]}"
+
+  - pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromSpanner
+          config:
+            project: "{SPANNER_TABLE[0]}"
+            instance: "{SPANNER_TABLE[1]}"
+            database: "{SPANNER_TABLE[2]}"
+            table: "{SPANNER_TABLE[3]}"
+            columns:
+              - "{SPANNER_TABLE[4][0]}"
+              - "{SPANNER_TABLE[4][1]}"
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index f50a3a07746f..4de6949cc192 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -443,7 +443,7 @@ def get_portability_package_data():
         'google-cloud-bigquery-storage>=2.6.3,<3',
         'google-cloud-core>=2.0.0,<3',
         'google-cloud-bigtable>=2.19.0,<3',
-        'google-cloud-spanner>=3.0.0,<4',
+        'google-cloud-spanner>=3.0.0,<3.48',
        # GCP Packages required by ML functionality
         'google-cloud-dlp>=3.0.0,<4',
         'google-cloud-language>=2.0,<3',
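
For local experimentation, the temporary-database lifecycle used by temp_spanner_table above
can be driven directly through SpannerWrapper. A minimal sketch, assuming the hard-coded
'beam-test' instance exists in the target project and google-cloud-spanner is installed
(manual_spanner_fixture is an illustrative name, not part of this patch):

    import contextlib

    from apache_beam.io.gcp.spanner_wrapper import SpannerWrapper


    @contextlib.contextmanager
    def manual_spanner_fixture(project):
      # Generates a 'temp-<uuid>' database id and creates tmp_table(UserId, Key).
      wrapper = SpannerWrapper(project)
      wrapper._create_database()
      try:
        # Same shape as the list yielded by temp_spanner_table:
        # [project, instance, database, table, columns]
        yield [project, 'beam-test', wrapper._test_database, 'tmp_table',
               ['UserId', 'Key']]
      finally:
        wrapper._delete_database()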