Skip to content

Commit 4029416

Browse files
committed
### Motivation
Currently the `MessageId` interface hiddens all fields of the `MessageIdData` struct defined in `PulsarApi.proto`. It's usually enough for application users because they don't need to access the fields. But for client developers and developers of other Pulsar ecosystems (e.g. the built-in Kafka connector and the Flink connector in another repo), the `MessageId` interface is too simple and there is no common used abstraction. We can see many code usages like: ```java if (msgId instanceof BatchMessageIdImpl) { // Do type cast and then access fields like ledger id... } else if (msgId instanceof MessageIdImpl) { // Do type cast and then access fields like ledger id... // NOTE: don't put this else if before the previous one because // BatchMessageIdImpl is also a MessageIdImpl } // ... ``` These `MessageId` implementations are used directly. It's a very bad design because any change to the public APIs of these implementations could bring breaking changes. Also, there is a `TopicMessageIdImpl` that each time a `getInnerMessageId()` method must be used to get the underlying `MessageId` object, then do the type assertion and cast again. It makes code unnecessarily complicated. ### Modifications Introduce the `PulsarApiMessageId` interface into the `pulsar-common` module. All `MessageId` implementations so far (except `MultiMessageId`) should extend this interface so we can do the following conversion safely in client code or other modules: ```java long ledgerId = ((PulsarApiMessageId) msgId).getLedgerId(); ``` Regarding the `ack_set` field, use a `BitSet` instead of the `BatchMessageAcker` to record if a message in the batch is acknowledged. Since the `TopicMessageId` is just a proxy of other `MessageId` implementations, it's stored as key or value in the map directly because the `compareTo`/`equal`/`hashCode` methods have the same semantics with the underlying `MessageId`. There is no need to cast the type and call `getInnerMessageId`. Remove all other usages and mark the public methods as deprecated to avoid breaking changes. They could be removed in the next major release. Add a `CustomMessageIdTest` to verify any valid `MessageId` implementation works for `seek` and `acknowledge` APIs.
1 parent 01e7eac commit 4029416

35 files changed

+691
-432
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@
4444
import org.apache.pulsar.client.api.Producer;
4545
import org.apache.pulsar.client.api.PulsarClientException;
4646
import org.apache.pulsar.client.api.SubscriptionType;
47-
import org.apache.pulsar.client.impl.MessageIdImpl;
48-
import org.apache.pulsar.client.impl.TopicMessageImpl;
47+
import org.apache.pulsar.client.api.PulsarApiMessageId;
4948
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
5049
import org.apache.pulsar.common.naming.TopicName;
5150
import org.apache.pulsar.common.util.FutureUtil;
@@ -337,8 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
337336
}
338337
totalMessages++;
339338
consumer1.acknowledge(msg);
340-
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
341-
receivedPtns.add(msgId.getPartitionIndex());
339+
receivedPtns.add(((PulsarApiMessageId) msg.getMessageId()).getPartition());
342340
}
343341

