From 0e3673fbf48ef1c9c4cf223808f207b323fa7f2d Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 15 Jul 2025 12:55:48 -0700 Subject: [PATCH 1/6] Add prose test to verify server monitors do not gossip cluster time. JAVA-5546 --- .../event/TestServerMonitorListener.java | 12 ++- .../client/AbstractSessionsProseTest.java | 80 +++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java b/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java index b009b5094f0..27651c316ea 100644 --- a/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java +++ b/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java @@ -53,6 +53,16 @@ public TestServerMonitorListener(final Iterable listenableEventTypes) { events = new ArrayList<>(); } + public void reset() { + lock.lock(); + try { + events.clear(); + condition.signalAll(); + } finally { + lock.unlock(); + } + } + public void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { register(event); } @@ -65,7 +75,7 @@ public void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) { register(event); } - public void waitForEvents(final Class type, final Predicate matcher, final int count, final Duration duration) + public void waitForEvents(final Class type, final Predicate matcher, final long count, final Duration duration) throws InterruptedException, TimeoutException { assertTrue(listenable(type)); long remainingNanos = duration.toNanos(); diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java index 4d4dbead8e6..43b985c1406 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java @@ -16,6 +16,7 @@ package com.mongodb.client; +import com.mongodb.ClusterFixture; import com.mongodb.MongoClientException; import com.mongodb.MongoClientSettings; import com.mongodb.MongoCommandException; @@ -25,6 +26,10 @@ import com.mongodb.client.model.Updates; import com.mongodb.event.CommandListener; import com.mongodb.event.CommandStartedEvent; +import com.mongodb.event.ServerHeartbeatStartedEvent; +import com.mongodb.event.ServerHeartbeatSucceededEvent; +import com.mongodb.event.TestServerMonitorListener; +import com.mongodb.internal.connection.TestCommandListener; import org.bson.BsonDocument; import org.bson.Document; import org.junit.jupiter.api.AfterAll; @@ -33,22 +38,28 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; import static com.mongodb.ClusterFixture.getDefaultDatabaseName; +import static com.mongodb.ClusterFixture.isStandalone; import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.bson.assertions.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; // Prose tests for Sessions specification: https://github.com/mongodb/specifications/tree/master/source/sessions // Prose test README: https://github.com/mongodb/specifications/tree/master/source/sessions/tests/README.md @@ -194,6 +205,60 @@ public void shouldThrowOnExplicitSessionIfConnectionDoesNotSupportSessions() thr } } + // Test 20 from #20-drivers-do-not-gossip-clustertime-on-sdam-commands + @Test + public void shouldNotGossipClusterTimeInServerMonitors() throws InterruptedException, TimeoutException { + assumeTrue(!isStandalone()); + + TestServerMonitorListener serverMonitorListener = + new TestServerMonitorListener(asList("serverHeartbeatStartedEvent", "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent")); + + TestCommandListener commandListener = new TestCommandListener(); + + try (MongoClient client1 = getMongoClient( + getDirectPrimaryMongoClientSettingsBuilder() + .addCommandListener(commandListener) + .applyToServerSettings(builder -> builder + .heartbeatFrequency(10, MILLISECONDS) + .addServerMonitorListener(serverMonitorListener)) + .build()); + MongoClient client2 = getMongoClient(getDirectPrimaryMongoClientSettingsBuilder() + .build())) { + + Document clusterTime = executePing(client1) + .get("$clusterTime", Document.class); + + client2.getDatabase("test") + .getCollection("test") + .insertOne(new Document("advance", "$clusterTime")); + + serverMonitorListener.reset(); + serverMonitorListener.waitForEvents(ServerHeartbeatStartedEvent.class, serverHeartbeatSucceededEvent -> true, + 1, Duration.ofMillis(20 + ClusterFixture.getPrimaryRTT())); + serverMonitorListener.waitForEvents(ServerHeartbeatSucceededEvent.class, serverHeartbeatSucceededEvent -> true, + 1, Duration.ofMillis(20 + ClusterFixture.getPrimaryRTT())); + + List commandStartedIsMasterEvents = commandListener.getCommandStartedEvents("isMaster"); + List commandStartedHelloEvents = commandListener.getCommandStartedEvents("hello"); + assertFalse(containClusterTime(commandStartedIsMasterEvents, commandStartedHelloEvents)); + + commandListener.reset(); + executePing(client1); + + List pingStartedEvents = commandListener.getCommandStartedEvents("ping"); + assertEquals(1, pingStartedEvents.size()); + BsonDocument sendClusterTime = pingStartedEvents.get(0).getCommand().getDocument("$clusterTime"); + + assertEquals(clusterTime.toBsonDocument(), sendClusterTime, "Cluster time should not have advanced after the first ping"); + } + } + + private static MongoClientSettings.Builder getDirectPrimaryMongoClientSettingsBuilder() { + return MongoClientSettings.builder() + .applyToClusterSettings(ClusterFixture::setDirectConnection); + } + private static MongoClientSettings.Builder getMongocryptdMongoClientSettingsBuilder() { return MongoClientSettings.builder() .applyToClusterSettings(builder -> @@ -209,5 +274,20 @@ private static Process startMongocryptdProcess() throws IOException { processBuilder.redirectOutput(new File("/tmp/mongocryptd.log")); return processBuilder.start(); } + + private static Document executePing(final MongoClient client1) { + return client1.getDatabase("admin") + .runCommand(new Document("ping", 1)); + } + + + private static boolean containClusterTime(final List commandStartedIsMasterEvents, + final List commandStartedHelloEvents) { + return Stream.concat( + commandStartedIsMasterEvents.stream(), + commandStartedHelloEvents.stream() + ).map(CommandStartedEvent::getCommand). + anyMatch(bsonDocument -> bsonDocument.containsKey("$clusterTime")); + } } From 2b1ed169c53dc74b6012cd0474420369dbdb101c Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 15 Jul 2025 13:01:52 -0700 Subject: [PATCH 2/6] Remove unnecessary assertions. --- .../com/mongodb/client/AbstractSessionsProseTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java index 43b985c1406..820ccd84785 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java @@ -234,15 +234,11 @@ public void shouldNotGossipClusterTimeInServerMonitors() throws InterruptedExcep .insertOne(new Document("advance", "$clusterTime")); serverMonitorListener.reset(); - serverMonitorListener.waitForEvents(ServerHeartbeatStartedEvent.class, serverHeartbeatSucceededEvent -> true, + serverMonitorListener.waitForEvents(ServerHeartbeatStartedEvent.class, serverHeartbeatStartedEvent -> true, 1, Duration.ofMillis(20 + ClusterFixture.getPrimaryRTT())); serverMonitorListener.waitForEvents(ServerHeartbeatSucceededEvent.class, serverHeartbeatSucceededEvent -> true, 1, Duration.ofMillis(20 + ClusterFixture.getPrimaryRTT())); - List commandStartedIsMasterEvents = commandListener.getCommandStartedEvents("isMaster"); - List commandStartedHelloEvents = commandListener.getCommandStartedEvents("hello"); - assertFalse(containClusterTime(commandStartedIsMasterEvents, commandStartedHelloEvents)); - commandListener.reset(); executePing(client1); From dc29619f4512ecc9a7c5a59bc262c7f2c105c8eb Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 15 Jul 2025 13:05:13 -0700 Subject: [PATCH 3/6] Add comments for clarity. --- .../com/mongodb/client/AbstractSessionsProseTest.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java index 820ccd84785..d5d121337f1 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java @@ -205,7 +205,10 @@ public void shouldThrowOnExplicitSessionIfConnectionDoesNotSupportSessions() thr } } - // Test 20 from #20-drivers-do-not-gossip-clustertime-on-sdam-commands + /* Test 20 from #20-drivers-do-not-gossip-clustertime-on-sdam-commands + In this test, we check that the cluster time has not been advanced on client1 through the server monitors, after client2 advanced + the cluster time on the deployment/cluster. + */ @Test public void shouldNotGossipClusterTimeInServerMonitors() throws InterruptedException, TimeoutException { assumeTrue(!isStandalone()); @@ -233,6 +236,7 @@ public void shouldNotGossipClusterTimeInServerMonitors() throws InterruptedExcep .getCollection("test") .insertOne(new Document("advance", "$clusterTime")); + // wait until the client1 processes the next pair of SDAM heartbeat started + succeeded events. serverMonitorListener.reset(); serverMonitorListener.waitForEvents(ServerHeartbeatStartedEvent.class, serverHeartbeatStartedEvent -> true, 1, Duration.ofMillis(20 + ClusterFixture.getPrimaryRTT())); @@ -244,9 +248,9 @@ public void shouldNotGossipClusterTimeInServerMonitors() throws InterruptedExcep List pingStartedEvents = commandListener.getCommandStartedEvents("ping"); assertEquals(1, pingStartedEvents.size()); - BsonDocument sendClusterTime = pingStartedEvents.get(0).getCommand().getDocument("$clusterTime"); + BsonDocument sentClusterTime = pingStartedEvents.get(0).getCommand().getDocument("$clusterTime"); - assertEquals(clusterTime.toBsonDocument(), sendClusterTime, "Cluster time should not have advanced after the first ping"); + assertEquals(clusterTime.toBsonDocument(), sentClusterTime, "Cluster time should not have advanced after the first ping"); } } From 523aed2c7209c16d93a5375b12cdd5631ae9f3df Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 15 Jul 2025 13:05:40 -0700 Subject: [PATCH 4/6] Remove method. --- .../com/mongodb/client/AbstractSessionsProseTest.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java index d5d121337f1..014028c61cc 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java @@ -279,15 +279,5 @@ private static Document executePing(final MongoClient client1) { return client1.getDatabase("admin") .runCommand(new Document("ping", 1)); } - - - private static boolean containClusterTime(final List commandStartedIsMasterEvents, - final List commandStartedHelloEvents) { - return Stream.concat( - commandStartedIsMasterEvents.stream(), - commandStartedHelloEvents.stream() - ).map(CommandStartedEvent::getCommand). - anyMatch(bsonDocument -> bsonDocument.containsKey("$clusterTime")); - } } From ad714293c64fe200a077c366e7f08903f3d00c98 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 15 Jul 2025 13:08:27 -0700 Subject: [PATCH 5/6] Add comments. --- .../com/mongodb/client/AbstractSessionsProseTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java index 014028c61cc..d586fbb09a3 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java @@ -213,12 +213,11 @@ public void shouldThrowOnExplicitSessionIfConnectionDoesNotSupportSessions() thr public void shouldNotGossipClusterTimeInServerMonitors() throws InterruptedException, TimeoutException { assumeTrue(!isStandalone()); + //given TestServerMonitorListener serverMonitorListener = new TestServerMonitorListener(asList("serverHeartbeatStartedEvent", "serverHeartbeatSucceededEvent", "serverHeartbeatFailedEvent")); - TestCommandListener commandListener = new TestCommandListener(); - try (MongoClient client1 = getMongoClient( getDirectPrimaryMongoClientSettingsBuilder() .addCommandListener(commandListener) @@ -232,6 +231,7 @@ public void shouldNotGossipClusterTimeInServerMonitors() throws InterruptedExcep Document clusterTime = executePing(client1) .get("$clusterTime", Document.class); + //when client2.getDatabase("test") .getCollection("test") .insertOne(new Document("advance", "$clusterTime")); @@ -246,6 +246,7 @@ public void shouldNotGossipClusterTimeInServerMonitors() throws InterruptedExcep commandListener.reset(); executePing(client1); + //then List pingStartedEvents = commandListener.getCommandStartedEvents("ping"); assertEquals(1, pingStartedEvents.size()); BsonDocument sentClusterTime = pingStartedEvents.get(0).getCommand().getDocument("$clusterTime"); From acd143a627127a969e9863cfbd8c960d3b412bb7 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 15 Jul 2025 14:34:11 -0700 Subject: [PATCH 6/6] Fix static check issues. --- .../com/mongodb/client/AbstractSessionsProseTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java index d586fbb09a3..3682bd64ff0 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractSessionsProseTest.java @@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Stream; import static com.mongodb.ClusterFixture.getDefaultDatabaseName; import static com.mongodb.ClusterFixture.isStandalone; @@ -256,7 +255,7 @@ public void shouldNotGossipClusterTimeInServerMonitors() throws InterruptedExcep } private static MongoClientSettings.Builder getDirectPrimaryMongoClientSettingsBuilder() { - return MongoClientSettings.builder() + return getMongoClientSettingsBuilder() .applyToClusterSettings(ClusterFixture::setDirectConnection); }