Skip to content

Commit 06eab69

Browse files
Support batch index acknowledgment (#151)
Fixes #87 ### Modifications - Add an consumer configuration `setBatchIndexAckEnabled` to enable the batch index ACK. When it's enabled, passing the original `MessageId` instead of the `MessageIdImpl` that trunks the batch index to the ACK grouping tracker. - Since now a `BatchedMessageIdImpl` could be accepted in the ACK grouping tracker, fix the compare logic. - Support passing a `BitSet` in `Commands::newAck` and get the internal `BitSet` from `MessageId` in `Commands::newMultiMessageAck`. - Skip the acknowledged batch indexes when receiving batched messages in `ConsumerImpl::receiveIndividualMessagesFromBatch`. ### Verifications Modify `BitSetTest.testSet` to verify the `BitSet::get` method added in this commit. Add `AcknowledgeTest.testBatchIndexAck` to test batch index ACK for all types of acknowledgment: - Individual ACK for a single message - Individual ACK for a list of messages - Cumulative ACK Add `AcknowledgeTest.testMixedCumulativeAck` to test the new compare logic between `BatchedMessageIdImpl` and `MessageIdImpl` works for cumulative ACK.
1 parent 5b26626 commit 06eab69

19 files changed

+280
-31
lines changed

include/pulsar/ConsumerConfiguration.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,23 @@ class PULSAR_PUBLIC ConsumerConfiguration {
563563
*/
564564
bool isStartMessageIdInclusive() const;
565565

566+
/**
567+
* Enable the batch index acknowledgment.
568+
*
569+
* It should be noted that this option can only work when the broker side also enables the batch index
570+
* acknowledgment. See the `acknowledgmentAtBatchIndexLevelEnabled` config in `broker.conf`.
571+
*
572+
* Default: false
573+
*
574+
* @param enabled whether to enable the batch index acknowledgment
575+
*/
576+
ConsumerConfiguration& setBatchIndexAckEnabled(bool enabled);
577+
578+
/**
579+
* The associated getter of setBatchingEnabled
580+
*/
581+
bool isBatchIndexAckEnabled() const;
582+
566583
friend class PulsarWrapper;
567584

568585
private:

lib/AckGroupingTracker.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,20 @@
1919

2020
#include "AckGroupingTracker.h"
2121

22+
#include "BitSet.h"
2223
#include "ClientConnection.h"
2324
#include "Commands.h"
2425
#include "LogUtils.h"
26+
#include "MessageIdImpl.h"
2527

2628
namespace pulsar {
2729

2830
DECLARE_LOG_OBJECT();
2931

3032
inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const MessageId& msgId,
3133
CommandAck_AckType ackType) {
32-
auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), ackType, -1);
34+
const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
35+
auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), bitSet, ackType, -1);
3336
cnx->sendCommand(cmd);
3437
LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ", " << msgId.entryId() << "]");
3538
}

lib/AckGroupingTrackerEnabled.cc

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "AckGroupingTrackerEnabled.h"
2121

22+
#include <climits>
2223
#include <mutex>
2324

2425
#include "ClientConnection.h"
@@ -27,11 +28,25 @@
2728
#include "ExecutorService.h"
2829
#include "HandlerBase.h"
2930
#include "LogUtils.h"
31+
#include "MessageIdUtil.h"
3032

