Skip to content

Commit 4200a62

Browse files
committed
[improve][client] Introduce PulsarApiMessageId to access fields of MessageIdData
### Motivation Currently the `MessageId` interface hiddens all fields of the `MessageIdData` struct defined in `PulsarApi.proto`. It's usually enough for application users because they don't need to access the fields. But for client developers and developers of other Pulsar ecosystems (e.g. the built-in Kafka connector and the Flink connector in another repo), the `MessageId` interface is too simple and there is no common used abstraction. We can see many code usages like: ```java if (msgId instanceof BatchMessageIdImpl) { // Do type cast and then access fields like ledger id... } else if (msgId instanceof MessageIdImpl) { // Do type cast and then access fields like ledger id... // NOTE: don't put this else if before the previous one because // BatchMessageIdImpl is also a MessageIdImpl } // ... ``` These `MessageId` implementations are used directly. It's a very bad design because any change to the public APIs of these implementations could bring breaking changes. Also, there is a `TopicMessageIdImpl` that each time a `getInnerMessageId()` method must be used to get the underlying `MessageId` object, then do the type assertion and cast again. It makes code unnecessarily complicated. ### Modifications Introduce the `PulsarApiMessageId` interface into the `pulsar-common` module. All `MessageId` implementations so far (except `MultiMessageId`) should extend this interface so we can do the following conversion safely in client code or other modules: ```java long ledgerId = ((PulsarApiMessageId) msgId).getLedgerId(); ``` Regarding the `ack_set` field, use a `BitSet` instead of the `BatchMessageAcker` to record if a message in the batch is acknowledged. Since the `TopicMessageId` is just a proxy of other `MessageId` implementations, it's stored as key or value in the map directly because the `compareTo`/`equal`/`hashCode` methods have the same semantics with the underlying `MessageId`. There is no need to cast the type and call `getInnerMessageId`. Remove all other usages and mark the public methods as deprecated to avoid breaking changes. They could be removed in the next major release.
1 parent 9917aac commit 4200a62

34 files changed

+560
-469
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@
4444
import org.apache.pulsar.client.api.Producer;
4545
import org.apache.pulsar.client.api.PulsarClientException;
4646
import org.apache.pulsar.client.api.SubscriptionType;
47-
import org.apache.pulsar.client.impl.MessageIdImpl;
48-
import org.apache.pulsar.client.impl.TopicMessageImpl;
47+
import org.apache.pulsar.client.api.PulsarApiMessageId;
4948
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
5049
import org.apache.pulsar.common.naming.TopicName;
5150
import org.apache.pulsar.common.util.FutureUtil;
@@ -337,8 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
337336
}
338337
totalMessages++;
339338
consumer1.acknowledge(msg);
340-
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
341-
receivedPtns.add(msgId.getPartitionIndex());
339+
receivedPtns.add(((PulsarApiMessageId) msg.getMessageId()).getPartition());
342340
}
343341

344342
assertTrue(Sets.difference(listener1.activePtns, receivedPtns).isEmpty());
@@ -354,8 +352,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
354352
}
355353
totalMessages++;
356354
consumer2.acknowledge(msg);
357-
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
358-
receivedPtns.add(msgId.getPartitionIndex());
355+
receivedPtns.add(((PulsarApiMessageId) msg.getMessageId()).getPartition());
359356
}
360357
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
361358
assertTrue(Sets.difference(listener2.activePtns, receivedPtns).isEmpty());

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.pulsar.client.api.SubscriptionType;
5151
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
5252
import org.apache.pulsar.client.impl.MessageIdImpl;
53-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
5453
import org.apache.pulsar.common.naming.TopicName;
5554
import org.apache.pulsar.common.util.RelativeTimeUtil;
5655
import org.awaitility.Awaitility;
@@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception {
679678
if (message == null) {
680679
break;
681680
}
682-
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
683-
received.add(topicMessageId.getInnerMessageId());
681+
received.add(message.getMessageId());
684682
}
685683
int msgNumFromPartition1 = list.size() / 2;
686684
int msgNumFromPartition2 = 1;

pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@
3636
import java.util.concurrent.TimeUnit;
3737
import java.util.concurrent.atomic.AtomicInteger;
3838
import java.util.concurrent.atomic.AtomicReference;
39+
3940
import org.apache.pulsar.client.impl.MessageIdImpl;
4041
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
4142
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
42-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
4343
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
4444
import org.apache.pulsar.common.naming.TopicName;
4545
import org.awaitility.Awaitility;
@@ -768,7 +768,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {
768768

769769
for (int i = 0; i < totalMessages; i ++) {
770770
msg = consumer1.receive(5, TimeUnit.SECONDS);
771-
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
771+
Assert.assertEquals(((PulsarApiMessageId) msg.getMessageId()).getPartition(), 2);
772772
consumer1.acknowledge(msg);
773773
}
774774

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import lombok.AllArgsConstructor;
4040
import lombok.Cleanup;
4141
import lombok.extern.slf4j.Slf4j;
42+
import org.apache.pulsar.client.api.PulsarApiMessageId;
4243
import org.apache.pulsar.client.api.Consumer;
4344
import org.apache.pulsar.client.api.ConsumerInterceptor;
4445
import org.apache.pulsar.client.api.Message;
@@ -176,10 +177,10 @@ private AckTestData prepareDataForAck(String topic) throws PulsarClientException
176177
messageIds.add(message.getMessageId());
177178
}
178179
MessageId firstEntryMessageId = messageIds.get(0);
179-
MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl();
180+
MessageId secondEntryMessageId = MessageIdImpl.from((PulsarApiMessageId) messageIds.get(1));
180181
// Verify messages 2 to N must be in the same entry
181182
for (int i = 2; i < messageIds.size(); i++) {
182-
assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
183+
assertEquals(MessageIdImpl.from((PulsarApiMessageId) messageIds.get(i)), secondEntryMessageId);
183184
}
184185

185186
assertTrue(interceptor.individualAckedMessageIdList.isEmpty());

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,6 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
118118
Message<byte[]> message = consumer.receive();
119119
assertEquals(new String(message.getData()), messagePrefix + i);
120120
MessageId messageId = message.getMessageId();
121-
if (topicType == TopicType.PARTITIONED) {
122-
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
123-
}
124121
assertTrue(messageIds.remove(messageId), "Failed to receive message");
125122
}
126123
log.info("Remaining message IDs = {}", messageIds);
@@ -166,9 +163,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls
166163

167164
for (int i = 0; i < numberOfMessages; i++) {
168165
MessageId messageId = consumer.receive().getMessageId();
169-
if (topicType == TopicType.PARTITIONED) {
170-
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
171-
}
172166
assertTrue(messageIds.remove(messageId), "Failed to receive Message");
173167
}
174168
log.info("Remaining message IDs = {}", messageIds);

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import lombok.Cleanup;
3131
import lombok.extern.slf4j.Slf4j;
3232
import org.apache.pulsar.broker.BrokerTestUtil;
33+
import org.apache.pulsar.client.api.PulsarApiMessageId;
3334
import org.apache.pulsar.client.api.Consumer;
3435
import org.apache.pulsar.client.api.Message;
35-
import org.apache.pulsar.client.api.MessageId;
3636
import org.apache.pulsar.client.api.Producer;
3737
import org.apache.pulsar.client.api.ProducerConsumerBase;
3838
import org.apache.pulsar.client.api.Schema;
@@ -291,7 +291,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
291291
.negativeAckRedeliveryDelay(100, TimeUnit.SECONDS)
292292
.subscribe();
293293

