Add docs and warning about using SpannerIO.Read in streaming pipelines (#29601)

* Add docs and warning about using SpannerIO.Read in streaming.

Add more documentation around SpannerIO.Read and .ReadAll
explaining PartitionedRead API, batching, and how it should
not be used for unbounded reads in Streaming pipelines.

Add a warning if SpannerIO.ReadAll is applied to an unbounded input.
nielm authored Dec 5, 2023
1 parent 68bd67f commit 70e41f4
Showing 1 changed file with 76 additions and 11 deletions.
@@ -142,12 +142,15 @@
*
* <h3>Reading from Cloud Spanner</h3>
*
* <p>To read from Cloud Spanner, apply {@link Read} transformation. It will return a {@link
* PCollection} of {@link Struct Structs}, where each element represents an individual row returned
* from the read operation. Both Query and Read APIs are supported. See more information about <a
* <h4>Bulk reading of a single query or table</h4>
*
* <p>To perform a single read from Cloud Spanner, construct a {@link Read} transform using {@link
* SpannerIO#read() SpannerIO.read()}. It will return a {@link PCollection} of {@link Struct
* Structs}, where each element represents an individual row returned from the read operation. Both
* Query and Read APIs are supported. See more information about <a
* href="https://cloud.google.com/spanner/docs/reads">reading from Cloud Spanner</a>
*
* <p>To execute a <strong>query</strong>, specify a {@link Read#withQuery(Statement)} or {@link
* <p>To execute a <strong>Query</strong>, specify a {@link Read#withQuery(Statement)} or {@link
* Read#withQuery(String)} during the construction of the transform.
*
* <pre>{@code
@@ -158,8 +161,17 @@
* .withQuery("SELECT id, name, email FROM users"));
* }</pre>
*
* <p>To use the Read API, specify a {@link Read#withTable(String) table name} and a {@link
* Read#withColumns(List) list of columns}.
* <p>Reads by default use the <a
* href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">PartitionQuery API</a>
* which enforces some limitations on the type of queries that can be used so that the data can be
* read in parallel. If the query is not supported by the PartitionQuery API, then you can specify a
* non-partitioned read by setting {@link Read#withBatching(boolean) withBatching(false)}. If the
* amount of data being read by a non-partitioned read is very large, it may be useful to add a
* {@link Reshuffle#viaRandomKey()} transform on the output so that the downstream transforms can
* run in parallel.
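*
* <p>For example, a non-partitioned read followed by a reshuffle might look like the following
* sketch, where the aggregate query stands in for any query that is not root-partitionable:
*
* <pre>{@code
* PCollection<Struct> rows = p.apply(
*     SpannerIO.read()
*         .withInstanceId(instanceId)
*         .withDatabaseId(dbId)
*         .withQuery("SELECT name, COUNT(*) AS c FROM users GROUP BY name")
*         .withBatching(false))
*     .apply(Reshuffle.viaRandomKey());
* }</pre>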
*
* <p>To read an entire <strong>Table</strong>, use {@link Read#withTable(String)} and optionally
* specify a {@link Read#withColumns(List) list of columns}.
*
* <pre>{@code
* PCollection<Struct> rows = p.apply(
@@ -170,13 +182,26 @@
* .withColumns("id", "name", "email"));
* }</pre>
*
* <p>To optimally read using index, specify the index name using {@link Read#withIndex}.
* <p>To read using an <strong>Index</strong>, specify the index name using {@link
* Read#withIndex(String)}.
*
* <pre>{@code
* PCollection<Struct> rows = p.apply(
* SpannerIO.read()
* .withInstanceId(instanceId)
* .withDatabaseId(dbId)
* .withTable("users")
* .withIndex("users_by_name")
* .withColumns("id", "name", "email"));
* }</pre>
*
* <h4>Read consistency</h4>
*
* <p>The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the
* power of read-only transactions. Staleness of data can be controlled using {@link
* Read#withTimestampBound} or {@link Read#withTimestamp(Timestamp)} methods. <a
* href="https://cloud.google.com/spanner/docs/transactions">Read more</a> about transactions in
* Cloud Spanner.
* href="https://cloud.google.com/spanner/docs/transactions#read-only_transactions">Read more</a>
* about transactions in Cloud Spanner.
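*
* <p>For example, a read at a fixed staleness might look like the following sketch (the 15-second
* staleness is illustrative, and {@code TimestampBound} comes from the Cloud Spanner client
* library):
*
* <pre>{@code
* PCollection<Struct> rows = p.apply(
*     SpannerIO.read()
*         .withInstanceId(instanceId)
*         .withDatabaseId(dbId)
*         .withTable("users")
*         .withColumns("id", "name", "email")
*         .withTimestampBound(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS)));
* }</pre>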
*
* <p>It is possible to read several {@link PCollection PCollections} within a single transaction.
* Apply the {@link SpannerIO#createTransaction()} transform, which lazily creates a transaction. The
@@ -204,6 +229,29 @@
* .withTransaction(tx));
* }</pre>
*
* <h4>Bulk reading of multiple queries or tables</h4>
*
* You can perform multiple consistent reads on a set of tables or using a set of queries by
* constructing a {@link ReadAll} transform using {@link SpannerIO#readAll() SpannerIO.readAll()}.
* This transform takes a {@link PCollection} of {@link ReadOperation} elements, and performs the
* partitioned read on each of them using the same Read Only Transaction for consistent results.
*
* <p>Note that this transform should <strong>not</strong> be used in Streaming pipelines. This is
* because the same Read Only Transaction, which is created once when the pipeline is first
* executed, will be used for all reads. The data being read will therefore become stale, and if no
* reads are made for more than 1 hour, the transaction will automatically time out and be closed by
* the Spanner server, meaning that any subsequent reads will fail.
*
* <pre>{@code
* // Build a collection of ReadOperations.
* PCollection<ReadOperation> reads = ...
*
* PCollection<Struct> rows = reads.apply(
* SpannerIO.readAll()
* .withInstanceId(instanceId)
*         .withDatabaseId(dbId));
* }</pre>
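*
* <p>The {@link ReadOperation} elements might, for example, be built directly in the pipeline, as
* in this sketch (the table, columns, and query are illustrative):
*
* <pre>{@code
* PCollection<ReadOperation> reads = p.apply(Create.of(
*     ReadOperation.create().withTable("users").withColumns("id", "name", "email"),
*     ReadOperation.create().withQuery("SELECT id, title FROM albums")));
* }</pre>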
*
* <h3>Writing to Cloud Spanner</h3>
*
* <p>The Cloud Spanner {@link Write} transform writes to Cloud Spanner by executing a collection of
@@ -362,6 +410,12 @@
* <p>{@link Write} can be used as a streaming sink, however as with batch mode note that the write
* order of individual {@link Mutation}/{@link MutationGroup} objects is not guaranteed.
*
* <p>{@link Read} and {@link ReadAll} can be used in Streaming pipelines to read a set of facts on
* pipeline startup.
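*
* <p>One way to do this (a sketch; the {@code facts} table and the side-input shape are
* illustrative) is to materialize the rows once as a side input for the streaming stages:
*
* <pre>{@code
* PCollectionView<List<Struct>> factsView = p
*     .apply(SpannerIO.read()
*         .withInstanceId(instanceId)
*         .withDatabaseId(dbId)
*         .withTable("facts")
*         .withColumns("key", "value"))
*     .apply(View.asList());
* }</pre>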
*
* <p>{@link ReadAll} should not be used on an unbounded {@code PCollection<ReadOperation>}, for the
* reasons stated above.
*
* <h3>Updates to the I/O connector code</h3>
*
* For any significant updates to this I/O connector, please consider involving
@@ -564,8 +618,10 @@ public ReadAll withTimestampBound(TimestampBound timestampBound) {
}

/**
* By default Batch API is used to read data from Cloud Spanner. It is useful to disable
* batching when the underlying query is not root-partitionable.
* By default the <a
* href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">PartitionQuery
* API</a> is used to read data from Cloud Spanner. It is useful to disable batching when the
* underlying query is not root-partitionable.
*/
public ReadAll withBatching(boolean batching) {
return toBuilder().setBatching(batching).build();
@@ -585,6 +641,15 @@ public ReadAll withHighPriority() {

@Override
public PCollection<Struct> expand(PCollection<ReadOperation> input) {

if (PCollection.IsBounded.UNBOUNDED == input.isBounded()) {
// Warn that SpannerIO.ReadAll should not be used on unbounded inputs.
LOG.warn(
"SpannerIO.ReadAll({}) is being applied to an unbounded input. "
+ "This is not supported and can lead to runtime failures.",
this.getName());
}

PTransform<PCollection<ReadOperation>, PCollection<Struct>> readTransform;
if (getBatching()) {
readTransform =
