From c403bb751f27302fd53d2872734e59027a694dec Mon Sep 17 00:00:00 2001 From: maruimin Date: Tue, 15 Jul 2025 19:00:47 +0800 Subject: [PATCH 1/5] [fix][broker]fix waitingExclusiveProducers not to call --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 0c9f8e471ed91..c1d8a3c273f2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1099,8 +1099,8 @@ protected void handleProducerRemoved(Producer producer) { // decrement usage only if this was a valid producer close USAGE_COUNT_UPDATER.decrementAndGet(this); // this conditional check is an optimization so we don't have acquire the write lock - // and execute following routine if there are no exclusive producers - if (hasExclusiveProducer) { + // and execute following routine if there are no exclusive producers or there is no producer but still got waitingExclusiveProducers to call + if (hasExclusiveProducer || (producers.size() == 0 && !waitingExclusiveProducers.isEmpty())) { lock.writeLock().lock(); try { hasExclusiveProducer = false; From 5b90f7817c86dfc0721c638ca61940cdf949acb9 Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Tue, 15 Jul 2025 20:53:00 +0800 Subject: [PATCH 2/5] [fix][broker]fix checkstyle --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index c1d8a3c273f2f..645dbe1dc4a49 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1099,7 +1099,8 @@ protected void handleProducerRemoved(Producer producer) { // decrement usage only if this was a valid producer close USAGE_COUNT_UPDATER.decrementAndGet(this); // this conditional check is an optimization so we don't have acquire the write lock - // and execute following routine if there are no exclusive producers or there is no producer but still got waitingExclusiveProducers to call + // and execute following routine if there are no exclusive producers + // or there is no producer but still got waitingExclusiveProducers to call if (hasExclusiveProducer || (producers.size() == 0 && !waitingExclusiveProducers.isEmpty())) { lock.writeLock().lock(); try { From a4fefae4febbb891fa910d080953214ea2aa03d7 Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Tue, 15 Jul 2025 22:38:58 +0800 Subject: [PATCH 3/5] [fix][broker]add test --- .../broker/service/ExclusiveProducerTest.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 33a34d3fff4e6..792703df4a455 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -241,6 +241,33 @@ public void testProducerTasksCleanupWhenUsingExclusiveProducers(String type, boo Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0)); } + @Test(dataProvider = "topics") + public void testSharedProducerCloseWithWaitForExclusiveProducer(String type, boolean partitioned) throws Exception { + String topic = newTopic(type, partitioned); + Producer p1 = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .accessMode(ProducerAccessMode.Shared) + .create(); + Producer p2 = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .accessMode(ProducerAccessMode.Shared) + .create(); + CompletableFuture> p3Future = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .accessMode(ProducerAccessMode.WaitForExclusive) + .createAsync(); + // sleep 1 second to make sure p1, p2, p3Future are added to broker producers and waitingExclusiveProducers + Thread.sleep(1000L); + // now p3Future is still pending because p1,p2 are active in shared mode. + assertFalse(p3Future.isDone()); + p1.close(); + p2.close(); + // close p1,p2 and p3Future should be done and return a producer. + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertTrue(p3Future.isDone()); + }); + } + @Test(dataProvider = "topics") public void existingSharedProducer(String type, boolean partitioned) throws Exception { String topic = newTopic(type, partitioned); From 0171c860aae891ad78d22ebfa708c6de4b43867b Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Tue, 15 Jul 2025 22:43:46 +0800 Subject: [PATCH 4/5] [fix][broker]close p3 in ExclusiveProducerTest --- .../org/apache/pulsar/broker/service/ExclusiveProducerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 792703df4a455..9dc124faa428a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -266,6 +266,7 @@ public void testSharedProducerCloseWithWaitForExclusiveProducer(String type, boo Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { assertTrue(p3Future.isDone()); }); + p3Future.get().close(); } @Test(dataProvider = "topics") From cb9cfd2cee334e445480e55ffec7075b9ced2dfc Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Tue, 15 Jul 2025 23:02:43 +0800 Subject: [PATCH 5/5] [fix][broker]upgrade document and assertion --- .../org/apache/pulsar/broker/service/AbstractTopic.java | 9 +++++---- .../pulsar/broker/service/ExclusiveProducerTest.java | 1 + 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 645dbe1dc4a49..5856b530bcb2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1098,10 +1098,11 @@ public void removeProducer(Producer producer) { protected void handleProducerRemoved(Producer producer) { // decrement usage only if this was a valid producer close USAGE_COUNT_UPDATER.decrementAndGet(this); - // this conditional check is an optimization so we don't have acquire the write lock - // and execute following routine if there are no exclusive producers - // or there is no producer but still got waitingExclusiveProducers to call - if (hasExclusiveProducer || (producers.size() == 0 && !waitingExclusiveProducers.isEmpty())) { + // this conditional check is an optimization so we only need to acquire the write lock + // and execute following routine when: + // 1. If there was an exclusive producer before. + // 2. If this was the last producer closed and there are waiting exclusive producers + if (hasExclusiveProducer || (producers.isEmpty() && !waitingExclusiveProducers.isEmpty())) { lock.writeLock().lock(); try { hasExclusiveProducer = false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 9dc124faa428a..9a070efa95d4b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -265,6 +265,7 @@ public void testSharedProducerCloseWithWaitForExclusiveProducer(String type, boo // close p1,p2 and p3Future should be done and return a producer. Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { assertTrue(p3Future.isDone()); + assertTrue(p3Future.get().isConnected()); }); p3Future.get().close(); }