344342
assertTrue(Sets.difference(listener1.activePtns, receivedPtns).isEmpty());
@@ -354,8 +352,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
354352
}
355353
totalMessages++;
356354
consumer2.acknowledge(msg);
357-
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
358-
receivedPtns.add(msgId.getPartitionIndex());
355+
receivedPtns.add(((PulsarApiMessageId) msg.getMessageId()).getPartition());
359356
}
360357
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
361358
assertTrue(Sets.difference(listener2.activePtns, receivedPtns).isEmpty());

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.pulsar.client.api.SubscriptionType;
5151
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
5252
import org.apache.pulsar.client.impl.MessageIdImpl;
53-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
5453
import org.apache.pulsar.common.naming.TopicName;
5554
import org.apache.pulsar.common.util.RelativeTimeUtil;
5655
import org.awaitility.Awaitility;
@@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception {
679678
if (message == null) {
680679
break;
681680
}
682-
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
683-
received.add(topicMessageId.getInnerMessageId());
681+
received.add(message.getMessageId());
684682
}
685683
int msgNumFromPartition1 = list.size() / 2;
686684
int msgNumFromPartition2 = 1;
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertNotNull;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.TimeUnit;
26+
import lombok.AllArgsConstructor;
27+
import lombok.Cleanup;
28+
import org.testng.annotations.AfterClass;
29+
import org.testng.annotations.BeforeClass;
30+
import org.testng.annotations.Test;
31+
32+
@Test(groups = "broker-api")
33+
public class CustomMessageIdTest extends ProducerConsumerBase {
34+
35+
@BeforeClass
36+
@Override
37+
protected void setup() throws Exception {
38+
super.internalSetup();
39+
super.producerBaseSetup();
40+
41+
}
42+
43+
@AfterClass(alwaysRun = true)
44+
@Override
45+
protected void cleanup() throws Exception {
46+
super.internalCleanup();
47+
}
48+
49+
@Test(timeOut = 30000)
50+
public void testSeek() throws Exception {
51+
final String topic = "persistent://my-property/my-ns/test-seek-" + System.currentTimeMillis();
52+
final var msgIdList = produceMessages(topic);
53+
@Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING)
54+
.topic(topic)
55+
.subscriptionName("sub")
56+
.subscribe();
57+
final int ackIndex = msgIdList.size() / 2 + 1;
58+
consumer.seek(msgIdList.get(ackIndex));
59+
final var msg = consumer.receive(3, TimeUnit.SECONDS);
60+
assertNotNull(msg);
61+
assertEquals(msg.getValue(), "msg-" + (ackIndex + 1));
62+
}
63+
64+
@Test(timeOut = 30000)
65+
public void testAck() throws Exception {
66+
final String topic = "persistent://my-property/my-ns/test-ack-" + System.currentTimeMillis();
67+
@Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING)
68+
.topic(topic)
69+
.subscriptionName("sub")
70+
.isAckReceiptEnabled(true)
71+
.subscribe();
72+
produceMessages(topic);
73+
final int ackIndex = 3;
74+
NonBatchedMessageId messageIdToAck = null;
75+
for (int i = 0; i < 10; i++) {
76+
var msg = consumer.receive();
77+
var msgId = (PulsarApiMessageId) msg.getMessageId();
78+
if (i == ackIndex) {
79+
messageIdToAck = new NonBatchedMessageId(msgId.getLedgerId(), msgId.getEntryId());
80+
}
81+
}
82+
assertNotNull(messageIdToAck);
83+
consumer.acknowledgeCumulative(messageIdToAck);
84+
consumer.redeliverUnacknowledgedMessages();
85+
var msg = consumer.receive(3, TimeUnit.SECONDS);
86+
assertNotNull(msg);
87+
assertEquals(msg.getValue(), "msg-" + (ackIndex + 1));
88+
}
89+
90+
private List<NonBatchedMessageId> produceMessages(String topic) throws PulsarClientException {
91+
@Cleanup final var producer = pulsarClient.newProducer(Schema.STRING)
92+
.topic(topic)
93+
.enableBatching(false)
94+
.create();
95+
final var msgIdList = new ArrayList<NonBatchedMessageId>();
96+
for (int i = 0; i < 10; i++) {
97+
final var msgId = (PulsarApiMessageId) producer.send("msg-" + i);
98+
msgIdList.add(new NonBatchedMessageId(msgId.getLedgerId(), msgId.getEntryId()));
99+
}
100+
return msgIdList;
101+
}
102+
103+
@AllArgsConstructor
104+
private static class NonBatchedMessageId implements PulsarApiMessageId {
105+
// For non-batched message id in a single topic, only ledger id and entry id are required
106+
107+
private final long ledgerId;
108+
private final long entryId;
109+
110+
@Override
111+
public byte[] toByteArray() {
112+
return new byte[0]; // dummy implementation
113+
}
114+
115+
@Override
116+
public long getLedgerId() {
117+
return ledgerId;
118+
}
119+
120+
@Override
121+
public long getEntryId() {
122+
return entryId;
123+
}
124+
}
125+
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.pulsar.client.impl.MessageIdImpl;
4040
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
4141
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
42-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
4342
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
4443
import org.apache.pulsar.common.naming.TopicName;
4544
import org.awaitility.Awaitility;
@@ -768,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {
768767

769768
for (int i = 0; i < totalMessages; i ++) {
770769
msg = consumer1.receive(5, TimeUnit.SECONDS);
771-
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
770+
Assert.assertEquals(((PulsarApiMessageId) msg.getMessageId()).getPartition(), 2);
772771
consumer1.acknowledge(msg);
773772
}
774773

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import lombok.AllArgsConstructor;
4040
import lombok.Cleanup;
4141
import lombok.extern.slf4j.Slf4j;
42+
import org.apache.pulsar.client.api.PulsarApiMessageId;
4243
import org.apache.pulsar.client.api.Consumer;
4344
import org.apache.pulsar.client.api.ConsumerInterceptor;
4445
import org.apache.pulsar.client.api.Message;
@@ -176,10 +177,10 @@ private AckTestData prepareDataForAck(String topic) throws PulsarClientException
176177
messageIds.add(message.getMessageId());
177178
}
178179
MessageId firstEntryMessageId = messageIds.get(0);
179-
MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl();
180+
MessageId secondEntryMessageId = MessageIdImpl.from((PulsarApiMessageId) messageIds.get(1));
180181
// Verify messages 2 to N must be in the same entry
181182
for (int i = 2; i < messageIds.size(); i++) {
182-
assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
183+
assertEquals(MessageIdImpl.from((PulsarApiMessageId) messageIds.get(i)), secondEntryMessageId);
183184
}
184185

185186
assertTrue(interceptor.individualAckedMessageIdList.isEmpty());

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,6 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
118118
Message<byte[]> message = consumer.receive();
119119
assertEquals(new String(message.getData()), messagePrefix + i);
120120
MessageId messageId = message.getMessageId();
121-
if (topicType == TopicType.PARTITIONED) {
122-
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
123-
}
124121
assertTrue(messageIds.remove(messageId), "Failed to receive message");
125122
}
126123
log.info("Remaining message IDs = {}", messageIds);
@@ -166,9 +163,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls
166163

