Skip to content

Commit c52a4c0

Browse files
3pacccccccrossoverJie
authored andcommitted
[fix][broker] Fix exclusive producer creation when last shared producer closes (apache#24516)
Co-authored-by: crossoverJie <[email protected]> (cherry picked from commit efe9dc8) (cherry picked from commit 102fdb3)
1 parent c522e8b commit c52a4c0

File tree

2 files changed

+34
-3
lines changed

2 files changed

+34
-3
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,9 +1078,11 @@ public void removeProducer(Producer producer) {
10781078
protected void handleProducerRemoved(Producer producer) {
10791079
// decrement usage only if this was a valid producer close
10801080
USAGE_COUNT_UPDATER.decrementAndGet(this);
1081-
// this conditional check is an optimization so we don't have acquire the write lock
1082-
// and execute following routine if there are no exclusive producers
1083-
if (hasExclusiveProducer) {
1081+
// this conditional check is an optimization so we only need to acquire the write lock
1082+
// and execute following routine when:
1083+
// 1. If there was an exclusive producer before.
1084+
// 2. If this was the last producer closed and there are waiting exclusive producers
1085+
if (hasExclusiveProducer || (producers.isEmpty() && !waitingExclusiveProducers.isEmpty())) {
10841086
lock.writeLock().lock();
10851087
try {
10861088
hasExclusiveProducer = false;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,35 @@ public void testProducerTasksCleanupWhenUsingExclusiveProducers(String type, boo
240240
Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0));
241241
}
242242

243+
@Test(dataProvider = "topics")
244+
public void testSharedProducerCloseWithWaitForExclusiveProducer(String type, boolean partitioned) throws Exception {
245+
String topic = newTopic(type, partitioned);
246+
Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
247+
.topic(topic)
248+
.accessMode(ProducerAccessMode.Shared)
249+
.create();
250+
Producer<String> p2 = pulsarClient.newProducer(Schema.STRING)
251+
.topic(topic)
252+
.accessMode(ProducerAccessMode.Shared)
253+
.create();
254+
CompletableFuture<Producer<String>> p3Future = pulsarClient.newProducer(Schema.STRING)
255+
.topic(topic)
256+
.accessMode(ProducerAccessMode.WaitForExclusive)
257+
.createAsync();
258+
// sleep 1 second to make sure p1, p2, p3Future are added to broker producers and waitingExclusiveProducers
259+
Thread.sleep(1000L);
260+
// now p3Future is still pending because p1,p2 are active in shared mode.
261+
assertFalse(p3Future.isDone());
262+
p1.close();
263+
p2.close();
264+
// close p1,p2 and p3Future should be done and return a producer.
265+
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
266+
assertTrue(p3Future.isDone());
267+
assertTrue(p3Future.get().isConnected());
268+
});
269+
p3Future.get().close();
270+
}
271+
243272
@Test(dataProvider = "topics")
244273
public void existingSharedProducer(String type, boolean partitioned) throws Exception {
245274
String topic = newTopic(type, partitioned);

0 commit comments

Comments
 (0)