diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java index 7fa29708c9ff..053521dbfb39 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java @@ -18,13 +18,14 @@ package org.apache.beam.sdk.schemas.transforms.providers; import com.google.auto.value.AutoValue; +import java.io.Serializable; import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.values.Row; @AutoValue -public abstract class ErrorHandling { +public abstract class ErrorHandling implements Serializable { @SchemaFieldDescription("The name of the output PCollection containing failed writes.") public abstract String getOutput(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java index c0654b2cb05f..5a106a34b0c6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java @@ -26,8 +26,11 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Key; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Value; import java.math.BigDecimal; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.StreamSupport; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -351,4 +354,61 @@ private static void addIterableToMutationBuilder( beamIterableType.getTypeName())); } } + + public static Row createRowFromMutation(Schema schema, Mutation mutation) { + Map mutationHashMap = new HashMap<>(); + mutation + .asMap() + .forEach( + (column, value) -> mutationHashMap.put(column, convertValueToBeamFieldType(value))); + return Row.withSchema(schema).withFieldValues(mutationHashMap).build(); + } + + public static Object convertValueToBeamFieldType(Value value) { + switch (value.getType().getCode()) { + case BOOL: + return value.getBool(); + case BYTES: + return value.getBytes(); + case DATE: + return value.getDate(); + case INT64: + return value.getInt64(); + case FLOAT64: + return value.getFloat64(); + case NUMERIC: + return value.getNumeric(); + case TIMESTAMP: + return value.getTimestamp(); + case STRING: + return value.getString(); + case JSON: + return value.getJson(); + case ARRAY: + switch (value.getType().getArrayElementType().getCode()) { + case BOOL: + return value.getBoolArray(); + case BYTES: + return value.getBytesArray(); + case DATE: + return value.getDateArray(); + case INT64: + return value.getInt64Array(); + case FLOAT64: + return value.getFloat64Array(); + case NUMERIC: + return value.getNumericArray(); + case TIMESTAMP: + return value.getTimestampArray(); + case STRING: + return value.getStringArray(); + case JSON: + return value.getJsonArray(); + default: + return value.toString(); + } + default: + return value.toString(); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java new file mode 100644 index 000000000000..9820bb39d09d --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import com.google.cloud.spanner.Struct; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +/** + * A provider for reading from Cloud Spanner using a Schema Transform Provider. + * + *

This provider enables reading from Cloud Spanner using a specified SQL query or by directly + * accessing a table and its columns. It supports configuration through the {@link + * SpannerReadSchemaTransformConfiguration} class, allowing users to specify project, instance, + * database, table, query, and columns. + * + *

The transformation leverages the {@link SpannerIO} to perform the read operation and maps the + * results to Beam rows, preserving the schema. + * + *

Example usage in a YAML pipeline using query: + * + *

{@code
+ * pipeline:
+ *   transforms:
+ *     - type: ReadFromSpanner
+ *       name: ReadShipments
+ *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         query: 'SELECT * FROM shipments'
+ * }
+ * + *

Example usage in a YAML pipeline using a table and columns: + * + *

{@code
+ * pipeline:
+ *   transforms:
+ *     - type: ReadFromSpanner
+ *       name: ReadShipments
+ *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         table: 'shipments'
+ *         columns: ['customer_id', 'customer_name']
+ * }
+ */ +@AutoService(SchemaTransformProvider.class) +public class SpannerReadSchemaTransformProvider + extends TypedSchemaTransformProvider< + SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> { + + static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable { + private final SpannerReadSchemaTransformConfiguration configuration; + + SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null."); + SpannerIO.Read read = + SpannerIO.readWithSchema() + .withProjectId(configuration.getProjectId()) + .withInstanceId(configuration.getInstanceId()) + .withDatabaseId(configuration.getDatabaseId()); + + if (!Strings.isNullOrEmpty(configuration.getQuery())) { + read = read.withQuery(configuration.getQuery()); + } else { + read = read.withTable(configuration.getTableId()).withColumns(configuration.getColumns()); + } + PCollection spannerRows = input.getPipeline().apply(read); + Schema schema = spannerRows.getSchema(); + PCollection rows = + spannerRows.apply( + MapElements.into(TypeDescriptor.of(Row.class)) + .via((Struct struct) -> StructUtils.structToBeamRow(struct, schema))); + + return PCollectionRowTuple.of("output", rows.setRowSchema(schema)); + } + } + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:spanner_read:v1"; + } + + @Override + public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> + inputCollectionNames() { + return Collections.emptyList(); + } + + @Override + public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> + outputCollectionNames() { + return Collections.singletonList("output"); + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class SpannerReadSchemaTransformConfiguration implements Serializable { + @AutoValue.Builder + @Nullable + public abstract static class Builder { + public abstract Builder setProjectId(String projectId); + + public abstract Builder setInstanceId(String instanceId); + + public abstract Builder setDatabaseId(String databaseId); + + public abstract Builder setTableId(String tableId); + + public abstract Builder setQuery(String query); + + public abstract Builder setColumns(List columns); + + public abstract SpannerReadSchemaTransformConfiguration build(); + } + + public void validate() { + String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: "; + checkArgument( + !Strings.isNullOrEmpty(this.getInstanceId()), + invalidConfigMessage + "Instance ID must be specified."); + checkArgument( + !Strings.isNullOrEmpty(this.getDatabaseId()), + invalidConfigMessage + "Database ID must be specified."); + if (Strings.isNullOrEmpty(this.getQuery())) { + checkArgument( + !Strings.isNullOrEmpty(this.getTableId()), + invalidConfigMessage + "Table name must be specified for table read."); + checkArgument( + this.getColumns() != null && !this.getColumns().isEmpty(), + invalidConfigMessage + "Columns must be specified for table read."); + } else { + checkArgument( + !Strings.isNullOrEmpty(this.getQuery()), + invalidConfigMessage + "Query must be specified for query read."); + checkArgument( + Strings.isNullOrEmpty(this.getTableId()), + invalidConfigMessage + "Table name should not be specified when using a query."); + checkArgument( + this.getColumns() == null || this.getColumns().isEmpty(), + invalidConfigMessage + "Columns should not be specified when using a query."); + } + } + + public static Builder builder() { + return new AutoValue_SpannerReadSchemaTransformProvider_SpannerReadSchemaTransformConfiguration + .Builder(); + } + + @SchemaFieldDescription("Specifies the GCP project ID.") + @Nullable + public abstract String getProjectId(); + + @SchemaFieldDescription("Specifies the Cloud Spanner instance.") + public abstract String getInstanceId(); + + @SchemaFieldDescription("Specifies the Cloud Spanner database.") + public abstract String getDatabaseId(); + + @SchemaFieldDescription("Specifies the Cloud Spanner table.") + @Nullable + public abstract String getTableId(); + + @SchemaFieldDescription("Specifies the SQL query to execute.") + @Nullable + public abstract String getQuery(); + + @SchemaFieldDescription("Specifies the columns to read from the table.") + @Nullable + public abstract List getColumns(); + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized Class + configurationClass() { + return SpannerReadSchemaTransformConfiguration.class; + } + + @Override + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + SpannerReadSchemaTransformConfiguration configuration) { + return new SpannerSchemaTransformRead(configuration); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java index 6c8a2541a88b..f50755d18155 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import com.google.cloud.spanner.Mutation; @@ -26,6 +28,9 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.FailureMode; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.schemas.AutoValueSchema; @@ -35,20 +40,73 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +/** + * A provider for writing to Cloud Spanner using the Schema Transform Provider. + * + *

This provider enables writing to Cloud Spanner with support for error handling during the + * write process. Configuration is managed through the {@link + * SpannerWriteSchemaTransformConfiguration} class, allowing users to specify project, instance, + * database, table, and error handling strategies. + * + *

The transformation uses the {@link SpannerIO} to perform the write operation and provides + * options to handle failed mutations, either by throwing an error, or passing the failed mutation + * further in the pipeline for dealing with accordingly. + * + *

Example usage in a YAML pipeline without error handling: + * + *

{@code
+ * pipeline:
+ *   transforms:
+ *     - type: WriteToSpanner
+ *       name: WriteShipments
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         table_id: 'shipments'
+ *
+ * }
+ * + *

Example usage in a YAML pipeline using error handling: + * + *

{@code
+ * pipeline:
+ *   transforms:
+ *     - type: WriteToSpanner
+ *       name: WriteShipments
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         table_id: 'shipments'
+ *         error_handling:
+ *           output: 'errors'
+ *
+ *     - type: WriteToJson
+ *       input: WriteSpanner.my_error_output
+ *       config:
+ *          path: errors.json
+ *
+ * }
+ */ @AutoService(SchemaTransformProvider.class) public class SpannerWriteSchemaTransformProvider extends TypedSchemaTransformProvider< @@ -98,31 +156,45 @@ public void finish(FinishBundleContext c) { } } + private static class NoOutputDoFn extends DoFn { + @ProcessElement + public void process(ProcessContext c) {} + } + @Override public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) { + boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling()); SpannerWriteResult result = input .get("input") .apply( - MapElements.via( - new SimpleFunction( - row -> + MapElements.into(TypeDescriptor.of(Mutation.class)) + .via( + (Row row) -> MutationUtils.createMutationFromBeamRows( Mutation.newInsertOrUpdateBuilder(configuration.getTableId()), - Objects.requireNonNull(row))) {})) + Objects.requireNonNull(row)))) .apply( SpannerIO.write() + .withProjectId(configuration.getProjectId()) .withDatabaseId(configuration.getDatabaseId()) .withInstanceId(configuration.getInstanceId()) - .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES)); - Schema failureSchema = - Schema.builder() - .addStringField("operation") - .addStringField("instanceId") - .addStringField("databaseId") - .addStringField("tableId") - .addStringField("mutationData") - .build(); + .withFailureMode( + handleErrors ? FailureMode.REPORT_FAILURES : FailureMode.FAIL_FAST)); + + PCollection postWrite = + result + .getFailedMutations() + .apply("post-write", ParDo.of(new NoOutputDoFn())) + .setRowSchema(Schema.of()); + + if (!handleErrors) { + return PCollectionRowTuple.of("post-write", postWrite); + } + + Schema inputSchema = input.get("input").getSchema(); + Schema failureSchema = ErrorHandling.errorSchema(inputSchema); + PCollection failures = result .getFailedMutations() @@ -130,26 +202,30 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) { FlatMapElements.into(TypeDescriptors.rows()) .via( mtg -> - Objects.requireNonNull(mtg).attached().stream() + StreamSupport.stream(Objects.requireNonNull(mtg).spliterator(), false) .map( mutation -> Row.withSchema(failureSchema) - .addValue(mutation.getOperation().toString()) - .addValue(configuration.getInstanceId()) - .addValue(configuration.getDatabaseId()) - .addValue(mutation.getTable()) - // TODO(pabloem): Figure out how to represent - // mutation - // contents in DLQ - .addValue( - Iterators.toString( - mutation.getValues().iterator())) + .withFieldValue( + "error_message", + String.format( + "%s operation failed at instance: %s, database: %s, table: %s", + mutation.getOperation(), + configuration.getInstanceId(), + configuration.getDatabaseId(), + mutation.getTable())) + .withFieldValue( + "failed_row", + MutationUtils.createRowFromMutation( + inputSchema, mutation)) .build()) .collect(Collectors.toList()))) .setRowSchema(failureSchema) .apply("error-count", ParDo.of(new ElementCounterFn("Spanner-write-error-counter"))) .setRowSchema(failureSchema); - return PCollectionRowTuple.of("failures", failures).and("errors", failures); + + return PCollectionRowTuple.of("post-write", postWrite) + .and(configuration.getErrorHandling().getOutput(), failures); } } @@ -167,13 +243,17 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) { @Override public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> outputCollectionNames() { - return Arrays.asList("failures", "errors"); + return Arrays.asList("post-write", "errors"); } @AutoValue @DefaultSchema(AutoValueSchema.class) public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable { + @SchemaFieldDescription("Specifies the GCP project.") + @Nullable + public abstract String getProjectId(); + @SchemaFieldDescription("Specifies the Cloud Spanner instance.") public abstract String getInstanceId(); @@ -183,6 +263,10 @@ public abstract static class SpannerWriteSchemaTransformConfiguration implements @SchemaFieldDescription("Specifies the Cloud Spanner table.") public abstract String getTableId(); + @SchemaFieldDescription("Specifies how to handle errors.") + @Nullable + public abstract ErrorHandling getErrorHandling(); + public static Builder builder() { return new AutoValue_SpannerWriteSchemaTransformProvider_SpannerWriteSchemaTransformConfiguration .Builder(); @@ -190,13 +274,33 @@ public static Builder builder() { @AutoValue.Builder public abstract static class Builder { + public abstract Builder setProjectId(String projectId); + public abstract Builder setInstanceId(String instanceId); public abstract Builder setDatabaseId(String databaseId); public abstract Builder setTableId(String tableId); + public abstract Builder setErrorHandling(ErrorHandling errorHandling); + public abstract SpannerWriteSchemaTransformConfiguration build(); } + + public void validate() { + String invalidConfigMessage = "Invalid Spanner Write configuration: "; + + checkArgument( + !Strings.isNullOrEmpty(this.getInstanceId()), + invalidConfigMessage + "Instance ID for a Spanner Write must be specified."); + + checkArgument( + !Strings.isNullOrEmpty(this.getDatabaseId()), + invalidConfigMessage + "Database ID for a Spanner Write must be specified."); + + checkArgument( + !Strings.isNullOrEmpty(this.getTableId()), + invalidConfigMessage + "Table ID for a Spanner Write must be specified."); + } } } diff --git a/sdks/python/apache_beam/io/gcp/spanner_wrapper.py b/sdks/python/apache_beam/io/gcp/spanner_wrapper.py new file mode 100644 index 000000000000..0cd7bec95b0f --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/spanner_wrapper.py @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import uuid + +from apache_beam.utils import retry + +try: + from google.cloud import spanner + +except ImportError: + spanner = None + +_LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 3 + + +class SpannerWrapper(object): + TEMP_DATABASE_PREFIX = 'temp-' + + def __init__(self, project_id, temp_database_id=None): + self._spanner_client = spanner.Client(project=project_id) + self._spanner_instance = self._spanner_client.instance("beam-test") + self._test_database = None + + if temp_database_id and temp_database_id.startswith( + self.TEMP_DATABASE_PREFIX): + raise ValueError( + 'User provided temp database ID cannot start with %r' % + self.TEMP_DATABASE_PREFIX) + + if temp_database_id is not None: + self._test_database = temp_database_id + else: + self._test_database = self._get_temp_database() + + def _get_temp_database(self): + uniq_id = uuid.uuid4().hex[:10] + return f'{self.TEMP_DATABASE_PREFIX}{uniq_id}' + + @retry.with_exponential_backoff( + num_retries=MAX_RETRIES, + retry_filter=retry.retry_on_server_errors_and_timeout_filter) + def _create_database(self): + _LOGGER.info('Creating test database: %s' % self._test_database) + instance = self._spanner_instance + database = instance.database( + self._test_database, + ddl_statements=[ + '''CREATE TABLE tmp_table ( + UserId STRING(256) NOT NULL, + Key STRING(1024) + ) PRIMARY KEY (UserId)''' + ]) + operation = database.create() + _LOGGER.info('Creating database: Done! %s' % str(operation.result())) + + def _delete_database(self): + if (self._spanner_instance): + database = self._spanner_instance.database(self._test_database) + database.drop() diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index bd38ab82c845..af1be7b1e8e5 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -33,6 +33,7 @@ from apache_beam.io import filesystems from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.internal.clients import bigquery +from apache_beam.io.gcp.spanner_wrapper import SpannerWrapper from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.utils import python_callable from apache_beam.yaml import yaml_provider @@ -46,6 +47,22 @@ def gcs_temp_dir(bucket): filesystems.FileSystems.delete([gcs_tempdir]) +@contextlib.contextmanager +def temp_spanner_table(project, prefix='temp_spanner_db_'): + spanner_client = SpannerWrapper(project) + spanner_client._create_database() + instance = "beam-test" + database = spanner_client._test_database + table = 'tmp_table' + columns = ['UserId', 'Key'] + logging.info("Created Spanner database: %s", database) + try: + yield [f'{project}', f'{instance}', f'{database}', f'{table}', columns] + finally: + logging.info("Deleting Spanner database: %s", database) + spanner_client._delete_database() + + @contextlib.contextmanager def temp_bigquery_table(project, prefix='yaml_bq_it_'): bigquery_client = BigQueryWrapper() diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 579df2b82209..4de36b3dc9e0 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -257,3 +257,30 @@ 'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1' config: gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' + +- type: renaming + transforms: + 'ReadFromSpanner': 'ReadFromSpanner' + 'WriteToSpanner': 'WriteToSpanner' + config: + mappings: + 'ReadFromSpanner': + project: 'project_id' + instance: 'instance_id' + database: 'database_id' + table: 'table_id' + query: 'query' + columns: 'columns' + 'WriteToSpanner': + project: 'project_id' + instance: 'instance_id' + database: 'database_id' + table: 'table_id' + error_handling: 'error_handling' + underlying_provider: + type: beamJar + transforms: + 'ReadFromSpanner': 'beam:schematransform:org.apache.beam:spanner_read:v1' + 'WriteToSpanner': 'beam:schematransform:org.apache.beam:spanner_write:v1' + config: + gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/tests/spanner.yaml b/sdks/python/apache_beam/yaml/tests/spanner.yaml new file mode 100644 index 000000000000..d4345441825a --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/spanner.yaml @@ -0,0 +1,95 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: SPANNER_TABLE + type: "apache_beam.yaml.integration_tests.temp_spanner_table" + config: + project: "apache-beam-testing" + - name: TEMP_DIR + type: "apache_beam.yaml.integration_tests.gcs_temp_dir" + config: + bucket: "gs://temp-storage-for-end-to-end-tests/temp-it" + +pipelines: + - pipeline: + transforms: + - type: Create + name: Row + config: + elements: + - UserId: "01" + Key: "Apple" + + - type: WriteToSpanner + name: Write + input: Row + config: + project: "{SPANNER_TABLE[0]}" + instance: "{SPANNER_TABLE[1]}" + database: "{SPANNER_TABLE[2]}" + table: "{SPANNER_TABLE[3]}" + error_handling: + output: my_error_output + + - type: LogForTesting + input: Write.my_error_output + + - pipeline: + type: chain + transforms: + - type: ReadFromSpanner + config: + project: "{SPANNER_TABLE[0]}" + instance: "{SPANNER_TABLE[1]}" + database: "{SPANNER_TABLE[2]}" + query: 'SELECT * FROM tmp_table where UserId = "01"' + + - type: AssertEqual + config: + elements: + - UserId: "01" + Key: "Apple" + + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - UserId: "02" + Key: "Mango" + + - type: WriteToSpanner + config: + project: "{SPANNER_TABLE[0]}" + instance: "{SPANNER_TABLE[1]}" + database: "{SPANNER_TABLE[2]}" + table: "{SPANNER_TABLE[3]}" + + - pipeline: + type: chain + transforms: + - type: ReadFromSpanner + config: + project: "{SPANNER_TABLE[0]}" + instance: "{SPANNER_TABLE[1]}" + database: "{SPANNER_TABLE[2]}" + table: "{SPANNER_TABLE[3]}" + columns: + - "{SPANNER_TABLE[4][0]}" + - "{SPANNER_TABLE[4][1]}" diff --git a/sdks/python/setup.py b/sdks/python/setup.py index f50a3a07746f..4de6949cc192 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -443,7 +443,7 @@ def get_portability_package_data(): 'google-cloud-bigquery-storage>=2.6.3,<3', 'google-cloud-core>=2.0.0,<3', 'google-cloud-bigtable>=2.19.0,<3', - 'google-cloud-spanner>=3.0.0,<4', + 'google-cloud-spanner>=3.0.0,<3.48', # GCP Packages required by ML functionality 'google-cloud-dlp>=3.0.0,<4', 'google-cloud-language>=2.0,<3',