6262import java .util .concurrent .locks .ReentrantReadWriteLock ;
6363import java .util .function .Function ;
6464import java .util .stream .Collectors ;
65+ import javax .annotation .Nullable ;
6566import lombok .AccessLevel ;
6667import lombok .Getter ;
6768import org .apache .commons .lang3 .StringUtils ;
69+ import org .apache .commons .lang3 .tuple .Pair ;
6870import org .apache .pulsar .client .api .Consumer ;
6971import org .apache .pulsar .client .api .ConsumerCryptoFailureAction ;
7072import org .apache .pulsar .client .api .DeadLetterPolicy ;
@@ -204,6 +206,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
204206
205207 private final AtomicReference <ClientCnx > clientCnxUsedForConsumerRegistration = new AtomicReference <>();
206208 private final List <Throwable > previousExceptions = new CopyOnWriteArrayList <Throwable >();
209+ // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
210+ private final Map <Pair <Long , Long >, BatchMessageAcker > batchMessageToAcker = new ConcurrentHashMap <>();
211+
207212 static <T > ConsumerImpl <T > newConsumerImpl (PulsarClientImpl client ,
208213 String topic ,
209214 ConsumerConfigurationData <T > conf ,
@@ -529,6 +534,52 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
529534 return result ;
530535 }
531536
537+ private void processMessageIdBeforeAcknowledge (MessageIdImpl messageId , AckType ackType , int numMessages ) {
538+ if (ackType == AckType .Individual ) {
539+ stats .incrementNumAcksSent (numMessages );
540+ unAckedMessageTracker .remove (messageId );
541+ if (possibleSendToDeadLetterTopicMessages != null ) {
542+ possibleSendToDeadLetterTopicMessages .remove (messageId );
543+ }
544+ } else {
545+ stats .incrementNumAcksSent (unAckedMessageTracker .removeMessagesTill (messageId ));
546+ }
547+ }
548+
549+ private @ Nullable MessageIdImpl getMessageIdToAcknowledge (BatchMessageIdImpl messageId , AckType ackType ) {
550+ if (conf .isBatchIndexAckEnabled ()) {
551+ return messageId ;
552+ }
553+ final BatchMessageAcker acker ;
554+ if (messageId .getAcker () instanceof BatchMessageAckerDisabled ) {
555+ acker = batchMessageToAcker .computeIfAbsent (
556+ Pair .of (messageId .getLedgerId (), messageId .getEntryId ()),
557+ __ -> BatchMessageAcker .newAcker (messageId .getOriginalBatchSize ()));
558+ } else {
559+ acker = messageId .getAcker ();
560+ }
561+ if (ackType == AckType .Individual ) {
562+ if (acker .ackIndividual (messageId .getBatchIndex ())) {
563+ batchMessageToAcker .remove (Pair .of (messageId .getLedgerId (), messageId .getEntryId ()));
564+ return messageId .toMessageIdImpl ();
565+ } else {
566+ return null ;
567+ }
568+ } else {
569+ if (acker .ackCumulative (messageId .getBatchIndex ())) {
570+ batchMessageToAcker .remove (Pair .of (messageId .getLedgerId (), messageId .getEntryId ()));
571+ return messageId .toMessageIdImpl ();
572+ } else {
573+ if (acker .isPrevBatchCumulativelyAcked ()) {
574+ return null ;
575+ } else {
576+ acker .setPrevBatchCumulativelyAcked (true );
577+ return messageId .prevBatchMessageId ();
578+ }
579+ }
580+ }
581+ }
582+
532583 @ Override
533584 protected CompletableFuture <Void > doAcknowledge (MessageId messageId , AckType ackType ,
534585 Map <String , Long > properties ,
@@ -549,13 +600,34 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
549600 return doTransactionAcknowledgeForResponse (messageId , ackType , null , properties ,
550601 new TxnID (txn .getTxnIdMostBits (), txn .getTxnIdLeastBits ()));
551602 }
552- return acknowledgmentsGroupingTracker .addAcknowledgment ((MessageIdImpl ) messageId , ackType , properties );
603+ if (ackType == AckType .Individual ) {
604+ onAcknowledge (messageId , null );
605+ } else {
606+ onAcknowledgeCumulative (messageId , null );
607+ }
608+ if (messageId instanceof BatchMessageIdImpl ) {
609+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl ) messageId ;
610+ MessageIdImpl messageIdImpl = getMessageIdToAcknowledge (batchMessageId , ackType );
611+ if (messageIdImpl == null ) {
612+ return CompletableFuture .completedFuture (null );
613+ } else if (messageIdImpl instanceof BatchMessageIdImpl ) {
614+ return acknowledgmentsGroupingTracker .addBatchIndexAck (
615+ (BatchMessageIdImpl ) messageIdImpl , ackType , properties );
616+ } else {
617+ processMessageIdBeforeAcknowledge (messageIdImpl , ackType , batchMessageId .getOriginalBatchSize ());
618+ return acknowledgmentsGroupingTracker .addAcknowledgment (messageIdImpl , ackType , properties );
619+ }
620+ } else {
621+ MessageIdImpl messageIdImpl = (MessageIdImpl ) messageId ;
622+ processMessageIdBeforeAcknowledge (messageIdImpl , ackType , 1 );
623+ return acknowledgmentsGroupingTracker .addAcknowledgment (messageIdImpl , ackType , properties );
624+ }
553625 }
554626
555627 @ Override
556628 protected CompletableFuture <Void > doAcknowledge (List <MessageId > messageIdList , AckType ackType ,
557629 Map <String , Long > properties , TransactionImpl txn ) {
558-
630+ List < MessageIdImpl > messageIdListToAck = new ArrayList <>();
559631 for (MessageId messageId : messageIdList ) {
560632 checkArgument (messageId instanceof MessageIdImpl );
561633 }
@@ -573,7 +645,24 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
573645 return doTransactionAcknowledgeForResponse (messageIdList , ackType , null ,
574646 properties , new TxnID (txn .getTxnIdMostBits (), txn .getTxnIdLeastBits ()));
575647 } else {
576- return this .acknowledgmentsGroupingTracker .addListAcknowledgment (messageIdList , ackType , properties );
648+ for (MessageId messageId : messageIdList ) {
649+ checkArgument (messageId instanceof MessageIdImpl );
650+ onAcknowledge (messageId , null );
651+ if (messageId instanceof BatchMessageIdImpl ) {
652+ MessageIdImpl messageIdImpl = getMessageIdToAcknowledge ((BatchMessageIdImpl ) messageId , ackType );
653+ if (messageIdImpl != null ) {
654+ if (!(messageIdImpl instanceof BatchMessageIdImpl )) {
655+ processMessageIdBeforeAcknowledge (messageIdImpl , ackType , 1 );
656+ }
657+ messageIdListToAck .add (messageIdImpl );
658+ }
659+ } else {
660+ MessageIdImpl messageIdImpl = (MessageIdImpl ) messageId ;
661+ processMessageIdBeforeAcknowledge (messageIdImpl , ackType , 1 );
662+ messageIdListToAck .add (messageIdImpl );
663+ }
664+ }
665+ return this .acknowledgmentsGroupingTracker .addListAcknowledgment (messageIdListToAck , properties );
577666 }
578667 }
579668
0 commit comments