Description
I am testing Shared subscriptions with batched messages on the current Pulsar master.
The behaviour I observe is that, with batched messages, the Consumer sends flow control requests for more messages than it can handle.
This is how to reproduce the problem:
- write 100,000 messages using batching
- start a Consumer with a Shared subscription (from the beginning of the topic)
- you will see that PersistentDispatcherMultipleConsumers.consumerFlow triggers the read of many messages
This happens because consumerFlow calls readMoreEntries(), and readMoreEntries() sees that there are messages to be redelivered because the consumer still hasn't acknowledged them.
This in turn requests the ManagedCursor to read the data from storage.
I observed this behaviour while working on offloader performance, but it also happens with regular BK-based ledgers.
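To make the call chain above concrete, here is a minimal toy model of it. It is only a sketch of the behaviour as described in this issue, not the broker code: `ToyDispatcher`, `addToReplay`, `acknowledge`, `readCount` and `entriesRead` are hypothetical names, batching is not modelled, and permits are never consumed because nothing is actually dispatched. The only thing it shows is that, if every flow request re-reads whatever is still unacknowledged, the number of storage reads grows with the number of flow requests rather than with the number of entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model only: hypothetical names, NOT the Pulsar broker implementation. */
public class ToyDispatcher {
    // Entries considered "to be redelivered" because they are not acknowledged yet.
    private final TreeSet<Long> entriesToReplay = new TreeSet<>();
    // One element per storage read issued; the value is the number of entries requested.
    private final List<Integer> storageReads = new ArrayList<>();
    private int availablePermits = 0;

    public void addToReplay(long entryId) {
        entriesToReplay.add(entryId);
    }

    public void acknowledge(long entryId) {
        entriesToReplay.remove(entryId);
    }

    /** Every flow request adds permits and immediately tries to read more. */
    public void consumerFlow(int permits) {
        availablePermits += permits;
        readMoreEntries();
    }

    /** Assumption: a pending replay set always results in a read from storage. */
    private void readMoreEntries() {
        if (availablePermits > 0 && !entriesToReplay.isEmpty()) {
            storageReads.add(Math.min(availablePermits, entriesToReplay.size()));
        }
    }

    public int readCount() {
        return storageReads.size();
    }

    public int entriesRead() {
        return storageReads.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        ToyDispatcher dispatcher = new ToyDispatcher();
        for (long id = 0; id < 3; id++) {
            dispatcher.addToReplay(id);   // 3 entries still unacknowledged
        }
        for (int i = 0; i < 5; i++) {
            dispatcher.consumerFlow(10);  // 5 flow requests arrive before any ack
        }
        // prints "5 reads, 15 entries"
        System.out.println(dispatcher.readCount() + " reads, " + dispatcher.entriesRead() + " entries");
    }
}
```

In this model three unacknowledged entries are read five times over. Once they are acknowledged, further flow requests no longer trigger a read.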
This simple test case reproduces the problem. I appended it to this test class:
pulsar/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
(line 66 in 1ba180c: public class CreateSubscriptionTest extends ProducerConsumerBase {)
@Test
public void testConsumerFlowOnSharedSubscription() throws Exception {
    String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
    admin.topics().createNonPartitionedTopic(topic);
    String subName = "my-sub";
    int numMessages = 20_000;
    final CountDownLatch count = new CountDownLatch(numMessages);
    try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .subscriptionMode(SubscriptionMode.Durable)
            .subscriptionType(SubscriptionType.Shared)
            .topic(topic)
            .subscriptionName(subName)
            .messageListener(new MessageListener<byte[]>() {
                @Override
                public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                    //log.info("received {} - {}", msg, count.getCount());
                    consumer.acknowledgeAsync(msg);
                    count.countDown();
                }
            })
            .subscribe();
         Producer<byte[]> producer = pulsarClient
            .newProducer()
            .blockIfQueueFull(true)
            .enableBatching(true)
            .topic(topic)
            .create()) {
        // Pause the consumer while the backlog is being written
        consumer.pause();
        byte[] message = "foo".getBytes(StandardCharsets.UTF_8);
        List<CompletableFuture<?>> futures = new ArrayList<>();
        for (int i = 0; i < numMessages; i++) {
            futures.add(producer.sendAsync(message).whenComplete((id, e) -> {
                if (e != null) {
                    log.error("error", e);
                }
            }));
            // Wait for every 1000 pending sends to complete before continuing
            if (futures.size() == 1000) {
                FutureUtil.waitForAll(futures).get();
                futures.clear();
            }
        }
        producer.flush();
        // Resume the consumer and wait for the listener to receive every message
        consumer.resume();
        assertTrue(count.await(20, TimeUnit.SECONDS));
    }
}