167164
for (int i = 0; i < numberOfMessages; i++) {
168165
MessageId messageId = consumer.receive().getMessageId();
169-
if (topicType == TopicType.PARTITIONED) {
170-
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
171-
}
172166
assertTrue(messageIds.remove(messageId), "Failed to receive Message");
173167
}
174168
log.info("Remaining message IDs = {}", messageIds);

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import lombok.Cleanup;
3131
import lombok.extern.slf4j.Slf4j;
3232
import org.apache.pulsar.broker.BrokerTestUtil;
33+
import org.apache.pulsar.client.api.PulsarApiMessageId;
3334
import org.apache.pulsar.client.api.Consumer;
3435
import org.apache.pulsar.client.api.Message;
35-
import org.apache.pulsar.client.api.MessageId;
3636
import org.apache.pulsar.client.api.Producer;
3737
import org.apache.pulsar.client.api.ProducerConsumerBase;
3838
import org.apache.pulsar.client.api.Schema;
@@ -291,7 +291,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
291291
.negativeAckRedeliveryDelay(100, TimeUnit.SECONDS)
292292
.subscribe();
293293

294-
MessageId messageId = new MessageIdImpl(3, 1, 0);
294+
PulsarApiMessageId messageId = new MessageIdImpl(3, 1, 0);
295295
TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl("topic-1", "topic-1", messageId);
296296
BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(3, 1, 0, 0);
297297
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1);

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,8 @@
3838
import org.apache.pulsar.client.admin.Topics;
3939
import org.apache.pulsar.client.api.Message;
4040
import org.apache.pulsar.client.api.MessageId;
41+
import org.apache.pulsar.client.api.PulsarApiMessageId;
4142
import org.apache.pulsar.client.cli.NoSplitter;
42-
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
43-
import org.apache.pulsar.client.impl.MessageIdImpl;
4443
import org.apache.pulsar.common.util.RelativeTimeUtil;
4544

4645
@Parameters(commandDescription = "Operations on persistent topics. The persistent-topics "
@@ -611,12 +610,11 @@ void run() throws PulsarAdminException {
611610
if (++position != 1) {
612611
System.out.println("-------------------------------------------------------------------------\n");
613612
}
614-
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
615-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
613+
PulsarApiMessageId msgId = (PulsarApiMessageId) msg.getMessageId();
614+
if (msgId.isBatch()) {
616615
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
617616
+ msgId.getBatchIndex());
618617
} else {
619-
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
620618
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
621619
}
622620
if (msg.getProperties().size() > 0) {

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.pulsar.client.admin.Topics;
5656
import org.apache.pulsar.client.api.Message;
5757
import org.apache.pulsar.client.api.MessageId;
58+
import org.apache.pulsar.client.api.PulsarApiMessageId;
5859
import org.apache.pulsar.client.api.SubscriptionType;
5960
import org.apache.pulsar.client.cli.NoSplitter;
6061
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -1192,12 +1193,11 @@ void run() throws PulsarAdminException {
11921193
if (++position != 1) {
11931194
System.out.println("-------------------------------------------------------------------------\n");
11941195
}
1196+
PulsarApiMessageId msgId = (PulsarApiMessageId) msg.getMessageId();
11951197
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1196-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
11971198
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
11981199
+ msgId.getBatchIndex());
11991200
} else {
1200-
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
12011201
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
12021202
}
12031203

@@ -1251,12 +1251,11 @@ void run() throws PulsarAdminException {
12511251
MessageImpl message =
12521252
(MessageImpl) getTopics().examineMessage(persistentTopic, initialPosition, messagePosition);
12531253

1254-
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1255-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
1254+
PulsarApiMessageId msgId = (PulsarApiMessageId) message.getMessageId();
1255+
if (msgId.isBatch()) {
12561256
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
12571257
+ msgId.getBatchIndex());
12581258
} else {
1259-
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
12601259
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
12611260
}
12621261

@@ -1310,12 +1309,11 @@ void run() throws PulsarAdminException {
13101309
System.out.println("Cannot find any messages based on ledgerId:"
13111310
+ ledgerId + " entryId:" + entryId);
13121311
} else {
1313-
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1314-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
1312+
PulsarApiMessageId msgId = (PulsarApiMessageId) message.getMessageId();
1313+
if (msgId.isBatch()) {
13151314
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
13161315
+ msgId.getBatchIndex());
13171316
} else {
1318-
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
13191317
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
13201318
}
13211319

pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.concurrent.CompletableFuture;
2424
import org.apache.pulsar.client.api.MessageId;
25+
import org.apache.pulsar.client.api.PulsarApiMessageId;
2526
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
2627

2728
/**
@@ -31,7 +32,8 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
3132

3233
boolean isDuplicate(MessageId messageId);
3334

34-
CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
35+
CompletableFuture<Void> addAcknowledgment(PulsarApiMessageId msgId, AckType ackType,
36+
Map<String, Long> properties);
3537

3638
CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, AckType ackType,
3739
Map<String, Long> properties);

0 commit comments

Comments
 (0)