Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1098,9 +1098,11 @@ public void removeProducer(Producer producer) {
protected void handleProducerRemoved(Producer producer) {
// decrement usage only if this was a valid producer close
USAGE_COUNT_UPDATER.decrementAndGet(this);
// this conditional check is an optimization so we don't have acquire the write lock
// and execute following routine if there are no exclusive producers
if (hasExclusiveProducer) {
// this conditional check is an optimization so we only need to acquire the write lock
// and execute following routine when:
// 1. If there was an exclusive producer before.
// 2. If this was the last producer closed and there are waiting exclusive producers
if (hasExclusiveProducer || (producers.isEmpty() && !waitingExclusiveProducers.isEmpty())) {
lock.writeLock().lock();
try {
hasExclusiveProducer = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,35 @@ public void testProducerTasksCleanupWhenUsingExclusiveProducers(String type, boo
Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0));
}

@Test(dataProvider = "topics")
public void testSharedProducerCloseWithWaitForExclusiveProducer(String type, boolean partitioned) throws Exception {
String topic = newTopic(type, partitioned);
Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.accessMode(ProducerAccessMode.Shared)
.create();
Producer<String> p2 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.accessMode(ProducerAccessMode.Shared)
.create();
CompletableFuture<Producer<String>> p3Future = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.accessMode(ProducerAccessMode.WaitForExclusive)
.createAsync();
// sleep 1 second to make sure p1, p2, p3Future are added to broker producers and waitingExclusiveProducers
Thread.sleep(1000L);
// now p3Future is still pending because p1,p2 are active in shared mode.
assertFalse(p3Future.isDone());
p1.close();
p2.close();
// close p1,p2 and p3Future should be done and return a producer.
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(p3Future.isDone());
assertTrue(p3Future.get().isConnected());
});
p3Future.get().close();
}

@Test(dataProvider = "topics")
public void existingSharedProducer(String type, boolean partitioned) throws Exception {
String topic = newTopic(type, partitioned);
Expand Down
Loading