From a11b75659104d60119e201f00ff353214cfff252 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 24 Apr 2025 05:41:33 +0000 Subject: [PATCH 01/15] chore(spanner): support AbortedException in begin --- .../com/google/cloud/spanner/AbortedException.java | 13 +++++++++++++ .../java/com/google/cloud/spanner/SessionPool.java | 5 +++++ .../google/cloud/spanner/TransactionRunnerImpl.java | 1 + .../connection/ReadWriteTransactionTest.java | 5 +++++ .../connection/SingleUseTransactionTest.java | 6 ++++++ 5 files changed, 30 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbortedException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbortedException.java index 3e5227888d9..8b5dee0a7ca 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbortedException.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbortedException.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import com.google.api.gax.rpc.ApiException; +import com.google.protobuf.ByteString; import javax.annotation.Nullable; /** @@ -31,6 +32,7 @@ public class AbortedException extends SpannerException { * new transaction attempt) before a retry can succeed. */ private static final boolean IS_RETRYABLE = false; + private ByteString transactionID; /** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */ AbortedException( @@ -46,6 +48,9 @@ public class AbortedException extends SpannerException { @Nullable ApiException apiException, @Nullable XGoogSpannerRequestId reqId) { super(token, ErrorCode.ABORTED, IS_RETRYABLE, message, cause, apiException, reqId); + if (cause instanceof AbortedException) { + this.transactionID = ((AbortedException) cause).getTransactionID(); + } } /** @@ -55,4 +60,12 @@ public class AbortedException extends SpannerException { public boolean isEmulatorOnlySupportsOneTransactionException() { return getMessage().endsWith("The emulator only supports one transaction at a time."); } + + void setTransactionID(ByteString transactionID) { + this.transactionID = transactionID; + } + + ByteString getTransactionID() { + return this.transactionID; + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 37fa2c5d202..4afc055de67 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -900,6 +900,11 @@ public TransactionContext begin() { return internalBegin(); } + @Override + public TransactionContext begin(AbortedException exception) { + return begin(); + } + private TransactionContext internalBegin() { TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin()); session.get().markUsed(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index a715fae0fad..3f41c91019b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -793,6 +793,7 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction long delay = -1L; if (exceptionToThrow instanceof AbortedException) { delay = exceptionToThrow.getRetryDelayInMillis(); + ((AbortedException) exceptionToThrow).setTransactionID(this.transactionId); } if (delay == -1L) { txnLogger.log( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java index 17994b34682..947fe9d33cf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java @@ -97,6 +97,11 @@ public TransactionContext begin() { return txContext; } + @Override + public TransactionContext begin(AbortedException exception) { + return begin(); + } + @Override public void commit() { switch (commitBehavior) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java index 76ace88d77a..bf4d8655d09 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java @@ -36,6 +36,7 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.AbortedException; import com.google.cloud.spanner.AsyncResultSet; import com.google.cloud.spanner.BatchClient; import com.google.cloud.spanner.CommitResponse; @@ -130,6 +131,11 @@ public TransactionContext begin() { return txContext; } + @Override + public TransactionContext begin(AbortedException exception) { + return begin(); + } + @Override public void commit() { switch (commitBehavior) { From 7270c9b3d6b7c682c55a620e76e9c5260310b5b7 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 24 Apr 2025 05:41:56 +0000 Subject: [PATCH 02/15] chore(spanner): add tests --- ...edSessionDatabaseClientMockServerTest.java | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 126a2df5f42..654399ec8aa 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -2170,6 +2170,148 @@ public void testBatchWriteAtLeastOnce() { assertFalse(mockSpanner.getSession(executeSqlRequests.get(2).getSession()).getMultiplexed()); } + @Test + public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_DuringAborted() { + // Whenever an ABORTED exception occurs, the transaction ID that caused the ABORT should be set in the AbortedException class. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared + // after the first call, so the retry should succeed. + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + + ByteString abortedTransactionID = null; + AbortedException exception = null; + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + try { + try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + } catch (AbortedException e) { + // The transactionID of the Aborted transaction should be set in AbortedException class. + assertNotNull(e.getTransactionID()); + abortedTransactionID = e.getTransactionID(); + exception = e; + } + } + assertNotNull(abortedTransactionID); + assertNotNull(exception); + mockSpanner.clearRequests(); + + // Use AbortedException while creating a new instance of TransactionManager + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(exception); + while (true) { + try { + try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + + List executeSqlRequests = + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(1, executeSqlRequests.size()); + assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(executeSqlRequests.get(0).getTransaction().getBegin().getReadWrite().getMultiplexedSessionPreviousTransactionId()); + assertEquals(executeSqlRequests.get(0).getTransaction().getBegin().getReadWrite().getMultiplexedSessionPreviousTransactionId(), abortedTransactionID); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + + @Test + public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_DuringAbortedInExecuteSql() { + // Whenever an ABORTED exception occurs, the transaction ID that caused the ABORT should be set in the AbortedException class. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + ByteString abortedTransactionID = null; + AbortedException exception = null; + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + try { + try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + + try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + } catch (AbortedException e) { + // The transactionID of the Aborted transaction should be set in AbortedException class. + assertNotNull(e.getTransactionID()); + abortedTransactionID = e.getTransactionID(); + exception = e; + } + } + assertNotNull(abortedTransactionID); + assertNotNull(exception); + mockSpanner.clearRequests(); + + // Use AbortedException while creating a new instance of TransactionManager + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(exception); + while (true) { + try { + try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + + List executeSqlRequests = + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(1, executeSqlRequests.size()); + assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(executeSqlRequests.get(0).getTransaction().getBegin().getReadWrite().getMultiplexedSessionPreviousTransactionId()); + assertEquals(executeSqlRequests.get(0).getTransaction().getBegin().getReadWrite().getMultiplexedSessionPreviousTransactionId(), abortedTransactionID); + + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = From d0de490a331f337ae1d8ee581defb70c7982fd9d Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 24 Apr 2025 05:49:58 +0000 Subject: [PATCH 03/15] chore(spanner): support for TransactionManager --- .../cloud/spanner/DelayedTransactionManager.java | 5 +++++ .../google/cloud/spanner/TransactionManager.java | 2 ++ .../cloud/spanner/TransactionManagerImpl.java | 13 ++++++++++++- 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedTransactionManager.java index 29eae6477fc..96400e9e9bb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedTransactionManager.java @@ -49,6 +49,11 @@ public TransactionContext begin() { return getTransactionManager().begin(); } + @Override + public TransactionContext begin(AbortedException exception) { + return getTransactionManager().begin(exception); + } + @Override public void commit() { getTransactionManager().commit(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java index 76656efea28..48c46d41d24 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java @@ -61,6 +61,8 @@ enum TransactionState { */ TransactionContext begin(); + TransactionContext begin(AbortedException exception); + /** * Commits the currently active transaction. If the transaction was already aborted, then this * would throw an {@link AbortedException}. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index bbf34ab5c8f..ded2f3e870b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -53,8 +53,19 @@ public void setSpan(ISpan span) { @Override public TransactionContext begin() { Preconditions.checkState(txn == null, "begin can only be called once"); + return begin(ByteString.EMPTY); + } + + @Override + public TransactionContext begin(AbortedException exception) { + Preconditions.checkState(txn == null, "begin can only be called once"); + ByteString previousAbortedTransactionID = exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY; + return begin(previousAbortedTransactionID); + } + + TransactionContext begin(ByteString previousTransactionId) { try (IScope s = tracer.withSpan(span)) { - txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY); + txn = session.newTransaction(options, /* previousTransactionId = */ previousTransactionId); session.setActive(this); txnState = TransactionState.STARTED; return txn; From b2f22da1ec53e3a55d7253cd84373f5a3614d959 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 24 Apr 2025 05:52:58 +0000 Subject: [PATCH 04/15] chore(spanner): support AsyncTransactionManager --- .../cloud/spanner/AsyncTransactionManager.java | 2 ++ .../spanner/AsyncTransactionManagerImpl.java | 16 +++++++++++++--- .../spanner/DelayedAsyncTransactionManager.java | 5 +++++ .../SessionPoolAsyncTransactionManager.java | 5 +++++ .../spanner/connection/ConnectionImplTest.java | 6 ++++++ 5 files changed, 31 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java index c6ead432046..835291ccd55 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java @@ -170,6 +170,8 @@ interface AsyncTransactionFunction { */ TransactionContextFuture beginAsync(); + TransactionContextFuture beginAsync(AbortedException abortedException); + /** * Rolls back the currently active transaction. In most cases there should be no need to call this * explicitly since {@link #close()} would automatically roll back any active transaction. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 1578de87cdb..0f69290ffd3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -76,14 +76,24 @@ public ApiFuture closeAsync() { @Override public TransactionContextFutureImpl beginAsync() { Preconditions.checkState(txn == null, "begin can only be called once"); - return new TransactionContextFutureImpl(this, internalBeginAsync(true)); + return new TransactionContextFutureImpl(this, internalBeginAsync(true, ByteString.EMPTY)); } - private ApiFuture internalBeginAsync(boolean firstAttempt) { + @Override + public TransactionContextFutureImpl beginAsync(AbortedException abortedException) { + Preconditions.checkState(txn == null, "begin can only be called once"); + ByteString abortedTransactionId = abortedException.getTransactionID() != null ? abortedException.getTransactionID() : ByteString.EMPTY; + return new TransactionContextFutureImpl(this, internalBeginAsync(true, abortedTransactionId)); + } + + private ApiFuture internalBeginAsync(boolean firstAttempt, ByteString abortedTransactionID) { txnState = TransactionState.STARTED; // Determine the latest transactionId when using a multiplexed session. ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; + if (firstAttempt && session.getIsMultiplexed()) { + multiplexedSessionPreviousTransactionId = abortedTransactionID; + } if (txn != null && session.getIsMultiplexed() && !firstAttempt) { // Use the current transactionId if available, otherwise fallback to the previous aborted // transactionId. @@ -187,7 +197,7 @@ public TransactionContextFuture resetForRetryAsync() { throw new IllegalStateException( "resetForRetry can only be called if the previous attempt aborted"); } - return new TransactionContextFutureImpl(this, internalBeginAsync(false)); + return new TransactionContextFutureImpl(this, internalBeginAsync(false, ByteString.EMPTY)); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedAsyncTransactionManager.java index 56b874e4a87..bafaf630872 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedAsyncTransactionManager.java @@ -50,6 +50,11 @@ public TransactionContextFuture beginAsync() { return getAsyncTransactionManager().beginAsync(); } + @Override + public TransactionContextFuture beginAsync(AbortedException abortedException) { + return getAsyncTransactionManager().beginAsync(abortedException); + } + @Override public ApiFuture rollbackAsync() { return getAsyncTransactionManager().rollbackAsync(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index 3d6a015531f..e3c83a5c90f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -163,6 +163,11 @@ public void onSuccess(TransactionContext result) { return new TransactionContextFutureImpl(this, delegateTxnFuture); } + @Override + public TransactionContextFuture beginAsync(AbortedException abortedException) { + return beginAsync(); + } + @Override public void onError(Throwable t) { if (t instanceof AbortedException) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java index d4b7c035658..ead2fd0f655 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java @@ -46,6 +46,7 @@ import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.NoCredentials; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.AbortedException; import com.google.cloud.spanner.BatchClient; import com.google.cloud.spanner.BatchReadOnlyTransaction; import com.google.cloud.spanner.BatchTransactionId; @@ -120,6 +121,11 @@ public TransactionContext begin() { return txContext; } + @Override + public TransactionContext begin(AbortedException exception) { + return begin(); + } + @Override public void commit() { Timestamp commitTimestamp = Timestamp.now(); From 28704e389c68a3500b0a6ef790bdb446ab738f45 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 24 Apr 2025 06:06:41 +0000 Subject: [PATCH 05/15] chore(spanner): fix test --- .../MultiplexedSessionDatabaseClientMockServerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 654399ec8aa..0e8ff9f267c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -2233,8 +2233,8 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du assertEquals(executeSqlRequests.get(0).getTransaction().getBegin().getReadWrite().getMultiplexedSessionPreviousTransactionId(), abortedTransactionID); assertNotNull(client.multiplexedSessionDatabaseClient); - assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); - assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } @Test From 00360f0aab210c54c83bc9bb85956ffeded36e08 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 24 Apr 2025 07:39:10 +0000 Subject: [PATCH 06/15] chore(spanner): lint format --- .../cloud/spanner/AbortedException.java | 1 + .../spanner/AsyncTransactionManager.java | 11 ++++- .../spanner/AsyncTransactionManagerImpl.java | 8 ++-- .../DelayedAsyncTransactionManager.java | 4 +- .../com/google/cloud/spanner/SessionPool.java | 2 + .../SessionPoolAsyncTransactionManager.java | 4 +- .../cloud/spanner/TransactionManager.java | 8 ++++ .../cloud/spanner/TransactionManagerImpl.java | 3 +- ...edSessionDatabaseClientMockServerTest.java | 44 +++++++++++++++---- 9 files changed, 69 insertions(+), 16 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbortedException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbortedException.java index 8b5dee0a7ca..74e45062113 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbortedException.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbortedException.java @@ -32,6 +32,7 @@ public class AbortedException extends SpannerException { * new transaction attempt) before a retry can succeed. */ private static final boolean IS_RETRYABLE = false; + private ByteString transactionID; /** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */ diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java index 835291ccd55..045dd9c0033 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java @@ -170,7 +170,16 @@ interface AsyncTransactionFunction { */ TransactionContextFuture beginAsync(); - TransactionContextFuture beginAsync(AbortedException abortedException); + /** + * Initializes a new read-write transaction. This method must be called before performing any + * operations, and it can only be invoked once per transaction lifecycle. + * + *

This is especially useful in scenarios involving multiplexed sessions and when creating a + * new transaction for retry attempts. If {@link #resetForRetryAsync()} is not used, you can pass + * the {@link AbortedException} from a previous attempt here to preserve the transaction's + * priority. + */ + TransactionContextFuture beginAsync(AbortedException exception); /** * Rolls back the currently active transaction. In most cases there should be no need to call this diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 0f69290ffd3..96249a80aab 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -80,13 +80,15 @@ public TransactionContextFutureImpl beginAsync() { } @Override - public TransactionContextFutureImpl beginAsync(AbortedException abortedException) { + public TransactionContextFutureImpl beginAsync(AbortedException exception) { Preconditions.checkState(txn == null, "begin can only be called once"); - ByteString abortedTransactionId = abortedException.getTransactionID() != null ? abortedException.getTransactionID() : ByteString.EMPTY; + ByteString abortedTransactionId = + exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY; return new TransactionContextFutureImpl(this, internalBeginAsync(true, abortedTransactionId)); } - private ApiFuture internalBeginAsync(boolean firstAttempt, ByteString abortedTransactionID) { + private ApiFuture internalBeginAsync( + boolean firstAttempt, ByteString abortedTransactionID) { txnState = TransactionState.STARTED; // Determine the latest transactionId when using a multiplexed session. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedAsyncTransactionManager.java index bafaf630872..530670960ca 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedAsyncTransactionManager.java @@ -51,8 +51,8 @@ public TransactionContextFuture beginAsync() { } @Override - public TransactionContextFuture beginAsync(AbortedException abortedException) { - return getAsyncTransactionManager().beginAsync(abortedException); + public TransactionContextFuture beginAsync(AbortedException exception) { + return getAsyncTransactionManager().beginAsync(exception); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 4afc055de67..a7548758b35 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -902,6 +902,8 @@ public TransactionContext begin() { @Override public TransactionContext begin(AbortedException exception) { + // For regular sessions, the input exception is ignored and the behavior is equivalent to + // calling {@link #begin()}. return begin(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index e3c83a5c90f..5e48d1b78bc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -164,7 +164,9 @@ public void onSuccess(TransactionContext result) { } @Override - public TransactionContextFuture beginAsync(AbortedException abortedException) { + public TransactionContextFuture beginAsync(AbortedException exception) { + // For regular sessions, the input exception is ignored and the behavior is equivalent to + // calling {@link #beginAsync()}. return beginAsync(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java index 48c46d41d24..35018f81975 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java @@ -61,6 +61,14 @@ enum TransactionState { */ TransactionContext begin(); + /** + * Initializes a new read-write transaction. This method must be called before performing any + * operations, and it can only be invoked once per transaction lifecycle. + * + *

This is especially useful in scenarios involving multiplexed sessions and when creating a + * new transaction for retry attempts. If {@link #resetForRetry()} is not used, you can pass the + * {@link AbortedException} from a previous attempt here to preserve the transaction's priority. + */ TransactionContext begin(AbortedException exception); /** diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index ded2f3e870b..e49379ed861 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -59,7 +59,8 @@ public TransactionContext begin() { @Override public TransactionContext begin(AbortedException exception) { Preconditions.checkState(txn == null, "begin can only be called once"); - ByteString previousAbortedTransactionID = exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY; + ByteString previousAbortedTransactionID = + exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY; return begin(previousAbortedTransactionID); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 0e8ff9f267c..04e2071ce02 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -2172,7 +2172,8 @@ public void testBatchWriteAtLeastOnce() { @Test public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_DuringAborted() { - // Whenever an ABORTED exception occurs, the transaction ID that caused the ABORT should be set in the AbortedException class. + // Whenever an ABORTED exception occurs, the transaction ID that caused the ABORT should be set + // in the AbortedException class. DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared @@ -2229,8 +2230,21 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); assertEquals(1, executeSqlRequests.size()); assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed()); - assertNotNull(executeSqlRequests.get(0).getTransaction().getBegin().getReadWrite().getMultiplexedSessionPreviousTransactionId()); - assertEquals(executeSqlRequests.get(0).getTransaction().getBegin().getReadWrite().getMultiplexedSessionPreviousTransactionId(), abortedTransactionID); + assertNotNull( + executeSqlRequests + .get(0) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + assertEquals( + executeSqlRequests + .get(0) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId(), + abortedTransactionID); assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); @@ -2238,8 +2252,10 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du } @Test - public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_DuringAbortedInExecuteSql() { - // Whenever an ABORTED exception occurs, the transaction ID that caused the ABORT should be set in the AbortedException class. + public void + testReadWriteTransactionUsingTransactionManager_SetsTransactionID_DuringAbortedInExecuteSql() { + // Whenever an ABORTED exception occurs, the transaction ID that caused the ABORT should be set + // in the AbortedException class. DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); @@ -2303,9 +2319,21 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); assertEquals(1, executeSqlRequests.size()); assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed()); - assertNotNull(executeSqlRequests.get(0).getTransaction().getBegin().getReadWrite().getMultiplexedSessionPreviousTransactionId()); - assertEquals(executeSqlRequests.get(0).getTransaction().getBegin().getReadWrite().getMultiplexedSessionPreviousTransactionId(), abortedTransactionID); - + assertNotNull( + executeSqlRequests + .get(0) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + assertEquals( + executeSqlRequests + .get(0) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId(), + abortedTransactionID); assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); From e5ef71eda39a6b258b6be026e91f1c7e870de5ca Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 24 Apr 2025 09:35:13 +0000 Subject: [PATCH 07/15] chore(spanner): update documentation --- ...edSessionDatabaseClientMockServerTest.java | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 04e2071ce02..0e1908b4598 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -2171,9 +2171,17 @@ public void testBatchWriteAtLeastOnce() { } @Test - public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_DuringAborted() { - // Whenever an ABORTED exception occurs, the transaction ID that caused the ABORT should be set - // in the AbortedException class. + public void + testRWTransactionWithTransactionManager_CommitAborted_SetsTransactionId_AndUsedInNewInstance() { + // The below test verifies the behaviour of begin(AbortedException) method which is used to + // maintain transaction priority if resetForRetry() is not called. + + // This test performs the following steps: + // 1. Simulates an ABORTED exception during commit and verifies that the transaction ID is + // included in the AbortedException. + // 2. Passes the ABORTED exception to the begin(AbortedException) method of a new + // TransactionManager, and verifies that the transaction ID from the failed transaction is sent + // during the inline begin of the first request. DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared @@ -2202,11 +2210,12 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du exception = e; } } + // Verify that the transactionID of the aborted transaction is set. assertNotNull(abortedTransactionID); assertNotNull(exception); mockSpanner.clearRequests(); - // Use AbortedException while creating a new instance of TransactionManager + // Pass AbortedException while invoking begin on the new manager instance. try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(exception); while (true) { @@ -2226,6 +2235,8 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du } } + // Verify that the ExecuteSqlRequest with the inline begin passes the transactionID of the + // previously aborted transaction. List executeSqlRequests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); assertEquals(1, executeSqlRequests.size()); @@ -2253,9 +2264,13 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du @Test public void - testReadWriteTransactionUsingTransactionManager_SetsTransactionID_DuringAbortedInExecuteSql() { - // Whenever an ABORTED exception occurs, the transaction ID that caused the ABORT should be set - // in the AbortedException class. + testRWTransactionWithTransactionManager_ExecuteSQLAborted_SetsTransactionId_AndUsedInNewInstance() { + // This test performs the following steps: + // 1. Simulates an ABORTED exception during ExecuteSQL and verifies that the transaction ID is + // included in the AbortedException. + // 2. Passes the ABORTED exception to the begin(AbortedException) method of a new + // TransactionManager, and verifies that the transaction ID from the failed transaction is sent + // during the inline begin of the first request. DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); @@ -2271,6 +2286,7 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du } } + // Simulate an ABORTED in next ExecuteSQL request. mockSpanner.setExecuteStreamingSqlExecutionTime( SimulatedExecutionTime.ofException( mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); @@ -2290,12 +2306,12 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du exception = e; } } + // Verify that the transactionID of the aborted transaction is set. assertNotNull(abortedTransactionID); assertNotNull(exception); mockSpanner.clearRequests(); - // Use AbortedException while creating a new instance of TransactionManager - + // Pass AbortedException while invoking begin on the new manager instance. try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(exception); while (true) { @@ -2315,6 +2331,8 @@ public void testReadWriteTransactionUsingTransactionManager_SetsTransactionID_Du } } + // Verify that the ExecuteSqlRequest with inline begin includes the transaction ID from the + // previously aborted transaction. List executeSqlRequests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); assertEquals(1, executeSqlRequests.size()); From c11f3a21b19f1490b4f2b731f28989697d4a3b46 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 24 Apr 2025 10:05:06 +0000 Subject: [PATCH 08/15] chore(spanner): add test for AsyncTransactionManager --- ...edSessionDatabaseClientMockServerTest.java | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 0e1908b4598..d0841e3ac40 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -2358,6 +2358,91 @@ public void testBatchWriteAtLeastOnce() { assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void + testRWTransactionWithAsyncTransactionManager_CommitAborted_SetsTransactionId_AndUsedInNewInstance() + throws Exception { + // This test performs the following steps: + // 1. Simulates an ABORTED exception during ExecuteSQL and verifies that the transaction ID is + // included in the AbortedException. + // 2. Passes the ABORTED exception to the begin(AbortedException) method of a new + // AsyncTransactionManager, and verifies that the transaction ID from the failed transaction is + // sent + // during the inline begin of the first request. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared + // after the first call, so the retry should succeed. + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + ByteString abortedTransactionID = null; + AbortedException exception = null; + try (AsyncTransactionManager manager = client.transactionManagerAsync()) { + TransactionContextFuture transactionContextFuture = manager.beginAsync(); + try { + AsyncTransactionStep updateCount = + transactionContextFuture.then( + (transaction, ignored) -> transaction.executeUpdateAsync(UPDATE_STATEMENT), + MoreExecutors.directExecutor()); + CommitTimestampFuture commitTimestamp = updateCount.commitAsync(); + assertEquals(UPDATE_COUNT, updateCount.get().longValue()); + assertNotNull(commitTimestamp.get()); + } catch (AbortedException e) { + assertNotNull(e.getTransactionID()); + exception = e; + abortedTransactionID = e.getTransactionID(); + } + } + + // Verify that the transactionID of the aborted transaction is set. + assertNotNull(abortedTransactionID); + assertNotNull(exception); + mockSpanner.clearRequests(); + + try (AsyncTransactionManager manager = client.transactionManagerAsync()) { + TransactionContextFuture transactionContextFuture = manager.beginAsync(exception); + while (true) { + try { + AsyncTransactionStep updateCount = + transactionContextFuture.then( + (transaction, ignored) -> transaction.executeUpdateAsync(UPDATE_STATEMENT), + MoreExecutors.directExecutor()); + CommitTimestampFuture commitTimestamp = updateCount.commitAsync(); + assertEquals(UPDATE_COUNT, updateCount.get().longValue()); + assertNotNull(commitTimestamp.get()); + break; + } catch (AbortedException e) { + transactionContextFuture = manager.resetForRetryAsync(); + } + } + } + + List executeSqlRequests = + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(1, executeSqlRequests.size()); + assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed()); + assertNotNull( + executeSqlRequests + .get(0) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId()); + assertEquals( + executeSqlRequests + .get(0) + .getTransaction() + .getBegin() + .getReadWrite() + .getMultiplexedSessionPreviousTransactionId(), + abortedTransactionID); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = From fa31c5156da38e7db69693db32307c6855e3fa93 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 24 Apr 2025 10:17:00 +0000 Subject: [PATCH 09/15] chore(spanner): add clirr --- google-cloud-spanner/clirr-ignored-differences.xml | 12 ++++++++++++ .../cloud/spanner/AsyncTransactionManager.java | 2 ++ .../com/google/cloud/spanner/TransactionManager.java | 2 ++ 3 files changed, 16 insertions(+) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 28e22e4a86c..be69039f8cb 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -964,4 +964,16 @@ com/google/cloud/spanner/DatabaseClient com.google.cloud.spanner.Statement$StatementFactory getStatementFactory() + + + + 7012 + com/google/cloud/spanner/AsyncTransactionManager + com.google.cloud.spanner.AsyncTransactionManager$TransactionContextFuture beginAsync(com.google.cloud.spanner.AbortedException) + + + 7012 + com/google/cloud/spanner/TransactionManager + com.google.cloud.spanner.TransactionContext begin(com.google.cloud.spanner.AbortedException) + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java index 045dd9c0033..a33d08a8fae 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java @@ -178,6 +178,8 @@ interface AsyncTransactionFunction { * new transaction for retry attempts. If {@link #resetForRetryAsync()} is not used, you can pass * the {@link AbortedException} from a previous attempt here to preserve the transaction's * priority. + * + *

For regular sessions, this behaves the same as {@link #beginAsync()}. */ TransactionContextFuture beginAsync(AbortedException exception); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java index 35018f81975..0293d77e8de 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java @@ -68,6 +68,8 @@ enum TransactionState { *

This is especially useful in scenarios involving multiplexed sessions and when creating a * new transaction for retry attempts. If {@link #resetForRetry()} is not used, you can pass the * {@link AbortedException} from a previous attempt here to preserve the transaction's priority. + * + *

For regular sessions, this behaves the same as {@link #begin()}. */ TransactionContext begin(AbortedException exception); From 444d1e1d6cfece2c53f22ff60d74f7362e4d0250 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Fri, 25 Apr 2025 04:54:34 +0000 Subject: [PATCH 10/15] chore(spanner): for debugging --- .../java/com/google/cloud/spanner/SessionPoolOptionsTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java index 9e16b3fb1c8..b64ac9b4eed 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java @@ -304,6 +304,10 @@ public void testUseMultiplexedSession() { @Test public void testUseMultiplexedSessionForRW() { // skip these tests since this configuration can have dual behaviour in different test-runners + System.out.println(SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); + System.out.println(SessionPoolOptions.newBuilder().build().getUseMultiplexedSessionForRW()); + System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS")); + System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW")); assumeFalse(SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); assumeFalse(SessionPoolOptions.newBuilder().build().getUseMultiplexedSessionForRW()); From 22de193367e2e3b5dac4f4481ad9f9289ea4fa50 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Fri, 25 Apr 2025 05:42:52 +0000 Subject: [PATCH 11/15] chore(spanner): find java version running tests --- .../com/google/cloud/spanner/SessionPoolOptionsTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java index b64ac9b4eed..bf8561b80a0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java @@ -284,6 +284,11 @@ public void testRandomizePositionQPSThreshold() { @Test public void testUseMultiplexedSession() { // skip these tests since this configuration can have dual behaviour in different test-runners + System.out.println(SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); + System.out.println(SessionPoolOptions.newBuilder().build().getUseMultiplexedSessionForRW()); + System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS")); + System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW")); + System.out.println("Running test on Java version: " + System.getProperty("java.version")); assumeFalse(SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); assertEquals(false, SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); assertEquals( @@ -308,6 +313,7 @@ public void testUseMultiplexedSessionForRW() { System.out.println(SessionPoolOptions.newBuilder().build().getUseMultiplexedSessionForRW()); System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS")); System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW")); + System.out.println("Running test on Java version: " + System.getProperty("java.version")); assumeFalse(SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); assumeFalse(SessionPoolOptions.newBuilder().build().getUseMultiplexedSessionForRW()); From 234c7325ae94bdecb448b4008a2f07d4a2dd7513 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Fri, 25 Apr 2025 06:18:09 +0000 Subject: [PATCH 12/15] chore(spanner): inject env --- .github/workflows/ci.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b2b187f4ac5..4aef938da19 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -86,6 +86,8 @@ jobs: distribution: temurin - run: echo "SUREFIRE_JVM_OPT=-Djvm=${JAVA_HOME}/bin/java" >> $GITHUB_ENV shell: bash + - run: echo "MAVEN_OPTS=-DGOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS=true -DGOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW=true" >> $GITHUB_ENV + shell: bash - uses: actions/setup-java@v3 with: java-version: 17 From d399cbdfb0abe3922617971f2914d54bc220efcc Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Fri, 25 Apr 2025 09:52:19 +0000 Subject: [PATCH 13/15] chore(spanner): verify env for skipping tests --- .../google/cloud/spanner/SessionPoolOptionsTest.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java index bf8561b80a0..d4b36e97488 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java @@ -284,11 +284,6 @@ public void testRandomizePositionQPSThreshold() { @Test public void testUseMultiplexedSession() { // skip these tests since this configuration can have dual behaviour in different test-runners - System.out.println(SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); - System.out.println(SessionPoolOptions.newBuilder().build().getUseMultiplexedSessionForRW()); - System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS")); - System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW")); - System.out.println("Running test on Java version: " + System.getProperty("java.version")); assumeFalse(SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); assertEquals(false, SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); assertEquals( @@ -309,11 +304,8 @@ public void testUseMultiplexedSession() { @Test public void testUseMultiplexedSessionForRW() { // skip these tests since this configuration can have dual behaviour in different test-runners - System.out.println(SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); - System.out.println(SessionPoolOptions.newBuilder().build().getUseMultiplexedSessionForRW()); - System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS")); - System.out.println(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW")); - System.out.println("Running test on Java version: " + System.getProperty("java.version")); + assumeFalse( + Boolean.parseBoolean(System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"))); assumeFalse(SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); assumeFalse(SessionPoolOptions.newBuilder().build().getUseMultiplexedSessionForRW()); From a149a4fc389e53c45a44cb1130a33642278c6f1b Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Fri, 25 Apr 2025 09:54:37 +0000 Subject: [PATCH 14/15] chore(spanner): remove injecting env --- .github/workflows/ci.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4aef938da19..b2b187f4ac5 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -86,8 +86,6 @@ jobs: distribution: temurin - run: echo "SUREFIRE_JVM_OPT=-Djvm=${JAVA_HOME}/bin/java" >> $GITHUB_ENV shell: bash - - run: echo "MAVEN_OPTS=-DGOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS=true -DGOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW=true" >> $GITHUB_ENV - shell: bash - uses: actions/setup-java@v3 with: java-version: 17 From 3261ed630a786a4e9bfcab21843580a8e60b7131 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 28 Apr 2025 13:29:35 +0000 Subject: [PATCH 15/15] chore(spanner): review comments --- .../cloud/spanner/AsyncTransactionManager.java | 14 ++++++++------ .../cloud/spanner/AsyncTransactionManagerImpl.java | 1 + .../google/cloud/spanner/TransactionManager.java | 13 ++++++++----- .../cloud/spanner/TransactionManagerImpl.java | 3 ++- .../cloud/spanner/TransactionRunnerImpl.java | 6 +++++- 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java index a33d08a8fae..bb5140c4755 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java @@ -171,13 +171,15 @@ interface AsyncTransactionFunction { TransactionContextFuture beginAsync(); /** - * Initializes a new read-write transaction. This method must be called before performing any - * operations, and it can only be invoked once per transaction lifecycle. + * Initializes a new read-write transaction that is a retry of a previously aborted transaction. + * This method must be called before performing any operations, and it can only be invoked once + * per transaction lifecycle. * - *

This is especially useful in scenarios involving multiplexed sessions and when creating a - * new transaction for retry attempts. If {@link #resetForRetryAsync()} is not used, you can pass - * the {@link AbortedException} from a previous attempt here to preserve the transaction's - * priority. + *

This method should only be used when multiplexed sessions are enabled to create a retry for + * a previously aborted transaction. This method can be used instead of {@link + * #resetForRetryAsync()} to create a retry. Using this method or {@link #resetForRetryAsync()} + * will have the same effect. You must pass in the {@link AbortedException} from the previous + * attempt to preserve the transaction's priority. * *

For regular sessions, this behaves the same as {@link #beginAsync()}. */ diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 96249a80aab..1f7fd2a0cbe 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -82,6 +82,7 @@ public TransactionContextFutureImpl beginAsync() { @Override public TransactionContextFutureImpl beginAsync(AbortedException exception) { Preconditions.checkState(txn == null, "begin can only be called once"); + Preconditions.checkNotNull(exception, "AbortedException from the previous attempt is required"); ByteString abortedTransactionId = exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY; return new TransactionContextFutureImpl(this, internalBeginAsync(true, abortedTransactionId)); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java index 0293d77e8de..350adb2a2c2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java @@ -62,12 +62,15 @@ enum TransactionState { TransactionContext begin(); /** - * Initializes a new read-write transaction. This method must be called before performing any - * operations, and it can only be invoked once per transaction lifecycle. + * Initializes a new read-write transaction that is a retry of a previously aborted transaction. + * This method must be called before performing any operations, and it can only be invoked once + * per transaction lifecycle. * - *

This is especially useful in scenarios involving multiplexed sessions and when creating a - * new transaction for retry attempts. If {@link #resetForRetry()} is not used, you can pass the - * {@link AbortedException} from a previous attempt here to preserve the transaction's priority. + *

This method should only be used when multiplexed sessions are enabled to create a retry for + * a previously aborted transaction. This method can be used instead of {@link #resetForRetry()} + * to create a retry. Using this method or {@link #resetForRetry()} will have the same effect. You + * must pass in the {@link AbortedException} from the previous attempt to preserve the + * transaction's priority. * *

For regular sessions, this behaves the same as {@link #begin()}. */ diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index e49379ed861..aaed30e7fa7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -59,6 +59,7 @@ public TransactionContext begin() { @Override public TransactionContext begin(AbortedException exception) { Preconditions.checkState(txn == null, "begin can only be called once"); + Preconditions.checkNotNull(exception, "AbortedException from the previous attempt is required"); ByteString previousAbortedTransactionID = exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY; return begin(previousAbortedTransactionID); @@ -66,7 +67,7 @@ public TransactionContext begin(AbortedException exception) { TransactionContext begin(ByteString previousTransactionId) { try (IScope s = tracer.withSpan(span)) { - txn = session.newTransaction(options, /* previousTransactionId = */ previousTransactionId); + txn = session.newTransaction(options, previousTransactionId); session.setActive(this); txnState = TransactionState.STARTED; return txn; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 3f41c91019b..388301d9bf7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -793,7 +793,11 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction long delay = -1L; if (exceptionToThrow instanceof AbortedException) { delay = exceptionToThrow.getRetryDelayInMillis(); - ((AbortedException) exceptionToThrow).setTransactionID(this.transactionId); + ((AbortedException) exceptionToThrow) + .setTransactionID( + this.transactionId != null + ? this.transactionId + : this.getPreviousTransactionId()); } if (delay == -1L) { txnLogger.log(