From 70e41f4ebef55667943616f322708be950e91ddf Mon Sep 17 00:00:00 2001 From: Niel Markwick Date: Tue, 5 Dec 2023 20:59:40 +0100 Subject: [PATCH] Add docs and warning about using SpannerIO.Read in streaming pipelines (#29601) * Add docs and warning about using SpannerIO.Read in streaming. Add more documentation around SpannerIO.Read and .ReadAll explaining PartitionedRead API, batching, and how it should not be used for unbounded reads in Streaming pipelines. Add a warning if SpannerIO.ReadAll is applied to an unbounded input. --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 87 ++++++++++++++++--- 1 file changed, 76 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 786fa91f5582..b6ec8097a5fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -142,12 +142,15 @@ * *

Reading from Cloud Spanner

* - *

To read from Cloud Spanner, apply {@link Read} transformation. It will return a {@link - * PCollection} of {@link Struct Structs}, where each element represents an individual row returned - * from the read operation. Both Query and Read APIs are supported. See more information about Bulk reading of a single query or table + * + *

To perform a single read from Cloud Spanner, construct a {@link Read} transform using {@link + * SpannerIO#read() SpannerIO.read()}. It will return a {@link PCollection} of {@link Struct + * Structs}, where each element represents an individual row returned from the read operation. Both + * Query and Read APIs are supported. See more information about reading from Cloud Spanner * - *

To execute a query, specify a {@link Read#withQuery(Statement)} or {@link + *

To execute a Query, specify a {@link Read#withQuery(Statement)} or {@link * Read#withQuery(String)} during the construction of the transform. * *

{@code
@@ -158,8 +161,17 @@
  *         .withQuery("SELECT id, name, email FROM users"));
  * }
* - *

To use the Read API, specify a {@link Read#withTable(String) table name} and a {@link - * Read#withColumns(List) list of columns}. + *

Reads by default use the PartitionQuery API + * which enforces some limitations on the type of queries that can be used so that the data can be + * read in parallel. If the query is not supported by the PartitionQuery API, then you can specify a + * non-partitioned read by setting {@link Read#withBatching(boolean) withBatching(false)}. If the + * amount of data being read by a non-partitioned read is very large, it may be useful to add a + * {@link Reshuffle#viaRandomKey()} transform on the output so that the downstream transforms can + * run in parallel. + * + *

To read an entire Table, use {@link Read#withTable(String)} and optionally + * specify a {@link Read#withColumns(List) list of columns}. * *

{@code
  * PCollection rows = p.apply(
@@ -170,13 +182,26 @@
  *        .withColumns("id", "name", "email"));
  * }
* - *

To optimally read using index, specify the index name using {@link Read#withIndex}. + *

To read using an Index, specify the index name using {@link + * Read#withIndex(String)}. + * + *

{@code
+ * PCollection rows = p.apply(
+ *    SpannerIO.read()
+ *        .withInstanceId(instanceId)
+ *        .withDatabaseId(dbId)
+ *        .withTable("users")
+ *        .withIndex("users_by_name")
+ *        .withColumns("id", "name", "email"));
+ * }
+ * + *

Read consistency

* *

The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the * power of read only transactions. Staleness of data can be controlled using {@link * Read#withTimestampBound} or {@link Read#withTimestamp(Timestamp)} methods. Read more about transactions in - * Cloud Spanner. + * href="https://cloud.google.com/spanner/docs/transactions#read-only_transactions">Read more + * about transactions in Cloud Spanner. * *

It is possible to read several {@link PCollection PCollections} within a single transaction. * Apply {@link SpannerIO#createTransaction()} transform, that lazily creates a transaction. The @@ -204,6 +229,29 @@ * .withTransaction(tx)); * } * + *

Bulk reading of multiple queries or tables

+ * + * You can perform multiple consistent reads on a set of tables or using a set of queries by + * constructing a {@link ReadAll} transform using {@link SpannerIO#readAll() SpannerIO.readAll()}. + * This transform takes a {@link PCollection} of {@link ReadOperation} elements, and performs the + * partitioned read on each of them using the same Read Only Transaction for consistent results. + * + *

Note that this transform should not be used in Streaming pipelines. This is + * because the same Read Only Transaction, which is created once when the pipeline is first + * executed, will be used for all reads. The data being read will therefore become stale, and if no + * reads are made for more than 1 hour, the transaction will automatically timeout and be closed by + * the Spanner server, meaning that any subsequent reads will fail. + * + *

{@code
+ * // Build a collection of ReadOperations.
+ * PCollection reads = ...
+ *
+ * PCollection rows = reads.apply(
+ *     SpannerIO.readAll()
+ *         .withInstanceId(instanceId)
+ *         .withDatabaseId(dbId)
+ * }
+ * *

Writing to Cloud Spanner

* *

The Cloud Spanner {@link Write} transform writes to Cloud Spanner by executing a collection of @@ -362,6 +410,12 @@ *

{@link Write} can be used as a streaming sink, however as with batch mode note that the write * order of individual {@link Mutation}/{@link MutationGroup} objects is not guaranteed. * + *

{@link Read} and {@link ReadAll} can be used in Streaming pipelines to read a set of Facts on + * pipeline startup. + * + *

{@link ReadAll} should not be used on an unbounded {@code PCollection}, for the + * reasons stated above. + * *

Updates to the I/O connector code

* * For any significant significant updates to this I/O connector, please consider involving @@ -564,8 +618,10 @@ public ReadAll withTimestampBound(TimestampBound timestampBound) { } /** - * By default Batch API is used to read data from Cloud Spanner. It is useful to disable - * batching when the underlying query is not root-partitionable. + * By default the PartitionQuery + * API is used to read data from Cloud Spanner. It is useful to disable batching when the + * underlying query is not root-partitionable. */ public ReadAll withBatching(boolean batching) { return toBuilder().setBatching(batching).build(); @@ -585,6 +641,15 @@ public ReadAll withHighPriority() { @Override public PCollection expand(PCollection input) { + + if (PCollection.IsBounded.UNBOUNDED == input.isBounded()) { + // Warn that SpannerIO.ReadAll should not be used on unbounded inputs. + LOG.warn( + "SpannerIO.ReadAll({}) is being applied to an unbounded input. " + + "This is not supported and can lead to runtime failures.", + this.getName()); + } + PTransform, PCollection> readTransform; if (getBatching()) { readTransform =