294-
MessageId messageId = new MessageIdImpl(3, 1, 0);
294+
PulsarApiMessageId messageId = new MessageIdImpl(3, 1, 0);
295295
TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl("topic-1", "topic-1", messageId);
296296
BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(3, 1, 0, 0);
297297
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1);

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,8 @@
3838
import org.apache.pulsar.client.admin.Topics;
3939
import org.apache.pulsar.client.api.Message;
4040
import org.apache.pulsar.client.api.MessageId;
41+
import org.apache.pulsar.client.api.PulsarApiMessageId;
4142
import org.apache.pulsar.client.cli.NoSplitter;
42-
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
43-
import org.apache.pulsar.client.impl.MessageIdImpl;
4443
import org.apache.pulsar.common.util.RelativeTimeUtil;
4544

4645
@Parameters(commandDescription = "Operations on persistent topics. The persistent-topics "
@@ -611,12 +610,11 @@ void run() throws PulsarAdminException {
611610
if (++position != 1) {
612611
System.out.println("-------------------------------------------------------------------------\n");
613612
}
614-
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
615-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
613+
PulsarApiMessageId msgId = (PulsarApiMessageId) msg.getMessageId();
614+
if (msgId.isBatch()) {
616615
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
617616
+ msgId.getBatchIndex());
618617
} else {
619-
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
620618
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
621619
}
622620
if (msg.getProperties().size() > 0) {

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.pulsar.client.admin.Topics;
5656
import org.apache.pulsar.client.api.Message;
5757
import org.apache.pulsar.client.api.MessageId;
58+
import org.apache.pulsar.client.api.PulsarApiMessageId;
5859
import org.apache.pulsar.client.api.SubscriptionType;
5960
import org.apache.pulsar.client.cli.NoSplitter;
6061
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -1192,12 +1193,11 @@ void run() throws PulsarAdminException {
11921193
if (++position != 1) {
11931194
System.out.println("-------------------------------------------------------------------------\n");
11941195
}
1196+
PulsarApiMessageId msgId = (PulsarApiMessageId) msg.getMessageId();
11951197
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1196-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
11971198
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
11981199
+ msgId.getBatchIndex());
11991200
} else {
1200-
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
12011201
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
12021202
}
12031203

@@ -1251,12 +1251,11 @@ void run() throws PulsarAdminException {
12511251
MessageImpl message =
12521252
(MessageImpl) getTopics().examineMessage(persistentTopic, initialPosition, messagePosition);
12531253

1254-
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1255-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
1254+
PulsarApiMessageId msgId = (PulsarApiMessageId) message.getMessageId();
1255+
if (msgId.isBatch()) {
12561256
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
12571257
+ msgId.getBatchIndex());
12581258
} else {
1259-
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
12601259
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
12611260
}
12621261

@@ -1310,12 +1309,11 @@ void run() throws PulsarAdminException {
13101309
System.out.println("Cannot find any messages based on ledgerId:"
13111310
+ ledgerId + " entryId:" + entryId);
13121311
} else {
1313-
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1314-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
1312+
PulsarApiMessageId msgId = (PulsarApiMessageId) message.getMessageId();
1313+
if (msgId.isBatch()) {
13151314
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
13161315
+ msgId.getBatchIndex());
13171316
} else {
1318-
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
13191317
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
13201318
}
13211319

pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.concurrent.CompletableFuture;
2424
import org.apache.pulsar.client.api.MessageId;
25+
import org.apache.pulsar.client.api.PulsarApiMessageId;
2526
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
2627

2728
/**
@@ -31,7 +32,8 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
3132

3233
boolean isDuplicate(MessageId messageId);
3334

34-
CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
35+
CompletableFuture<Void> addAcknowledgment(PulsarApiMessageId msgId, AckType ackType,
36+
Map<String, Long> properties);
3537

3638
CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, AckType ackType,
3739
Map<String, Long> properties);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.BitSet;
2222

23+
@Deprecated
2324
public class BatchMessageAcker {
2425

2526
private BatchMessageAcker() {

0 commit comments

Comments
 (0)