3133
namespace pulsar {
3234

3335
DECLARE_LOG_OBJECT();
3436

37+
// Define a customized compare logic whose difference with the default compare logic of MessageId is:
38+
// When two MessageId objects are in the same entry, if only one of them is a message in the batch, treat
39+
// it as a smaller one.
40+
static int compare(const MessageId& lhs, const MessageId& rhs) {
41+
int result = compareLedgerAndEntryId(lhs, rhs);
42+
if (result != 0) {
43+
return result;
44+
} else {
45+
return internal::compare(lhs.batchIndex() < 0 ? INT_MAX : lhs.batchIndex(),
46+
rhs.batchIndex() < 0 ? INT_MAX : rhs.batchIndex());
47+
}
48+
}
49+
3550
AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
3651
const HandlerBasePtr& handlerPtr, uint64_t consumerId,
3752
long ackGroupingTimeMs, long ackGroupingMaxSize)
@@ -58,7 +73,7 @@ bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
5873
{
5974
// Check if the message ID is already ACKed by a previous (or pending) cumulative request.
6075
std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
61-
if (msgId <= this->nextCumulativeAckMsgId_) {
76+
if (compare(msgId, this->nextCumulativeAckMsgId_) <= 0) {
6277
return true;
6378
}
6479
}
@@ -88,7 +103,7 @@ void AckGroupingTrackerEnabled::addAcknowledgeList(const MessageIdList& msgIds)
88103

89104
void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& msgId) {
90105
std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
91-
if (msgId > this->nextCumulativeAckMsgId_) {
106+
if (compare(msgId, this->nextCumulativeAckMsgId_) > 0) {
92107
this->nextCumulativeAckMsgId_ = msgId;
93108
this->requireCumulativeAck_ = true;
94109
}

lib/BatchMessageAcker.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class BatchMessageAcker {
4343
return prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
4444
}
4545

46+
const BitSet& getBitSet() const noexcept { return bitSet_; }
47+
4648
private:
4749
// When a batched message is acknowledged cumulatively, the previous message id will be acknowledged
4850
// without batch index ACK enabled. However, it should be acknowledged only once. Use this flag to

lib/BatchedMessageIdImpl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class BatchedMessageIdImpl : public MessageIdImpl {
4141

4242
bool shouldAckPreviousMessageId() const { return acker_->shouldAckPreviousMessageId(); }
4343

44+
const BitSet& getBitSet() const noexcept override { return acker_->getBitSet(); }
45+
4446
MessageId getPreviousMessageId() {
4547
return MessageIdBuilder().ledgerId(ledgerId_).entryId(entryId_ - 1).partition(partition_).build();
4648
}

lib/BitSet.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class BitSet {
3939

4040
BitSet(int32_t numBits) : words_((numBits / 64) + ((numBits % 64 == 0) ? 0 : 1)) { assert(numBits > 0); }
4141

42+
BitSet(Data&& words) : words_(std::move(words)), wordsInUse_(words_.size()) {}
43+
4244
// Support range loop like:
4345
// ```c++
4446
// BitSet bitSet(129);
@@ -55,6 +57,15 @@ class BitSet {
5557
*/
5658
bool isEmpty() const noexcept { return wordsInUse_ == 0; }
5759

60+
/**
61+
* Returns the value of the bit with the specific index. The value is {@code true} if the bit with the
62+
* index {@code bitIndex} is currently set in this {@code BitSet}; otherwise, the result is {@code false}.
63+
*
64+
* @param bitIndex the bit index
65+
* @return the value of the bit with the specified index
66+
*/
67+
bool get(int32_t bitIndex) const;
68+
5869
/**
5970
* Sets the bits from the specified {@code fromIndex} (inclusive) to the
6071
* specified {@code toIndex} (exclusive) to {@code true}.
@@ -164,6 +175,12 @@ class BitSet {
164175
}
165176
};
166177

178+
inline bool BitSet::get(int32_t bitIndex) const {
179+
assert(bitIndex >= 0);
180+
auto wordIndex_ = wordIndex(bitIndex);
181+
return (wordIndex_ < wordsInUse_) && ((words_[wordIndex_] & (1L << bitIndex)) != 0);
182+
}
183+
167184
inline void BitSet::set(int32_t fromIndex, int32_t toIndex) {
168185
assert(fromIndex < toIndex && fromIndex >= 0 && toIndex >= 0);
169186
if (fromIndex == toIndex) {

lib/Commands.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "BatchMessageAcker.h"
3030
#include "BatchedMessageIdImpl.h"
31+
#include "BitSet.h"
3132
#include "ChunkMessageIdImpl.h"
3233
#include "LogUtils.h"
3334
#include "MessageImpl.h"
@@ -420,7 +421,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
420421
return writeMessageWithSize(cmd);
421422
}
422423

423-
SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
424+
SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet,
424425
CommandAck_AckType ackType, CommandAck_ValidationError validationError) {
425426
BaseCommand cmd;
426427
cmd.set_type(BaseCommand::ACK);
@@ -433,6 +434,9 @@ SharedBuffer Commands::newAck(uint64_t consumerId, int64_t ledgerId, int64_t ent
433434
auto* msgId = ack->add_message_id();
434435
msgId->set_ledgerid(ledgerId);
435436
msgId->set_entryid(entryId);
437+
for (auto x : ackSet) {
438+
msgId->add_ack_set(x);
439+
}
436440
return writeMessageWithSize(cmd);
437441
}
438442

@@ -446,6 +450,9 @@ SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const std::set<Me
446450
auto newMsgId = ack->add_message_id();
447451
newMsgId->set_ledgerid(msgId.ledgerId());
448452
newMsgId->set_entryid(msgId.entryId());
453+
for (auto x : getMessageIdImpl(msgId)->getBitSet()) {
454+
newMsgId->add_ack_set(x);
455+
}
449456
}
450457
return writeMessageWithSize(cmd);
451458
}

lib/Commands.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class BatchMessageAcker;
3939
using BatchMessageAckerPtr = std::shared_ptr<BatchMessageAcker>;
4040
class MessageIdImpl;
4141
using MessageIdImplPtr = std::shared_ptr<MessageIdImpl>;
42+
class BitSet;
4243

4344
namespace proto {
4445
class BaseCommand;
@@ -112,7 +113,7 @@ class Commands {
112113
bool userProvidedProducerName, bool encrypted,
113114
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch);
114115

115-
static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
116+
static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet,
116117
CommandAck_AckType ackType, CommandAck_ValidationError validationError);
117118
static SharedBuffer newMultiMessageAck(uint64_t consumerId, const std::set<MessageId>& msgIds);
118119

lib/ConsumerConfiguration.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,4 +287,11 @@ const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const {
287287
return impl_->batchReceivePolicy;
288288
}
289289

290+
ConsumerConfiguration& ConsumerConfiguration::setBatchIndexAckEnabled(bool enabled) {
291+
impl_->batchIndexAckEnabled = enabled;
292+
return *this;
293+
}
294+
295+
bool ConsumerConfiguration::isBatchIndexAckEnabled() const { return impl_->batchIndexAckEnabled; }
296+
290297
} // namespace pulsar

lib/ConsumerConfigurationImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ struct ConsumerConfigurationImpl {
5656
bool autoAckOldestChunkedMessageOnQueueFull{false};
5757
bool startMessageIdInclusive{false};
5858
long expireTimeOfIncompleteChunkedMessageMs{60000};
59+
bool batchIndexAckEnabled{false};
5960
};
6061
} // namespace pulsar
6162
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */

0 commit comments

Comments
 (0)