6262import java .util .concurrent .locks .ReentrantReadWriteLock ;
6363import java .util .function .Function ;
6464import java .util .stream .Collectors ;
65+ import javax .annotation .Nullable ;
6566import lombok .AccessLevel ;
6667import lombok .Getter ;
6768import org .apache .commons .lang3 .StringUtils ;
69+ import org .apache .commons .lang3 .tuple .Pair ;
6870import org .apache .pulsar .client .api .Consumer ;
6971import org .apache .pulsar .client .api .ConsumerCryptoFailureAction ;
7072import org .apache .pulsar .client .api .DeadLetterPolicy ;
@@ -204,6 +206,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
204206
205207 private final AtomicReference <ClientCnx > clientCnxUsedForConsumerRegistration = new AtomicReference <>();
206208 private final List <Throwable > previousExceptions = new CopyOnWriteArrayList <Throwable >();
209+ // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
210+ private final Map <Pair <Long , Long >, BatchMessageAcker > batchMessageToAcker = new ConcurrentHashMap <>();
211+
207212 static <T > ConsumerImpl <T > newConsumerImpl (PulsarClientImpl client ,
208213 String topic ,
209214 ConsumerConfigurationData <T > conf ,
@@ -529,6 +534,51 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
529534 return result ;
530535 }
531536
537+ private void processMessageIdBeforeAcknowledge (MessageIdImpl messageId , AckType ackType , int numMessages ) {
538+ if (ackType == AckType .Individual ) {
539+ stats .incrementNumAcksSent (numMessages );
540+ unAckedMessageTracker .remove (messageId );
541+ if (possibleSendToDeadLetterTopicMessages != null ) {
542+ possibleSendToDeadLetterTopicMessages .remove (messageId );
543+ }
544+ } else {
545+ stats .incrementNumAcksSent (unAckedMessageTracker .removeMessagesTill (messageId ));
546+ }
547+ }
548+
549+ private @ Nullable MessageIdImpl getMessageIdToAcknowledge (BatchMessageIdImpl messageId , AckType ackType ) {
550+ final BatchMessageAcker acker ;
551+ if (messageId .getAcker () instanceof BatchMessageAckerDisabled ) {
552+ acker = batchMessageToAcker .computeIfAbsent (
553+ Pair .of (messageId .getLedgerId (), messageId .getEntryId ()),
554+ __ -> BatchMessageAcker .newAcker (messageId .getOriginalBatchSize ()));
555+ } else {
556+ acker = messageId .getAcker ();
557+ }
558+ if (ackType == AckType .Individual ) {
559+ if (acker .ackIndividual (messageId .getBatchIndex ())) {
560+ batchMessageToAcker .remove (Pair .of (messageId .getLedgerId (), messageId .getEntryId ()));
561+ return messageId .toMessageIdImpl ();
562+ } else {
563+ return conf .isBatchIndexAckEnabled () ? messageId : null ;
564+ }
565+ } else {
566+ if (acker .ackCumulative (messageId .getBatchIndex ())) {
567+ batchMessageToAcker .remove (Pair .of (messageId .getLedgerId (), messageId .getEntryId ()));
568+ return messageId .toMessageIdImpl ();
569+ } else if (conf .isBatchIndexAckEnabled ()) {
570+ return messageId ;
571+ } else {
572+ if (acker .isPrevBatchCumulativelyAcked ()) {
573+ return null ;
574+ } else {
575+ acker .setPrevBatchCumulativelyAcked (true );
576+ return messageId .prevBatchMessageId ();
577+ }
578+ }
579+ }
580+ }
581+
532582 @ Override
533583 protected CompletableFuture <Void > doAcknowledge (MessageId messageId , AckType ackType ,
534584 Map <String , Long > properties ,
@@ -549,13 +599,34 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
549599 return doTransactionAcknowledgeForResponse (messageId , ackType , null , properties ,
550600 new TxnID (txn .getTxnIdMostBits (), txn .getTxnIdLeastBits ()));
551601 }
552- return acknowledgmentsGroupingTracker .addAcknowledgment ((MessageIdImpl ) messageId , ackType , properties );
602+ if (ackType == AckType .Individual ) {
603+ onAcknowledge (messageId , null );
604+ } else {
605+ onAcknowledgeCumulative (messageId , null );
606+ }
607+ if (messageId instanceof BatchMessageIdImpl ) {
608+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl ) messageId ;
609+ MessageIdImpl messageIdImpl = getMessageIdToAcknowledge (batchMessageId , ackType );
610+ if (messageIdImpl == null ) {
611+ return CompletableFuture .completedFuture (null );
612+ } else if (messageIdImpl instanceof BatchMessageIdImpl ) {
613+ return acknowledgmentsGroupingTracker .addBatchIndexAck (
614+ (BatchMessageIdImpl ) messageIdImpl , ackType , properties );
615+ } else {
616+ processMessageIdBeforeAcknowledge (messageIdImpl , ackType , batchMessageId .getOriginalBatchSize ());
617+ return acknowledgmentsGroupingTracker .addAcknowledgment (messageIdImpl , ackType , properties );
618+ }
619+ } else {
620+ MessageIdImpl messageIdImpl = (MessageIdImpl ) messageId ;
621+ processMessageIdBeforeAcknowledge (messageIdImpl , ackType , 1 );
622+ return acknowledgmentsGroupingTracker .addAcknowledgment (messageIdImpl , ackType , properties );
623+ }
553624 }
554625
555626 @ Override
556627 protected CompletableFuture <Void > doAcknowledge (List <MessageId > messageIdList , AckType ackType ,
557628 Map <String , Long > properties , TransactionImpl txn ) {
558-
629+ List < MessageIdImpl > messageIdListToAck = new ArrayList <>();
559630 for (MessageId messageId : messageIdList ) {
560631 checkArgument (messageId instanceof MessageIdImpl );
561632 }
@@ -573,7 +644,26 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
573644 return doTransactionAcknowledgeForResponse (messageIdList , ackType , null ,
574645 properties , new TxnID (txn .getTxnIdMostBits (), txn .getTxnIdLeastBits ()));
575646 } else {
576- return this .acknowledgmentsGroupingTracker .addListAcknowledgment (messageIdList , ackType , properties );
647+ for (MessageId messageId : messageIdList ) {
648+ checkArgument (messageId instanceof MessageIdImpl );
649+ onAcknowledge (messageId , null );
650+ if (messageId instanceof BatchMessageIdImpl ) {
651+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl ) messageId ;
652+ MessageIdImpl messageIdImpl = getMessageIdToAcknowledge (batchMessageId , ackType );
653+ if (messageIdImpl != null ) {
654+ if (!(messageIdImpl instanceof BatchMessageIdImpl )) {
655+ processMessageIdBeforeAcknowledge (messageIdImpl , ackType ,
656+ batchMessageId .getOriginalBatchSize ());
657+ } // else: batch index ACK
658+ messageIdListToAck .add (messageIdImpl );
659+ }
660+ } else {
661+ MessageIdImpl messageIdImpl = (MessageIdImpl ) messageId ;
662+ processMessageIdBeforeAcknowledge (messageIdImpl , ackType , 1 );
663+ messageIdListToAck .add (messageIdImpl );
664+ }
665+ }
666+ return this .acknowledgmentsGroupingTracker .addListAcknowledgment (messageIdListToAck , properties );
577667 }
578668 }
579669
0 commit comments