Skip to content

Commit 42a6969

Browse files
authored
[fix][broker] Return if AbstractDispatcherSingleActiveConsumer closed (apache#19934)
1 parent 067e3c0 commit 42a6969

File tree

7 files changed

+62
-4
lines changed

7 files changed

+62
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
161161
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
162162
log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer);
163163
consumer.disconnect();
164+
return;
164165
}
165166

166167
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
4747
protected final Subscription subscription;
4848

4949
private CompletableFuture<Void> closeFuture = null;
50-
private final String name;
50+
protected final String name;
5151
protected final Rate msgDrop;
5252
protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
5353
TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.apache.pulsar.common.api.proto.KeySharedMeta;
4040
import org.apache.pulsar.common.api.proto.KeySharedMode;
4141
import org.apache.pulsar.common.protocol.Commands;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
4244

4345
public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
4446

@@ -84,6 +86,11 @@ public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topi
8486

8587
@Override
8688
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
89+
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
90+
log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
91+
consumer.disconnect();
92+
return;
93+
}
8794
super.addConsumer(consumer);
8895
try {
8996
selector.addConsumer(consumer);
@@ -168,4 +175,6 @@ public KeySharedMode getKeySharedMode() {
168175
public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
169176
return (ksm.getKeySharedMode() == this.keySharedMode);
170177
}
178+
179+
private static final Logger log = LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class);
171180
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.service.persistent;
2020

2121
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
22+
import com.google.common.annotations.VisibleForTesting;
2223
import io.netty.util.concurrent.FastThreadLocal;
2324
import java.util.ArrayList;
2425
import java.util.Collections;
@@ -100,8 +101,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
100101
}
101102
}
102103

104+
@VisibleForTesting
105+
public StickyKeyConsumerSelector getSelector() {
106+
return selector;
107+
}
108+
103109
@Override
104110
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
111+
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
112+
log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
113+
consumer.disconnect();
114+
return;
115+
}
105116
super.addConsumer(consumer);
106117
try {
107118
selector.addConsumer(consumer);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,20 @@ private void verifyActiveConsumerChange(CommandActiveConsumerChange change,
233233
assertEquals(isActive, change.isIsActive());
234234
}
235235

236+
@Test(timeOut = 10000)
237+
public void testAddConsumerWhenClosed() throws Exception {
238+
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, pulsarTestContext.getBrokerService());
239+
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
240+
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
241+
SubType.Failover, 0, topic, sub);
242+
pdfc.close().get();
243+
244+
Consumer consumer = mock(Consumer.class);
245+
pdfc.addConsumer(consumer);
246+
verify(consumer, times(1)).disconnect();
247+
assertEquals(0, pdfc.consumers.size());
248+
}
249+
236250
@Test
237251
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
238252
PersistentTopic topic =

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.mockito.Mockito.verify;
3232
import static org.mockito.Mockito.when;
3333
import static org.testng.Assert.assertEquals;
34+
import static org.testng.Assert.assertTrue;
3435
import static org.testng.Assert.fail;
3536
import io.netty.buffer.ByteBuf;
3637
import io.netty.buffer.Unpooled;
@@ -47,6 +48,7 @@
4748
import org.apache.pulsar.broker.service.EntryBatchSizes;
4849
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
4950
import org.apache.pulsar.broker.service.RedeliveryTracker;
51+
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
5052
import org.apache.pulsar.common.api.proto.MessageMetadata;
5153
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
5254
import org.apache.pulsar.common.protocol.Commands;
@@ -62,6 +64,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
6264
private ServiceConfiguration configMock;
6365

6466
private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher;
67+
private StickyKeyConsumerSelector selector;
6568

6669
final String topicName = "non-persistent://public/default/testTopic";
6770

@@ -88,10 +91,19 @@ public void setup() throws Exception {
8891
doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
8992

9093
subscriptionMock = mock(NonPersistentSubscription.class);
91-
94+
selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
9295
nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
93-
topicMock, subscriptionMock,
94-
new HashRangeAutoSplitStickyKeyConsumerSelector());
96+
topicMock, subscriptionMock, selector);
97+
}
98+
99+
@Test(timeOut = 10000)
100+
public void testAddConsumerWhenClosed() throws Exception {
101+
nonpersistentDispatcher.close().get();
102+
Consumer consumer = mock(Consumer.class);
103+
nonpersistentDispatcher.addConsumer(consumer);
104+
verify(consumer, times(1)).disconnect();
105+
assertEquals(0, nonpersistentDispatcher.getConsumers().size());
106+
assertTrue(selector.getConsumerKeyHashRanges().isEmpty());
95107
}
96108

97109
@Test(timeOut = 10000)

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import static org.mockito.Mockito.verify;
3636
import static org.mockito.Mockito.when;
3737
import static org.testng.Assert.assertEquals;
38+
import static org.testng.Assert.assertTrue;
3839
import static org.testng.Assert.fail;
3940
import io.netty.buffer.ByteBuf;
4041
import io.netty.buffer.Unpooled;
@@ -159,6 +160,16 @@ public void cleanup() {
159160
}
160161
}
161162

163+
@Test(timeOut = 10000)
164+
public void testAddConsumerWhenClosed() throws Exception {
165+
persistentDispatcher.close().get();
166+
Consumer consumer = mock(Consumer.class);
167+
persistentDispatcher.addConsumer(consumer);
168+
verify(consumer, times(1)).disconnect();
169+
assertEquals(0, persistentDispatcher.getConsumers().size());
170+
assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
171+
}
172+
162173
@Test
163174
public void testSendMarkerMessage() {
164175
try {

0 commit comments

Comments
 (0)