Skip to content

Commit f860566

Browse files
committed
Fix broken main branch by providing correct getBitSet implementation
### Motivation Currently the main branch is broken by the concurrent merge of #153 and #151. It's because when a batched message id is constructed from deserialization, there is no `getBitSet` implementation of the internal acker. ### Modifications Add a `bool` parameter to `MessageIdImpl::getBitSet` to indicate whether the message ID is batched. The logic is similar with https://github.com/apache/pulsar/blob/299bd70fdfa023768e94a8ee4347d39337b6cbd4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L325-L327 and https://github.com/apache/pulsar/blob/299bd70fdfa023768e94a8ee4347d39337b6cbd4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L345-L347 Add a `testMessageIdFromBuild` to test the acknowledgment for a message ID without an acker could succeed for a consumer that enables batch index ACK. ### TODO In future, apache/pulsar#19031 might be migrated into the C++ client to fix the consumer that disables batch index ACK.
1 parent 06eab69 commit f860566

File tree

7 files changed

+94
-9
lines changed

7 files changed

+94
-9
lines changed

lib/AckGroupingTracker.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ DECLARE_LOG_OBJECT();
3131

3232
inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const MessageId& msgId,
3333
CommandAck_AckType ackType) {
34-
const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
34+
const auto& bitSet =
35+
Commands::getMessageIdImpl(msgId)->getBitSet(ackType == CommandAck_AckType_Individual);
3536
auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), bitSet, ackType, -1);
3637
cnx->sendCommand(cmd);
3738
LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ", " << msgId.entryId() << "]");

