Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -1812,20 +1812,31 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
remoteAddress, redeliver.getConsumerId(),
redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null);
}

CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
if (redeliver.hasConsumerEpoch()) {
consumer.redeliverUnacknowledgedMessages(redeliver.getConsumerEpoch());
boolean hasConsumerEpoch = redeliver.hasConsumerEpoch();
List<MessageIdData> messageIdsList = redeliver.getMessageIdsList();
int messageIdsCount = redeliver.getMessageIdsCount();
long consumerId = redeliver.getConsumerId();
long consumerEpoch = redeliver.getConsumerEpoch();
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture != null) {
consumerFuture.thenAccept((consumer) -> {
if (messageIdsCount > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(messageIdsList);
} else {
consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH);
if (hasConsumerEpoch) {
consumer.redeliverUnacknowledgedMessages(consumerEpoch);
} else {
consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH);
}
}
}
}).exceptionally(e -> {
// if consumerFuture completed exceptionally, don't need to process this redeliver command
// because, consumer will reconnect
log.warn("[{}] ignore this redeliverUnacknowledged request from consumer {}, consumerEpoch {}",
remoteAddress, redeliver.getConsumerId(),
redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null, e);
return null;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,20 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc
}

private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
// redeliver epoch is bigger than current consumer epoch, so don't need to handle this redeliver request
if (consumerEpoch < consumer.getConsumerEpoch()) {
log.warn("[{}-{}] Update epoch, old epoch [{}] bigger than new epoch [{}]",
name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
return;
}

if (consumerEpoch > consumer.getConsumerEpoch()) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]",
name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
}
consumer.setConsumerEpoch(consumerEpoch);
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]",
name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
}

consumer.setConsumerEpoch(consumerEpoch);

if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
name, consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -317,6 +318,70 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
assertEquals(message.getValue(), test3);
}

@Test
public void testRedeliveryBrokerIgnoreSmallerEpoch() throws Exception{
final String topic = "testRedeliveryBrokerAbortSmallerEpoch";
final String subName = "my-sub";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

consumer.redeliverUnacknowledgedMessages();
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(
"persistent://public/default/testRedeliveryBrokerAbortSmallerEpoch", false).get().get();
Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher()
.getConsumers().get(0).getConsumerEpoch() == 1);
consumer.setConsumerEpoch(0);
producer.send("Hello Pulsar!");

// ignore this redeliver request
consumer.redeliverUnacknowledgedMessages();
consumer.receive();
assertEquals(persistentTopic.getSubscription(subName).getDispatcher()
.getConsumers().get(0).getConsumerEpoch(), 1);
}

@Test
public void testRedeliveryCommandDontCheckClientConnectionState() throws Exception{
final String topic = "testRedeliveryCommandDontCheckClientConnectionState";
final String subName = "my-sub";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

assertEquals(consumer.getState(), HandlerState.State.Ready);
consumer.setState(HandlerState.State.Connecting);
producer.send("Hello Pulsar!");
consumer.receive();
consumer.redeliverUnacknowledgedMessages();
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(
"persistent://public/default/testRedeliveryCommandDontCheckClientConnectionState",
false).get().get();
Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher()
.getConsumers().get(0).getConsumerEpoch() == 1);

// redeliver success, consumer also can receive message again
consumer.receive();
}

@Test(dataProvider = "enableBatch")
public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception {
final String topic = "testRedeliveryAddEpochAndPermits";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1914,8 +1914,10 @@ public void redeliverUnacknowledgedMessages() {
incomingQueueLock.unlock();
}

// is channel is connected, we should send redeliver command to broker
if (cnx != null && isConnected(cnx)) {
// If a subscription command has been sent to the broker, it is necessary to allow the redelivery
// request to be sent to the broker without checking the connection state, as failing to do so would
// result in the client consumer epoch being bigger than the broker consumer epoch.
if (cnx != null) {
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise());
if (currentSize > 0) {
Expand All @@ -1925,9 +1927,6 @@ public void redeliverUnacknowledgedMessages() {
log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,
consumerName, currentSize);
}
} else {
log.warn("[{}] Send redeliver messages command but the client is reconnect or close, "
+ "so don't need to send redeliver command to broker", this);
}
}
}
Expand Down