From 9a5d86bf2f32a689f96f909f44c91af094e5b8de Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Wed, 29 Jan 2025 17:49:24 +0530 Subject: [PATCH] chore(spanner): add multiplexed session support for batch write (#3470) * chore(spanner): add multiplexed session support for batch write * chore(spanner): lint fix --- ...tractMultiplexedSessionDatabaseClient.java | 10 ----- .../cloud/spanner/DatabaseClientImpl.java | 3 ++ .../DelayedMultiplexedSessionTransaction.java | 18 ++++++++ .../MultiplexedSessionDatabaseClient.java | 10 +++++ .../com/google/cloud/spanner/SessionImpl.java | 1 + ...edSessionDatabaseClientMockServerTest.java | 41 +++++++++++++++++++ 6 files changed, 73 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index 10ab997d88a..073b7b3d2d9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -16,10 +16,7 @@ package com.google.cloud.spanner; -import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.Options.TransactionOption; -import com.google.spanner.v1.BatchWriteResponse; /** * Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link @@ -43,11 +40,4 @@ public String getDatabaseRole() { public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); } - - @Override - public ServerStream batchWriteAtLeastOnce( - Iterable mutationGroups, TransactionOption... options) - throws SpannerException { - throw new UnsupportedOperationException(); - } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 7957838e408..92971ff320f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -189,6 +189,9 @@ public ServerStream batchWriteAtLeastOnce( throws SpannerException { ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options); try (IScope s = tracer.withSpan(span)) { + if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) { + return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options); + } return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options)); } catch (RuntimeException e) { span.setStatus(e); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 0193805cbeb..90de3d7de31 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -20,12 +20,14 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.BatchWriteResponse; import java.util.concurrent.ExecutionException; /** @@ -164,6 +166,22 @@ public CommitResponse writeWithOptions(Iterable mutations, Transaction } } + /** + * This is a blocking method, as the interface that it implements is also defined as a blocking + * method. + */ + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + SessionReference sessionReference = getSessionReference(); + try (MultiplexedSessionTransaction transaction = + new MultiplexedSessionTransaction( + client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true)) { + return transaction.batchWriteAtLeastOnce(mutationGroups, options); + } + } + @Override public TransactionRunner readWriteTransaction(TransactionOption... options) { return new DelayedTransactionRunner( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 89371a21c51..33ddcdeb0cb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; @@ -30,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.BatchWriteResponse; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.Transaction; @@ -505,6 +507,14 @@ public CommitResponse writeAtLeastOnceWithOptions( .writeAtLeastOnceWithOptions(mutations, options); } + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + return createMultiplexedSessionTransaction(/* singleUse = */ true) + .batchWriteAtLeastOnce(mutationGroups, options); + } + @Override public ReadContext singleUse() { return createMultiplexedSessionTransaction(/* singleUse = */ true).singleUse(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 2f0d86b6314..454709275f8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -321,6 +321,7 @@ public ServerStream batchWriteAtLeastOnce( throw SpannerExceptionFactory.newSpannerException(e); } finally { span.end(); + onTransactionDone(); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index c808bbe1110..edafa15f00d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -30,6 +30,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.NoCredentials; import com.google.cloud.Timestamp; import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep; @@ -45,6 +46,8 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import com.google.spanner.v1.BatchWriteRequest; +import com.google.spanner.v1.BatchWriteResponse; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteSqlRequest; @@ -1635,6 +1638,44 @@ public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() { assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void testBatchWriteAtLeastOnce() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + Iterable MUTATION_GROUPS = + ImmutableList.of( + MutationGroup.of( + Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(), + Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()), + MutationGroup.of( + Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(), + Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build())); + + ServerStream responseStream = client.batchWriteAtLeastOnce(MUTATION_GROUPS); + int idx = 0; + for (BatchWriteResponse response : responseStream) { + assertEquals( + response.getStatus(), + com.google.rpc.Status.newBuilder().setCode(com.google.rpc.Code.OK_VALUE).build()); + assertEquals(response.getIndexesList(), ImmutableList.of(idx, idx + 1)); + idx += 2; + } + + assertNotNull(responseStream); + List requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class); + assertEquals(requests.size(), 1); + BatchWriteRequest request = requests.get(0); + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertEquals(request.getMutationGroupsCount(), 2); + assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_UNSPECIFIED); + assertFalse(request.getExcludeTxnFromChangeStreams()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference =