diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 52f33ec25cc..5cf0ea103bd 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -143,69 +143,74 @@ private WriteConcern getWriteConcern(@Nullable final TimeoutContext timeoutConte @Override public Publisher commitTransaction() { - if (transactionState == TransactionState.ABORTED) { - throw new IllegalStateException("Cannot call commitTransaction after calling abortTransaction"); - } - if (transactionState == TransactionState.NONE) { - throw new IllegalStateException("There is no transaction started"); - } - if (!messageSentInCurrentTransaction) { - cleanupTransaction(TransactionState.COMMITTED); - return Mono.create(MonoSink::success); - } else { - ReadConcern readConcern = transactionOptions.getReadConcern(); - if (readConcern == null) { - throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null"); + return Mono.defer(() -> { + if (transactionState == TransactionState.ABORTED) { + return Mono.error(new IllegalStateException("Cannot call commitTransaction after calling abortTransaction")); } - boolean alreadyCommitted = commitInProgress || transactionState == TransactionState.COMMITTED; - commitInProgress = true; - resetTimeout(); - TimeoutContext timeoutContext = getTimeoutContext(); - WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); - return executor - .execute( - new CommitTransactionOperation(writeConcern, alreadyCommitted) - .recoveryToken(getRecoveryToken()), readConcern, this) - .doOnTerminate(() -> { - commitInProgress = false; - transactionState = TransactionState.COMMITTED; - }) - .doOnError(MongoException.class, this::clearTransactionContextOnError); - } + if (transactionState == TransactionState.NONE) { + return Mono.error(new IllegalStateException("There is no transaction started")); + } + if (!messageSentInCurrentTransaction) { + cleanupTransaction(TransactionState.COMMITTED); + return Mono.create(MonoSink::success); + } else { + ReadConcern readConcern = transactionOptions.getReadConcern(); + if (readConcern == null) { + return Mono.error(new MongoInternalException("Invariant violated. Transaction options read concern can not be null")); + } + boolean alreadyCommitted = commitInProgress || transactionState == TransactionState.COMMITTED; + commitInProgress = true; + resetTimeout(); + TimeoutContext timeoutContext = getTimeoutContext(); + WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); + return executor + .execute( + new CommitTransactionOperation(writeConcern, alreadyCommitted) + .recoveryToken(getRecoveryToken()), readConcern, this) + .doOnTerminate(() -> { + commitInProgress = false; + transactionState = TransactionState.COMMITTED; + }) + .doOnError(MongoException.class, this::clearTransactionContextOnError); + } + }); } + @Override public Publisher abortTransaction() { - if (transactionState == TransactionState.ABORTED) { - throw new IllegalStateException("Cannot call abortTransaction twice"); - } - if (transactionState == TransactionState.COMMITTED) { - throw new IllegalStateException("Cannot call abortTransaction after calling commitTransaction"); - } - if (transactionState == TransactionState.NONE) { - throw new IllegalStateException("There is no transaction started"); - } - if (!messageSentInCurrentTransaction) { - cleanupTransaction(TransactionState.ABORTED); - return Mono.create(MonoSink::success); - } else { - ReadConcern readConcern = transactionOptions.getReadConcern(); - if (readConcern == null) { - throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null"); + return Mono.defer(() -> { + if (transactionState == TransactionState.ABORTED) { + throw new IllegalStateException("Cannot call abortTransaction twice"); } - - resetTimeout(); - TimeoutContext timeoutContext = getTimeoutContext(); - WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); - return executor - .execute(new AbortTransactionOperation(writeConcern) - .recoveryToken(getRecoveryToken()), readConcern, this) - .onErrorResume(Throwable.class, (e) -> Mono.empty()) - .doOnTerminate(() -> { - clearTransactionContext(); - cleanupTransaction(TransactionState.ABORTED); - }); - } + if (transactionState == TransactionState.COMMITTED) { + throw new IllegalStateException("Cannot call abortTransaction after calling commitTransaction"); + } + if (transactionState == TransactionState.NONE) { + throw new IllegalStateException("There is no transaction started"); + } + if (!messageSentInCurrentTransaction) { + cleanupTransaction(TransactionState.ABORTED); + return Mono.create(MonoSink::success); + } else { + ReadConcern readConcern = transactionOptions.getReadConcern(); + if (readConcern == null) { + throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null"); + } + + resetTimeout(); + TimeoutContext timeoutContext = getTimeoutContext(); + WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); + return executor + .execute(new AbortTransactionOperation(writeConcern) + .recoveryToken(getRecoveryToken()), readConcern, this) + .onErrorResume(Throwable.class, (e) -> Mono.empty()) + .doOnTerminate(() -> { + clearTransactionContext(); + cleanupTransaction(TransactionState.ABORTED); + }); + } + }); } private void clearTransactionContextOnError(final MongoException e) { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java new file mode 100644 index 00000000000..7aafe4f9e22 --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.reactivestreams.client; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.WriteConcern; +import org.bson.Document; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.concurrent.TimeUnit; + +import static com.mongodb.ClusterFixture.getDefaultDatabaseName; +import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; +import static com.mongodb.ClusterFixture.isSharded; +import static com.mongodb.ClusterFixture.serverVersionAtLeast; +import static org.junit.jupiter.api.Assumptions.abort; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +public class TransactionProseTest { + private MongoClient client; + private MongoCollection collection; + + @BeforeEach + public void setUp() { + assumeTrue(isSharded()); + ConnectionString multiMongosConnectionString = getMultiMongosConnectionString(); + assumeTrue(multiMongosConnectionString != null); + + MongoClientSettings.Builder builder = MongoClientSettings.builder() + .applyConnectionString(multiMongosConnectionString); + + client = MongoClients.create(MongoClientSettings.builder(builder.build()) + .applyToSocketSettings(builder1 -> builder1.readTimeout(5, TimeUnit.SECONDS)) + .build()); + + collection = client.getDatabase(getDefaultDatabaseName()).getCollection(getClass().getName()); + StepVerifier.create(Mono.from(collection.drop())).verifyComplete(); + } + + @AfterEach + public void tearDown() { + if (collection != null) { + StepVerifier.create(Mono.from(collection.drop())).verifyComplete(); + } + if (client != null) { + client.close(); + } + } + + @DisplayName("Mongos Pinning Prose Tests: 1. Test that starting a new transaction on a pinned ClientSession unpins the session " + + "and normal server selection is performed for the next operation.") + @Test + void testNewTransactionUnpinsSession() { + abort("There is no ability to get the server address with the reactive api"); + } + + @DisplayName("Mongos Pinning Prose Tests: 2. Test non-transaction operations using a pinned ClientSession unpins the session" + + " and normal server selection is performed") + @Test + void testNonTransactionOpsUnpinsSession() { + abort("There is no ability to get the server address with the reactive api"); + } + + @DisplayName("Options Inside Transaction Prose Tests. 1. Write concern not inherited from collection object inside transaction") + @Test + void testWriteConcernInheritance() { + assumeTrue(serverVersionAtLeast(4, 4)); + Mono testWriteConcern = Mono.from(client.startSession()) + .flatMap(session -> + Mono.fromRunnable(session::startTransaction) + .then(Mono.from(collection.withWriteConcern(new WriteConcern(0)).insertOne(session, new Document("n", 1)))) + .then(Mono.from(session.commitTransaction())) + .then(Mono.from(collection.find(new Document("n", 1)).first())) + .doFinally(signalType -> session.close()) + ); + + StepVerifier.create(testWriteConcern).assertNext(Assertions::assertNotNull).verifyComplete(); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java index 9a1426ad887..1f0209247fc 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java @@ -16,12 +16,16 @@ package com.mongodb.client; +import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.MongoException; +import com.mongodb.ServerAddress; +import com.mongodb.WriteConcern; import org.bson.Document; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; import java.util.HashSet; import java.util.Set; @@ -30,19 +34,25 @@ import static com.mongodb.ClusterFixture.getDefaultDatabaseName; import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; import static com.mongodb.ClusterFixture.isSharded; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; +import static com.mongodb.ClusterFixture.serverVersionAtLeast; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; // See https://github.com/mongodb/specifications/blob/master/source/transactions/tests/README.md#mongos-pinning-prose-tests public class TransactionProseTest { private MongoClient client; private MongoCollection collection; - @Before + @BeforeEach public void setUp() { - assumeTrue(canRunTests()); + assumeTrue(isSharded()); + ConnectionString multiMongosConnectionString = getMultiMongosConnectionString(); + assumeTrue(multiMongosConnectionString != null); + MongoClientSettings.Builder builder = MongoClientSettings.builder() - .applyConnectionString(getMultiMongosConnectionString()); + .applyConnectionString(multiMongosConnectionString); client = MongoClients.create(MongoClientSettings.builder(builder.build()) .applyToSocketSettings(builder1 -> builder1.readTimeout(5, TimeUnit.SECONDS)) @@ -52,7 +62,7 @@ public void setUp() { collection.drop(); } - @After + @AfterEach public void tearDown() { if (collection != null) { collection.drop(); @@ -62,63 +72,64 @@ public void tearDown() { } } - // Test that starting a new transaction on a pinned ClientSession unpins the session and normal - // server selection is performed for the next operation. + @DisplayName("Mongos Pinning Prose Tests: 1. Test that starting a new transaction on a pinned ClientSession unpins the session " + + "and normal server selection is performed for the next operation.") @Test - public void testNewTransactionUnpinsSession() throws MongoException { - ClientSession session = null; - try { - collection.insertOne(Document.parse("{}")); - session = client.startSession(); + public void testNewTransactionUnpinsSession() { + collection.insertOne(Document.parse("{}")); + try (ClientSession session = client.startSession()) { session.startTransaction(); collection.insertOne(session, Document.parse("{ _id : 1 }")); session.commitTransaction(); - Set> addresses = new HashSet<>(); + Set addresses = new HashSet<>(); int iterations = 50; while (iterations-- > 0) { session.startTransaction(); - addresses.add(collection.find(session, Document.parse("{}"))); + try (MongoCursor cursor = collection.find(session, Document.parse("{}")).cursor()) { + addresses.add(cursor.getServerAddress()); + } session.commitTransaction(); } assertTrue(addresses.size() > 1); - } finally { - if (session != null) { - session.close(); - } - if (collection != null) { - collection.drop(); - } } } - // Test non-transaction operations using a pinned ClientSession unpins the session and normal server selection is performed. + @DisplayName("Mongos Pinning Prose Tests: 2. Test non-transaction operations using a pinned ClientSession unpins the session" + + " and normal server selection is performed") @Test public void testNonTransactionOpsUnpinsSession() throws MongoException { - ClientSession session = null; - try { - collection.insertOne(Document.parse("{}")); - session = client.startSession(); + collection.insertOne(Document.parse("{}")); + try (ClientSession session = client.startSession()) { session.startTransaction(); collection.insertOne(session, Document.parse("{ _id : 1 }")); + session.commitTransaction(); - Set> addresses = new HashSet<>(); + Set addresses = new HashSet<>(); int iterations = 50; while (iterations-- > 0) { - addresses.add(collection.find(session, Document.parse("{}"))); + try (MongoCursor cursor = collection.find(session, Document.parse("{}")).cursor()) { + addresses.add(cursor.getServerAddress()); + } } assertTrue(addresses.size() > 1); - } finally { - if (session != null) { - session.close(); - } - if (collection != null) { - collection.drop(); - } } } - private boolean canRunTests() { - return isSharded(); + @DisplayName("Options Inside Transaction Prose Tests. 1. Write concern not inherited from collection object inside transaction") + @Test + void testWriteConcernInheritance() { + assumeTrue(serverVersionAtLeast(4, 4)); + try (ClientSession session = client.startSession()) { + MongoCollection wcCollection = collection.withWriteConcern(new WriteConcern(0)); + + assertDoesNotThrow(() -> { + session.startTransaction(); + wcCollection.insertOne(session, new Document("n", 1)); + session.commitTransaction(); + }); + assertNotNull(collection.find(new Document("n", 1)).first()); + } } + }