Skip to content

Commit efe9dc8

Browse files
[fix][broker] Fix exclusive producer creation when last shared producer closes (#24516)
Co-authored-by: crossoverJie <[email protected]>
1 parent 39ac65a commit efe9dc8

File tree

2 files changed

+34
-3
lines changed

2 files changed

+34
-3
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,9 +1098,11 @@ public void removeProducer(Producer producer) {
10981098
protected void handleProducerRemoved(Producer producer) {
10991099
// decrement usage only if this was a valid producer close
11001100
USAGE_COUNT_UPDATER.decrementAndGet(this);
1101-
// this conditional check is an optimization so we don't have acquire the write lock
1102-
// and execute following routine if there are no exclusive producers
1103-
if (hasExclusiveProducer) {
1101+
// this conditional check is an optimization so we only need to acquire the write lock
1102+
// and execute following routine when:
1103+
// 1. If there was an exclusive producer before.
1104+
// 2. If this was the last producer closed and there are waiting exclusive producers
1105+
if (hasExclusiveProducer || (producers.isEmpty() && !waitingExclusiveProducers.isEmpty())) {
11041106
lock.writeLock().lock();
11051107
try {
11061108
hasExclusiveProducer = false;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,35 @@ public void testProducerTasksCleanupWhenUsingExclusiveProducers(String type, boo
241241
Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0));
242242
}
243243

244+
@Test(dataProvider = "topics")
245+
public void testSharedProducerCloseWithWaitForExclusiveProducer(String type, boolean partitioned) throws Exception {
246+
String topic = newTopic(type, partitioned);
247+
Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
248+
.topic(topic)
249+
.accessMode(ProducerAccessMode.Shared)
250+
.create();
251+
Producer<String> p2 = pulsarClient.newProducer(Schema.STRING)
252+
.topic(topic)
253+
.accessMode(ProducerAccessMode.Shared)
254+
.create();
255+
CompletableFuture<Producer<String>> p3Future = pulsarClient.newProducer(Schema.STRING)
256+
.topic(topic)
257+
.accessMode(ProducerAccessMode.WaitForExclusive)
258+
.createAsync();
259+
// sleep 1 second to make sure p1, p2, p3Future are added to broker producers and waitingExclusiveProducers
260+
Thread.sleep(1000L);
261+
// now p3Future is still pending because p1,p2 are active in shared mode.
262+
assertFalse(p3Future.isDone());
263+
p1.close();
264+
p2.close();
265+
// close p1,p2 and p3Future should be done and return a producer.
266+
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
267+
assertTrue(p3Future.isDone());
268+
assertTrue(p3Future.get().isConnected());
269+
});
270+
p3Future.get().close();
271+
}
272+
244273
@Test(dataProvider = "topics")
245274
public void existingSharedProducer(String type, boolean partitioned) throws Exception {
246275
String topic = newTopic(type, partitioned);

0 commit comments

Comments
 (0)