lib/BatchMessageAcker.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,16 @@ class BatchMessageAcker {
3737
// by deserializing from raw bytes.
3838
virtual bool ackIndividual(int32_t) { return false; }
3939
virtual bool ackCumulative(int32_t) { return false; }
40+
virtual const BitSet& getBitSet() noexcept {
41+
static BitSet emptyBitSet;
42+
return emptyBitSet;
43+
}
4044

4145
bool shouldAckPreviousMessageId() noexcept {
4246
bool expectedValue = false;
4347
return prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
4448
}
4549

46-
const BitSet& getBitSet() const noexcept { return bitSet_; }
47-
4850
private:
4951
// When a batched message is acknowledged cumulatively, the previous message id will be acknowledged
5052
// without batch index ACK enabled. However, it should be acknowledged only once. Use this flag to
@@ -80,6 +82,8 @@ class BatchMessageAckerImpl : public BatchMessageAcker {
8082
return bitSet_.isEmpty();
8183
}
8284

85+
const BitSet& getBitSet() const noexcept { return bitSet_; }
86+
8387
private:
8488
BitSet bitSet_;
8589
mutable std::mutex mutex_;

lib/BatchedMessageIdImpl.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,21 @@ class BatchedMessageIdImpl : public MessageIdImpl {
4141

4242
bool shouldAckPreviousMessageId() const { return acker_->shouldAckPreviousMessageId(); }
4343

44-
const BitSet& getBitSet() const noexcept override { return acker_->getBitSet(); }
44+
const BitSet& getBitSet(bool individual) const noexcept override {
45+
const auto& bitSet = acker_->getBitSet();
46+
if (bitSet.isEmpty()) {
47+
thread_local BitSet threadLocalBitSet;
48+
threadLocalBitSet = BitSet{batchSize_};
49+
threadLocalBitSet.set(0, batchSize_);
50+
if (individual) {
51+
threadLocalBitSet.clear(batchIndex_);
52+
} else {
53+
threadLocalBitSet.clear(0, batchIndex_ + 1);
54+
}
55+
return threadLocalBitSet;
56+
}
57+
return bitSet;
58+
}
4559

4660
MessageId getPreviousMessageId() {
4761
return MessageIdBuilder().ledgerId(ledgerId_).entryId(entryId_ - 1).partition(partition_).build();

lib/Commands.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const std::set<Me
450450
auto newMsgId = ack->add_message_id();
451451
newMsgId->set_ledgerid(msgId.ledgerId());
452452
newMsgId->set_entryid(msgId.entryId());
453-
for (auto x : getMessageIdImpl(msgId)->getBitSet()) {
453+
for (auto x : getMessageIdImpl(msgId)->getBitSet(true)) {
454454
newMsgId->add_ack_set(x);
455455
}
456456
}

lib/MessageIdImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class MessageIdImpl {
4646
const std::string& getTopicName() { return *topicName_; }
4747
void setTopicName(const std::string& topicName) { topicName_ = &topicName; }
4848

49-
virtual const BitSet& getBitSet() const noexcept {
49+
virtual const BitSet& getBitSet(bool individual) const noexcept {
5050
static const BitSet emptyBitSet;
5151
return emptyBitSet;
5252
}

tests/AcknowledgeTest.cc

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
#include <gtest/gtest.h>
2020
#include <pulsar/Client.h>
21+
#include <pulsar/MessageIdBuilder.h>
2122

2223
#include <chrono>
2324
#include <set>
@@ -302,4 +303,55 @@ TEST_F(AcknowledgeTest, testMixedCumulativeAck) {
302303
ASSERT_EQ(ResultTimeout, consumer.getConsumer().receive(msg, 1000));
303304
}
304305

306+
TEST_F(AcknowledgeTest, testMessageIdFromBuild) {
307+
Client client(lookupUrl);
308+
const std::string topic = "test-message-id-from-build-" + unique_str();
309+
310+
ConsumerWrapper consumer0;
311+
consumer0.initialize(client, topic, "sub-0", false);
312+
313+
ConsumerWrapper consumer1;
314+
consumer1.initialize(client, topic, "sub-1", true /* batch index ACK enabled */);
315+
316+
Producer producer;
317+
auto producerConf =
318+
ProducerConfiguration().setBatchingMaxMessages(100).setBatchingMaxPublishDelayMs(3600 * 1000);
319+
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
320+
321+
constexpr int numMessages = 5;
322+
for (int i = 0; i < numMessages; i++) {
323+
producer.sendAsync(MessageBuilder().setContent("msg").build(), nullptr);
324+
}
325+
producer.flush();
326+
327+
std::vector<MessageId> msgIds;
328+
consumer0.receiveAtMost(numMessages);
329+
for (auto&& msgId : consumer0.messageIdList()) {
330+
msgIds.emplace_back(MessageIdBuilder()
331+
.ledgerId(msgId.ledgerId())
332+
.entryId(msgId.entryId())
333+
.batchIndex(msgId.batchIndex())
334+
.batchSize(msgId.batchSize())
335+
.build());
336+
}
337+
338+
consumer0.acknowledgeMessageIdAndRestart(msgIds[0], AckType::INDIVIDUAL);
339+
consumer1.acknowledgeMessageIdAndRestart(msgIds[0], AckType::INDIVIDUAL);
340+
341+
Message msg;
342+
ASSERT_EQ(ResultOk, consumer0.getConsumer().receive(msg, 1000));
343+
EXPECT_EQ(msg.getMessageId().batchIndex(), 0);
344+
ASSERT_EQ(ResultOk, consumer1.getConsumer().receive(msg, 1000));
345+
EXPECT_EQ(msg.getMessageId().batchIndex(), 1);
346+
347+
consumer0.acknowledgeMessageIdAndRestart(msgIds[3], AckType::CUMULATIVE);
348+
consumer1.acknowledgeMessageIdAndRestart(msgIds[3], AckType::CUMULATIVE);
349+
350+
ASSERT_EQ(ResultOk, consumer0.getConsumer().receive(msg, 1000));
351+
EXPECT_EQ(msg.getMessageId().batchIndex(), 0);
352+
ASSERT_EQ(ResultOk, consumer1.getConsumer().receive(msg, 1000));
353+
EXPECT_EQ(msg.getMessageId().batchIndex(), 3 + 1);
354+
client.close();
355+
}
356+
305357
INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0));

tests/ConsumerWrapper.h

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class ConsumerWrapper {
5454
// Enable the stats for cumulative ack
5555
conf_.setUnAckedMessagesTimeoutMs(10000);
5656
conf_.setBatchIndexAckEnabled(enableBatchIndexAck);
57+
conf_.setAckGroupingTimeMs(0); // send ACK immediately
5758
ASSERT_EQ(ResultOk, client_->subscribe(topic_, subscription_, conf_, consumer_));
5859
}
5960

@@ -95,9 +96,16 @@ class ConsumerWrapper {
9596
// the acknowledgment by restarting the consumer.
9697
void acknowledgeAndRestart(const std::vector<size_t>& indexes, AckType ackType) {
9798
acknowledge(indexes, ackType);
98-
messageIdList_.clear();
99-
consumer_.close();
100-
ASSERT_EQ(ResultOk, client_->subscribe(topic_, subscription_, conf_, consumer_));
99+
restart();
100+
}
101+
102+
void acknowledgeMessageIdAndRestart(MessageId msgId, AckType ackType) {
103+
if (ackType == AckType::CUMULATIVE) {
104+
consumer_.acknowledgeCumulative(msgId);
105+
} else {
106+
consumer_.acknowledge(msgId);
107+
}
108+
restart();
101109
}
102110

103111
Consumer& getConsumer() noexcept { return consumer_; }
@@ -109,6 +117,12 @@ class ConsumerWrapper {
109117
ConsumerConfiguration conf_;
110118
Consumer consumer_;
111119
std::vector<MessageId> messageIdList_;
120+
121+
void restart() {
122+
messageIdList_.clear();
123+
consumer_.close();
124+
ASSERT_EQ(ResultOk, client_->subscribe(topic_, subscription_, conf_, consumer_));
125+
}
112126
};
113127

114128
} // namespace pulsar

0 commit comments

Comments
 (0)