-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Open
Labels
type/bugThe PR fixed a bug or issue reported a bugThe PR fixed a bug or issue reported a bug
Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
- Pulsar version: 3.0.x
Issue Description
When a consumer is connected and actively receiving messages, invoking Subscription.expireMessages(int expireTimeInSeconds)
should not interfere with its message delivery pipeline. This method is intended to expire messages for disconnected consumers, not to remove data visible to a live consumer.
However, the expiring messages feature causes expired messages to become inaccessible to the connected consumer, making them appear lost, even though they are still present in the topic.
Error messages
Reproducing the issue
You can add and run this test in the org.apache.pulsar.client.api.SimpleProducerConsumerTest
.
@Test
public void testConsumeWhenMessageExpired() throws Exception {
String topicName = "persistent://public/default/testConsumeWhenExpired";
String subName = "sub";
int receiverQueueSize = 1;
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(receiverQueueSize).subscribe();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
List<MessageId> messageIds = new ArrayList<>();
messageIds.add(producer.send(("my-message-0-ok").getBytes()));
messageIds.add(producer.send(("my-message-1-ok").getBytes()));
Thread.sleep(3 * 1000);
messageIds.add(producer.send(("my-message-2-ok").getBytes()));
messageIds.add(producer.send(("my-message-3-ok").getBytes()));
Thread.sleep(3 * 1000);
// Reset current position to 5 seconds ago.
consumer.seek(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(5));
messageIds.add(producer.send(("my-message-4-expired").getBytes()));
messageIds.add(producer.send(("my-message-5-expired").getBytes()));
// Sleep 5 seconds to wait the messages to be expired.
Thread.sleep(5 * 1000);
messageIds.add(producer.send(("my-message-6-ok").getBytes()));
Optional<Topic> topicReference = pulsar.getBrokerService().getTopicReference(topicName);
assertThat(topicReference).isPresent();
// Expire messages.
topicReference.ifPresent(topic -> {
Subscription subscription = topic.getSubscription(subName);
subscription.expireMessages(5);
});
List<String> receivedMessages = new ArrayList<>();
while (true) {
Message<byte[]> receive = consumer.receive(2, TimeUnit.SECONDS);
if (receive == null) {
break;
}
String msg = new String(receive.getData(), StandardCharsets.UTF_8);
receivedMessages.add(msg);
}
List<String> expectedMessages = Lists.newArrayList("my-message-2-ok", "my-message-3-ok", "my-message-4-expired", "my-message-5-expired", "my-message-6-ok");
assertThat(receivedMessages)
.isEqualTo(expectedMessages);
// Debug
System.out.println("All messageIds:");
messageIds.forEach(System.out::println);
}
Additional information
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!
Metadata
Metadata
Assignees
Labels
type/bugThe PR fixed a bug or issue reported a bugThe PR fixed a bug or issue reported a bug