From 66eb45bc6198e7d3b19d2868fe8fd1c88c8b6c37 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 14 Oct 2025 10:37:36 +0100 Subject: [PATCH 1/7] Implemented new prose test for ensuring collection write concern options are ignored when inside a transaction. JAVA-5684 --- .../client/TransactionProseTest.java | 106 ++++++++++++++++++ .../mongodb/client/TransactionProseTest.java | 90 ++++++++------- 2 files changed, 156 insertions(+), 40 deletions(-) create mode 100644 driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java new file mode 100644 index 00000000000..fb60c163ccd --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.reactivestreams.client; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.WriteConcern; +import org.bson.Document; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.concurrent.TimeUnit; + +import static com.mongodb.ClusterFixture.getDefaultDatabaseName; +import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; +import static com.mongodb.ClusterFixture.isSharded; +import static org.junit.jupiter.api.Assumptions.abort; +import static org.junit.jupiter.api.Assumptions.assumeFalse; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +public class TransactionProseTest { + private MongoClient client; + private MongoCollection collection; + + @BeforeEach + public void setUp() { + assumeTrue(isSharded()); + ConnectionString multiMongosConnectionString = getMultiMongosConnectionString(); + assumeTrue(multiMongosConnectionString != null); + + MongoClientSettings.Builder builder = MongoClientSettings.builder() + .applyConnectionString(multiMongosConnectionString); + + client = MongoClients.create(MongoClientSettings.builder(builder.build()) + .applyToSocketSettings(builder1 -> builder1.readTimeout(5, TimeUnit.SECONDS)) + .build()); + + collection = client.getDatabase(getDefaultDatabaseName()).getCollection(getClass().getName()); + StepVerifier.create(Mono.from(collection.drop())).verifyComplete(); + } + + @AfterEach + public void tearDown() { + if (collection != null) { + StepVerifier.create(Mono.from(collection.drop())).verifyComplete(); + } + if (client != null) { + client.close(); + } + } + + @DisplayName("Mongos Pinning Prose Tests: 1. Test that starting a new transaction on a pinned ClientSession unpins the session " + + "and normal server selection is performed for the next operation.") + @Test + void testNewTransactionUnpinsSession() { + abort("There is no ability to get the server address with the reactive api"); + } + + @DisplayName("Mongos Pinning Prose Tests: 2. Test non-transaction operations using a pinned ClientSession unpins the session" + + " and normal server selection is performed") + @Test + void testNonTransactionOpsUnpinsSession() { + abort("There is no ability to get the server address with the reactive api"); + } + + @DisplayName("Options Inside Transaction Prose Tests. 1. Write concern not inherited from collection object inside transaction") + @Test + void testWriteConcernInheritance() { + StepVerifier.create( + Mono.from(client.startSession()) + .map(session -> { + session.startTransaction(); + return session; + }) + .flatMap(session -> + Mono.from(collection.withWriteConcern(new WriteConcern(0)).insertOne(session, new Document("n", 1))) + .thenReturn(session)) + .flatMap(session -> + Mono.from(session.commitTransaction()) + .thenReturn(session)) + .flatMap(session -> + Mono.from(collection.find(new Document("n", 1)).first()) + .doOnNext(Assertions::assertNotNull) + .thenReturn(session) + ).flatMap(session -> Mono.fromRunnable(session::close)) + ).verifyComplete(); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java index 9a1426ad887..5ccaf5d14d5 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java @@ -16,12 +16,16 @@ package com.mongodb.client; +import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.MongoException; +import com.mongodb.ServerAddress; +import com.mongodb.WriteConcern; import org.bson.Document; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; import java.util.HashSet; import java.util.Set; @@ -30,19 +34,24 @@ import static com.mongodb.ClusterFixture.getDefaultDatabaseName; import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; import static com.mongodb.ClusterFixture.isSharded; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; // See https://github.com/mongodb/specifications/blob/master/source/transactions/tests/README.md#mongos-pinning-prose-tests public class TransactionProseTest { private MongoClient client; private MongoCollection collection; - @Before + @BeforeEach public void setUp() { - assumeTrue(canRunTests()); + assumeTrue(isSharded()); + ConnectionString multiMongosConnectionString = getMultiMongosConnectionString(); + assumeTrue(multiMongosConnectionString != null); + MongoClientSettings.Builder builder = MongoClientSettings.builder() - .applyConnectionString(getMultiMongosConnectionString()); + .applyConnectionString(multiMongosConnectionString); client = MongoClients.create(MongoClientSettings.builder(builder.build()) .applyToSocketSettings(builder1 -> builder1.readTimeout(5, TimeUnit.SECONDS)) @@ -52,7 +61,7 @@ public void setUp() { collection.drop(); } - @After + @AfterEach public void tearDown() { if (collection != null) { collection.drop(); @@ -62,63 +71,64 @@ public void tearDown() { } } - // Test that starting a new transaction on a pinned ClientSession unpins the session and normal - // server selection is performed for the next operation. + @DisplayName("Mongos Pinning Prose Tests: 1. Test that starting a new transaction on a pinned ClientSession unpins the session " + + "and normal server selection is performed for the next operation.") @Test - public void testNewTransactionUnpinsSession() throws MongoException { - ClientSession session = null; - try { - collection.insertOne(Document.parse("{}")); - session = client.startSession(); + public void testNewTransactionUnpinsSession() { + collection.insertOne(Document.parse("{}")); + try (ClientSession session = client.startSession()) { session.startTransaction(); collection.insertOne(session, Document.parse("{ _id : 1 }")); session.commitTransaction(); - Set> addresses = new HashSet<>(); + Set addresses = new HashSet<>(); int iterations = 50; while (iterations-- > 0) { session.startTransaction(); - addresses.add(collection.find(session, Document.parse("{}"))); + try (MongoCursor cursor = collection.find(session, Document.parse("{}")).cursor()) { + addresses.add(cursor.getServerAddress()); + } session.commitTransaction(); } assertTrue(addresses.size() > 1); - } finally { - if (session != null) { - session.close(); - } - if (collection != null) { - collection.drop(); - } } } // Test non-transaction operations using a pinned ClientSession unpins the session and normal server selection is performed. + @DisplayName("Mongos Pinning Prose Tests: 2. Test non-transaction operations using a pinned ClientSession unpins the session" + + " and normal server selection is performed") @Test public void testNonTransactionOpsUnpinsSession() throws MongoException { - ClientSession session = null; - try { - collection.insertOne(Document.parse("{}")); - session = client.startSession(); + collection.insertOne(Document.parse("{}")); + try (ClientSession session = client.startSession()) { session.startTransaction(); collection.insertOne(session, Document.parse("{ _id : 1 }")); + session.commitTransaction(); - Set> addresses = new HashSet<>(); + Set addresses = new HashSet<>(); int iterations = 50; while (iterations-- > 0) { - addresses.add(collection.find(session, Document.parse("{}"))); + try (MongoCursor cursor = collection.find(session, Document.parse("{}")).cursor()) { + addresses.add(cursor.getServerAddress()); + } } assertTrue(addresses.size() > 1); - } finally { - if (session != null) { - session.close(); - } - if (collection != null) { - collection.drop(); - } } } - private boolean canRunTests() { - return isSharded(); + @DisplayName("Options Inside Transaction Prose Tests. 1. Write concern not inherited from collection object inside transaction") + @Test + void testWriteConcernInheritance() { + try (ClientSession session = client.startSession()) { + MongoCollection wcCollection = collection.withWriteConcern(new WriteConcern(0)); + + assertDoesNotThrow(() -> { + session.startTransaction(); + wcCollection.insertOne(session, new Document("n", 1)); + session.commitTransaction(); + }); + assertNotNull(collection.find(new Document("n", 1)).first()); + } } + } From bb815e833e49642ebc140c95649c57fa33c749d2 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 14 Oct 2025 10:52:04 +0100 Subject: [PATCH 2/7] PR update - ensure session always cleaned up --- .../client/TransactionProseTest.java | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java index fb60c163ccd..0ad624cf455 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java @@ -18,6 +18,7 @@ import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.WriteConcern; +import com.mongodb.session.ClientSession; import org.bson.Document; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -84,23 +85,20 @@ void testNonTransactionOpsUnpinsSession() { @DisplayName("Options Inside Transaction Prose Tests. 1. Write concern not inherited from collection object inside transaction") @Test void testWriteConcernInheritance() { - StepVerifier.create( - Mono.from(client.startSession()) - .map(session -> { - session.startTransaction(); - return session; - }) - .flatMap(session -> - Mono.from(collection.withWriteConcern(new WriteConcern(0)).insertOne(session, new Document("n", 1))) - .thenReturn(session)) - .flatMap(session -> - Mono.from(session.commitTransaction()) - .thenReturn(session)) - .flatMap(session -> - Mono.from(collection.find(new Document("n", 1)).first()) - .doOnNext(Assertions::assertNotNull) - .thenReturn(session) - ).flatMap(session -> Mono.fromRunnable(session::close)) - ).verifyComplete(); + Mono testWriteConcern = Mono.from(client.startSession()) + .flatMap(clientSession -> { + clientSession.startTransaction(); + return Mono.using(() -> clientSession, + session -> Mono.fromRunnable(session::startTransaction) + .then(Mono.from(collection.withWriteConcern(new WriteConcern(0)).insertOne(session, new Document("n", 1)))) + .then(Mono.from(session.commitTransaction())) + .then(Mono.from(collection.find(new Document("n", 1)).first()) + .doOnNext(Assertions::assertNotNull) + ), + ClientSession::close + ); + }); + + StepVerifier.create(testWriteConcern).verifyComplete(); } } From eb277f94a952fa709bd609fdb63d787ebd94653f Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 14 Oct 2025 10:54:25 +0100 Subject: [PATCH 3/7] Remove comment as the display name explains the test --- .../test/functional/com/mongodb/client/TransactionProseTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java index 5ccaf5d14d5..391950a6449 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java @@ -94,7 +94,6 @@ public void testNewTransactionUnpinsSession() { } } - // Test non-transaction operations using a pinned ClientSession unpins the session and normal server selection is performed. @DisplayName("Mongos Pinning Prose Tests: 2. Test non-transaction operations using a pinned ClientSession unpins the session" + " and normal server selection is performed") @Test From b6715db12b388605d0972f2b4ad7d503b7992ba3 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 14 Oct 2025 11:09:28 +0100 Subject: [PATCH 4/7] Simplify the reactive streams logic --- .../client/TransactionProseTest.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java index 0ad624cf455..dea188fff7f 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java @@ -86,18 +86,17 @@ void testNonTransactionOpsUnpinsSession() { @Test void testWriteConcernInheritance() { Mono testWriteConcern = Mono.from(client.startSession()) - .flatMap(clientSession -> { - clientSession.startTransaction(); - return Mono.using(() -> clientSession, - session -> Mono.fromRunnable(session::startTransaction) - .then(Mono.from(collection.withWriteConcern(new WriteConcern(0)).insertOne(session, new Document("n", 1)))) - .then(Mono.from(session.commitTransaction())) - .then(Mono.from(collection.find(new Document("n", 1)).first()) - .doOnNext(Assertions::assertNotNull) - ), - ClientSession::close - ); - }); + .flatMap(clientSession -> + Mono.using(() -> clientSession, + session -> Mono.fromRunnable(session::startTransaction) + .then(Mono.from(collection.withWriteConcern(new WriteConcern(0)).insertOne(session, new Document("n", 1)))) + .then(Mono.fromCallable(() -> Mono.from(session.commitTransaction()))) + .then(Mono.from(collection.find(new Document("n", 1)).first()) + .doOnNext(Assertions::assertNotNull) + ), + ClientSession::close + ) + ); StepVerifier.create(testWriteConcern).verifyComplete(); } From 2b03fbc28eceb0acff1cbd8ff4bd0730d8d4e696 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 14 Oct 2025 11:24:05 +0100 Subject: [PATCH 5/7] Actually assert --- .../reactivestreams/client/TransactionProseTest.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java index dea188fff7f..297305c6bf1 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java @@ -34,7 +34,6 @@ import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; import static com.mongodb.ClusterFixture.isSharded; import static org.junit.jupiter.api.Assumptions.abort; -import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.junit.jupiter.api.Assumptions.assumeTrue; public class TransactionProseTest { @@ -90,14 +89,12 @@ void testWriteConcernInheritance() { Mono.using(() -> clientSession, session -> Mono.fromRunnable(session::startTransaction) .then(Mono.from(collection.withWriteConcern(new WriteConcern(0)).insertOne(session, new Document("n", 1)))) - .then(Mono.fromCallable(() -> Mono.from(session.commitTransaction()))) - .then(Mono.from(collection.find(new Document("n", 1)).first()) - .doOnNext(Assertions::assertNotNull) - ), + .flatMap( r -> Mono.from(session.commitTransaction())) + .then(Mono.from(collection.find(new Document("n", 1)).first())), ClientSession::close ) ); - StepVerifier.create(testWriteConcern).verifyComplete(); + StepVerifier.create(testWriteConcern).assertNext(Assertions::assertNotNull).verifyComplete(); } } From b1f5e1034574fb799c3def7a3ecaa7c2ae6292f2 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 14 Oct 2025 11:51:26 +0100 Subject: [PATCH 6/7] Make ClientSession#commitTransaction and ClientSession#abortTransaction cold publishers Only run the validation when subscribe has been called. JAVA-5985 --- .../internal/ClientSessionPublisherImpl.java | 119 +++++++++--------- .../client/TransactionProseTest.java | 15 +-- 2 files changed, 68 insertions(+), 66 deletions(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 52f33ec25cc..5cf0ea103bd 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -143,69 +143,74 @@ private WriteConcern getWriteConcern(@Nullable final TimeoutContext timeoutConte @Override public Publisher commitTransaction() { - if (transactionState == TransactionState.ABORTED) { - throw new IllegalStateException("Cannot call commitTransaction after calling abortTransaction"); - } - if (transactionState == TransactionState.NONE) { - throw new IllegalStateException("There is no transaction started"); - } - if (!messageSentInCurrentTransaction) { - cleanupTransaction(TransactionState.COMMITTED); - return Mono.create(MonoSink::success); - } else { - ReadConcern readConcern = transactionOptions.getReadConcern(); - if (readConcern == null) { - throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null"); + return Mono.defer(() -> { + if (transactionState == TransactionState.ABORTED) { + return Mono.error(new IllegalStateException("Cannot call commitTransaction after calling abortTransaction")); } - boolean alreadyCommitted = commitInProgress || transactionState == TransactionState.COMMITTED; - commitInProgress = true; - resetTimeout(); - TimeoutContext timeoutContext = getTimeoutContext(); - WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); - return executor - .execute( - new CommitTransactionOperation(writeConcern, alreadyCommitted) - .recoveryToken(getRecoveryToken()), readConcern, this) - .doOnTerminate(() -> { - commitInProgress = false; - transactionState = TransactionState.COMMITTED; - }) - .doOnError(MongoException.class, this::clearTransactionContextOnError); - } + if (transactionState == TransactionState.NONE) { + return Mono.error(new IllegalStateException("There is no transaction started")); + } + if (!messageSentInCurrentTransaction) { + cleanupTransaction(TransactionState.COMMITTED); + return Mono.create(MonoSink::success); + } else { + ReadConcern readConcern = transactionOptions.getReadConcern(); + if (readConcern == null) { + return Mono.error(new MongoInternalException("Invariant violated. Transaction options read concern can not be null")); + } + boolean alreadyCommitted = commitInProgress || transactionState == TransactionState.COMMITTED; + commitInProgress = true; + resetTimeout(); + TimeoutContext timeoutContext = getTimeoutContext(); + WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); + return executor + .execute( + new CommitTransactionOperation(writeConcern, alreadyCommitted) + .recoveryToken(getRecoveryToken()), readConcern, this) + .doOnTerminate(() -> { + commitInProgress = false; + transactionState = TransactionState.COMMITTED; + }) + .doOnError(MongoException.class, this::clearTransactionContextOnError); + } + }); } + @Override public Publisher abortTransaction() { - if (transactionState == TransactionState.ABORTED) { - throw new IllegalStateException("Cannot call abortTransaction twice"); - } - if (transactionState == TransactionState.COMMITTED) { - throw new IllegalStateException("Cannot call abortTransaction after calling commitTransaction"); - } - if (transactionState == TransactionState.NONE) { - throw new IllegalStateException("There is no transaction started"); - } - if (!messageSentInCurrentTransaction) { - cleanupTransaction(TransactionState.ABORTED); - return Mono.create(MonoSink::success); - } else { - ReadConcern readConcern = transactionOptions.getReadConcern(); - if (readConcern == null) { - throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null"); + return Mono.defer(() -> { + if (transactionState == TransactionState.ABORTED) { + throw new IllegalStateException("Cannot call abortTransaction twice"); } - - resetTimeout(); - TimeoutContext timeoutContext = getTimeoutContext(); - WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); - return executor - .execute(new AbortTransactionOperation(writeConcern) - .recoveryToken(getRecoveryToken()), readConcern, this) - .onErrorResume(Throwable.class, (e) -> Mono.empty()) - .doOnTerminate(() -> { - clearTransactionContext(); - cleanupTransaction(TransactionState.ABORTED); - }); - } + if (transactionState == TransactionState.COMMITTED) { + throw new IllegalStateException("Cannot call abortTransaction after calling commitTransaction"); + } + if (transactionState == TransactionState.NONE) { + throw new IllegalStateException("There is no transaction started"); + } + if (!messageSentInCurrentTransaction) { + cleanupTransaction(TransactionState.ABORTED); + return Mono.create(MonoSink::success); + } else { + ReadConcern readConcern = transactionOptions.getReadConcern(); + if (readConcern == null) { + throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null"); + } + + resetTimeout(); + TimeoutContext timeoutContext = getTimeoutContext(); + WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); + return executor + .execute(new AbortTransactionOperation(writeConcern) + .recoveryToken(getRecoveryToken()), readConcern, this) + .onErrorResume(Throwable.class, (e) -> Mono.empty()) + .doOnTerminate(() -> { + clearTransactionContext(); + cleanupTransaction(TransactionState.ABORTED); + }); + } + }); } private void clearTransactionContextOnError(final MongoException e) { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java index 297305c6bf1..e2899992bb6 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java @@ -18,7 +18,6 @@ import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.WriteConcern; -import com.mongodb.session.ClientSession; import org.bson.Document; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -85,14 +84,12 @@ void testNonTransactionOpsUnpinsSession() { @Test void testWriteConcernInheritance() { Mono testWriteConcern = Mono.from(client.startSession()) - .flatMap(clientSession -> - Mono.using(() -> clientSession, - session -> Mono.fromRunnable(session::startTransaction) - .then(Mono.from(collection.withWriteConcern(new WriteConcern(0)).insertOne(session, new Document("n", 1)))) - .flatMap( r -> Mono.from(session.commitTransaction())) - .then(Mono.from(collection.find(new Document("n", 1)).first())), - ClientSession::close - ) + .flatMap(session -> + Mono.fromRunnable(session::startTransaction) + .then(Mono.from(collection.withWriteConcern(new WriteConcern(0)).insertOne(session, new Document("n", 1)))) + .then(Mono.from(session.commitTransaction())) + .then(Mono.from(collection.find(new Document("n", 1)).first())) + .doFinally(signalType -> session.close()) ); StepVerifier.create(testWriteConcern).assertNext(Assertions::assertNotNull).verifyComplete(); From 91528ec178bc505e66b3056153cd03e2f671cc4f Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 20 Oct 2025 16:00:38 +0100 Subject: [PATCH 7/7] Fix prose test requirements --- .../mongodb/reactivestreams/client/TransactionProseTest.java | 2 ++ .../functional/com/mongodb/client/TransactionProseTest.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java index e2899992bb6..7aafe4f9e22 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TransactionProseTest.java @@ -32,6 +32,7 @@ import static com.mongodb.ClusterFixture.getDefaultDatabaseName; import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; import static com.mongodb.ClusterFixture.isSharded; +import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static org.junit.jupiter.api.Assumptions.abort; import static org.junit.jupiter.api.Assumptions.assumeTrue; @@ -83,6 +84,7 @@ void testNonTransactionOpsUnpinsSession() { @DisplayName("Options Inside Transaction Prose Tests. 1. Write concern not inherited from collection object inside transaction") @Test void testWriteConcernInheritance() { + assumeTrue(serverVersionAtLeast(4, 4)); Mono testWriteConcern = Mono.from(client.startSession()) .flatMap(session -> Mono.fromRunnable(session::startTransaction) diff --git a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java index 391950a6449..1f0209247fc 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/TransactionProseTest.java @@ -34,6 +34,7 @@ import static com.mongodb.ClusterFixture.getDefaultDatabaseName; import static com.mongodb.ClusterFixture.getMultiMongosConnectionString; import static com.mongodb.ClusterFixture.isSharded; +import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -118,6 +119,7 @@ public void testNonTransactionOpsUnpinsSession() throws MongoException { @DisplayName("Options Inside Transaction Prose Tests. 1. Write concern not inherited from collection object inside transaction") @Test void testWriteConcernInheritance() { + assumeTrue(serverVersionAtLeast(4, 4)); try (ClientSession session = client.startSession()) { MongoCollection wcCollection = collection.withWriteConcern(new WriteConcern(0));