diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java index 70db427afce4f..11a7f6b95186b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import lombok.AllArgsConstructor; +import lombok.Data; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; @@ -118,6 +120,25 @@ interface DeleteCallback { void deleteFailed(ManagedLedgerException exception, Object ctx); } + interface CursorDeleteCallback extends DeleteCallback { + void deleteComplete(Object ctx, List positionAckStates); + } + + @AllArgsConstructor + @Data + class PositionAckState { + Position position; + BatchMsgAckResType batchMsgAckResType; + int batchMessageAckCount; + } + + enum BatchMsgAckResType { + AckAllAtOnce, + FirstPartialAck, + PartialAck, + LatestPartialAck; + } + interface TerminateCallback { void terminateComplete(Position lastCommittedPosition, Object ctx); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index b72b392a18dfb..c2881c150f515 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -20,6 +20,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import static org.apache.bookkeeper.mledger.AsyncCallbacks.BatchMsgAckResType; +import static org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; +import static org.apache.bookkeeper.mledger.AsyncCallbacks.PositionAckState; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; @@ -2351,7 +2354,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { @Override - public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallback callback, Object ctx) { + public void asyncDelete(Iterable positions, DeleteCallback callback, Object ctx) { if (isClosed()) { callback.deleteFailed(new ManagedLedgerException .CursorAlreadyClosedException("Cursor was already closed"), ctx); @@ -2362,12 +2365,12 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb lock.writeLock().lock(); boolean skipMarkDeleteBecauseAckedNothing = false; + List positionAckNotices = new ArrayList<>(); try { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}", ledger.getName(), name, positions, individualDeletedMessages, markDeletePosition); } - for (Position pos : positions) { Position position = requireNonNull(pos); if (ledger.getLastConfirmedEntry().compareTo(position) < 0) { @@ -2391,8 +2394,12 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); if (ackSet == null || ackSet.length == 0) { - if (batchDeletedIndexes != null) { - batchDeletedIndexes.remove(position); + BitSet bitSet; + if (batchDeletedIndexes == null || (bitSet = batchDeletedIndexes.remove(position)) == null) { + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.AckAllAtOnce, -1)); + } else { + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.LatestPartialAck, + bitSet.cardinality())); } // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will // make the RangeSet recognize the "continuity" between adjacent Positions. @@ -2413,18 +2420,27 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb individualDeletedMessages); } } else if (batchDeletedIndexes != null) { - final var givenBitSet = BitSet.valueOf(ackSet); - final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet); - if (givenBitSet != bitSet) { - bitSet.and(givenBitSet); - } - if (bitSet.isEmpty()) { - Position previousPosition = ledger.getPreviousPosition(position); - individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), - previousPosition.getEntryId(), - position.getLedgerId(), position.getEntryId()); - MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this); - batchDeletedIndexes.remove(position); + final var ackingBitSet = BitSet.valueOf(ackSet); + final var combinedBitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> ackingBitSet); + if (ackingBitSet != combinedBitSet) { + int unAckedBefore = combinedBitSet.cardinality(); + combinedBitSet.and(ackingBitSet); + int unAckedAfter = combinedBitSet.cardinality(); + if (combinedBitSet.isEmpty()) { + Position previousPosition = ledger.getPreviousPosition(position); + individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), + previousPosition.getEntryId(), + position.getLedgerId(), position.getEntryId()); + MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this); + batchDeletedIndexes.remove(position); + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.LatestPartialAck, + unAckedBefore - unAckedAfter)); + } else { + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.PartialAck, + unAckedBefore - unAckedAfter)); + } + } else { + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.FirstPartialAck, -1)); } } } @@ -2478,7 +2494,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } finally { lock.writeLock().unlock(); if (skipMarkDeleteBecauseAckedNothing) { - callback.deleteComplete(ctx); + completeDeleteCallback(callback, ctx, positionAckNotices); } } @@ -2486,7 +2502,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { isDirty = true; updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null); - callback.deleteComplete(ctx); + completeDeleteCallback(callback, ctx, positionAckNotices); return; } @@ -2497,7 +2513,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { - callback.deleteComplete(ctx); + completeDeleteCallback(callback, ctx, positionAckNotices); } @Override @@ -2517,6 +2533,15 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { } } + private void completeDeleteCallback(DeleteCallback deleteCallback, Object ctx, + List positionAckStates) { + if (deleteCallback instanceof AsyncCallbacks.CursorDeleteCallback cursorDeleteCallback) { + cursorDeleteCallback.deleteComplete(ctx, positionAckStates); + } else { + deleteCallback.deleteComplete(ctx); + } + } + // update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition private void updateLastMarkDeleteEntryToLatest(final Position newPosition, final Map properties) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index f074f234b873f..c092a6990a90f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -326,7 +326,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i private void individualAcknowledgeMessageIfNeeded(List positions, Map properties) { if (!(subscription instanceof PulsarCompactorSubscription)) { - subscription.acknowledgeMessage(positions, AckType.Individual, properties); + subscription.acknowledgeMessage(positions, AckType.Individual, properties, null, false); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 3edf6e2d68140..123ff104661fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -549,7 +549,7 @@ public CompletableFuture messageAcked(CommandAck ack) { .thenApply(unused -> 1L); } else { List positionsAcked = Collections.singletonList(position); - subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties); + subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties, this, false); future = CompletableFuture.completedFuture(1L); } } else { @@ -562,74 +562,47 @@ public CompletableFuture messageAcked(CommandAck ack) { return future .thenApply(v -> { - this.messageAckRate.recordEvent(v); - this.messageAckCounter.add(v); + // The case that is typed individual ack without transaction will deal metrics after a callback + // that after cursor deleting positions, so we may receive a 0 value here. + ackMetricRecord(v); return null; }); } + public void ackMetricRecord(long msgCountAcked) { + if (msgCountAcked > 0) { + this.messageAckRate.recordEvent(msgCountAcked); + this.messageAckCounter.add(msgCountAcked); + } + } + //this method is for individual ack not carry the transaction private CompletableFuture individualAckNormal(CommandAck ack, Map properties) { - List> positionsAcked = new ArrayList<>(); - long totalAckCount = 0; + List positionsAcked = new ArrayList<>(); for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); Position position; - ObjectIntPair ackOwnerConsumerAndBatchSize = - getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId()); - Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left(); - long ackedCount; - int batchSize = ackOwnerConsumerAndBatchSize.rightInt(); if (msgId.getAckSetsCount() > 0) { long[] ackSets = new long[msgId.getAckSetsCount()]; for (int j = 0; j < msgId.getAckSetsCount(); j++) { ackSets[j] = msgId.getAckSetAt(j); } position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), ackSets); - ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer); if (isTransactionEnabled()) { //sync the batch position bit set point, in order to delete the position in pending acks if (Subscription.isIndividualAckMode(subType)) { - ((PersistentSubscription) subscription) - .syncBatchPositionBitSetForPendingAck(position); + ((PersistentSubscription) subscription).syncBatchPositionBitSetForPendingAck(position); } } - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); } else { - position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); - ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer); - if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) { - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); - updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); - } + position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); } - - positionsAcked.add(Pair.of(ackOwnerConsumer, position)); + positionsAcked.add(position); checkAckValidationError(ack, position); - - totalAckCount += ackedCount; - } - subscription.acknowledgeMessage(positionsAcked.stream() - .map(Pair::getRight) - .collect(Collectors.toList()), AckType.Individual, properties); - CompletableFuture completableFuture = new CompletableFuture<>(); - completableFuture.complete(totalAckCount); - if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { - completableFuture.whenComplete((v, e) -> positionsAcked.forEach(positionPair -> { - Consumer ackOwnerConsumer = positionPair.getLeft(); - Position position = positionPair.getRight(); - //check if the position can remove from the consumer pending acks. - // the bit set is empty in pending ack handle. - if (AckSetStateUtil.hasAckSet(position)) { - if (((PersistentSubscription) subscription) - .checkIsCanDeleteConsumerPendingAck(position)) { - removePendingAcks(ackOwnerConsumer, position); - } - } - })); } - return completableFuture; + subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties, this, false); + return CompletableFuture.completedFuture(0L); } @@ -745,6 +718,7 @@ private long getAckedCountForTransactionAck(int batchSize, long[] ackSets) { } private long getUnAckedCountForBatchIndexLevelEnabled(Position position, int batchSize) { + // TODO compare with the cursor. long unAckedCount = batchSize; if (isAcknowledgmentAtBatchIndexLevelEnabled) { long[] cursorAckSet = getCursorAckSet(position); @@ -1194,11 +1168,24 @@ public Subscription getSubscription() { return subscription; } - private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { + public int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { int unackedMsgs = 0; if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) { subscription.addUnAckedMessages(ackedMessages); unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); + if (log.isDebugEnabled()) { + if (ackedMessages > 0) { + log.debug("[{}][{}]{}-{}-{} delivered out {} messages, un-ack-msg: {}", + topicName, consumer.subscription.getName(), + consumer.cnx(), consumer.consumerId(), consumer.consumerName(), + ackedMessages, consumer.getUnackedMessages()); + } else { + log.debug("[{}][{}]{}-{}-{} acknowledged/redelivered {} messages, un-ack-msg: {}", + topicName, consumer.subscription.getName(), + consumer.cnx(), consumer.consumerId(), consumer.consumerName(), + -ackedMessages, consumer.getUnackedMessages()); + } + } } if (unackedMsgs < 0 && System.currentTimeMillis() - negativeUnackedMsgsTimestamp >= 10_000) { negativeUnackedMsgsTimestamp = System.currentTimeMillis(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 7a728a037dc62..c716db55e2d79 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -304,11 +304,15 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH * @return true if the pending ack was removed, false otherwise */ public boolean remove(long ledgerId, long entryId) { + return removeAndReturn(ledgerId, entryId) != null; + } + + public IntIntPair removeAndReturn(long ledgerId, long entryId) { try { writeLock.lock(); Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { - return false; + return null; } IntIntPair removedEntry = ledgerMap.remove(entryId); boolean removed = removedEntry != null; @@ -319,7 +323,7 @@ public boolean remove(long ledgerId, long entryId) { if (removed && ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); } - return removed; + return removedEntry; } finally { writeLock.unlock(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 452c30b45febb..2ab042c956bdb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -29,6 +29,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; +import org.jspecify.annotations.Nullable; public interface Subscription extends MessageExpirer { @@ -48,7 +49,13 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException { void consumerFlow(Consumer consumer, int additionalNumberOfMessages); - void acknowledgeMessage(List positions, AckType ackType, Map properties); + /** + * @param ackFrom It can be null, and it will always be null if {@param ackType} is {@link AckType#Cumulative}. + * The performance will be improved, if this param is the owner consumer that received the messages + * who are being acked when {@param ackType} is {@link AckType#Individual}. + */ + void acknowledgeMessage(List positions, AckType ackType, Map properties, + @Nullable Consumer ackFrom, boolean triggeredByTxnCommit); String getTopicName(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index d469ce1daa1a8..d984f58915015 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -197,7 +197,8 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { } @Override - public void acknowledgeMessage(List position, AckType ackType, Map properties) { + public void acknowledgeMessage(List position, AckType ackType, Map properties, + Consumer ackFrom, boolean triggerByTxnCommit) { // No-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index c35d802f43d54..dd5e577642a8b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -362,10 +362,6 @@ public synchronized void readMoreEntries() { Position markDeletePosition = cursor.getMarkDeletedPosition(); if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) { redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); - for (Consumer consumer : consumerList) { - consumer.getPendingAcks() - .removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); - } lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition; } @@ -779,9 +775,7 @@ protected final synchronized boolean sendMessagesToConsumers(ReadType readType, * if you need to change it. */ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { - if (needTrimAckedMessages()) { - cursor.trimDeletedEntries(entries); - } + cursor.trimDeletedEntries(entries); lastNumberOfEntriesProcessed = 0; int entriesToDispatch = entries.size(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 44a3a13022a82..8ca4f6cfed083 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import io.netty.buffer.ByteBuf; +import it.unimi.dsi.fastutil.ints.IntIntPair; import java.io.IOException; import java.util.Collections; import java.util.LinkedHashMap; @@ -39,9 +40,12 @@ import java.util.stream.Collectors; import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.AsyncCallbacks.BatchMsgAckResType; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.CursorDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.PositionAckState; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -52,7 +56,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.ScanOutcome; +import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; @@ -93,6 +99,8 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.PositionInPendingAckStats; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -415,7 +423,8 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { } @Override - public void acknowledgeMessage(List positions, AckType ackType, Map properties) { + public void acknowledgeMessage(List positions, AckType ackType, Map properties, + @Nullable Consumer ackFrom, boolean triggeredByTxnCommit) { cursor.updateLastActive(); Position previousMarkDeletePosition = cursor.getMarkDeletedPosition(); @@ -437,7 +446,12 @@ public void acknowledgeMessage(List positions, AckType ackType, Map { if ((cursor.isMessageDeleted(position))) { @@ -538,6 +552,150 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { } }; + private final DeleteCallback deleteCallbackWithHandlingAckState = new CursorDeleteCallback() { + @Override + public void deleteComplete(Object context, List positionAckStates) { + ImmutablePair ctx = (ImmutablePair) context; + Consumer ackFrom = ctx.getLeft(); + Position previousMarkDeletePosition = ctx.getRight(); + if (log.isDebugEnabled()) { + // The value of the param "context" is a position. + log.debug("[{}][{}] Deleted message at {}", topicName, subName, previousMarkDeletePosition); + } + // Update pendingAcks, un-ack-messages, consumer.metrics. + if (Subscription.isIndividualAckMode(getType())) { + PersistentSubscription.this.updatePendingAckMessagesAfterAcknowledged(ackFrom, positionAckStates); + } + // Signal the dispatcher. + if (dispatcher != null) { + dispatcher.afterAckMessages(null, context); + } + notifyTheMarkDeletePositionMoveForwardIfNeeded(previousMarkDeletePosition); + } + + @Override + public void deleteComplete(Object context) { + deleteCallback.deleteComplete(context); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + deleteCallback.deleteFailed(exception, ctx); + } + }; + + private void updatePendingAckMessagesAfterAcknowledged(Consumer ackFrom, List ackedPositions) { + Dispatcher dispatcher0 = getDispatcher(); + if (dispatcher0 != null) { + /* + * There is a race condition which leads us to add this "synchronized" block. + * - consumer-1 received msg-A + * - consumption task is in-progress + * - topic was unloaded + * - reset messages to consumer-2 + * At this moment, race-condition occurs: + * - consumer-1 is acknowledging msg-A + * - dispatcher is delivering msg-A to consumer-2 + * Issue: broker received the acknowledging of msg-A, but no consumer has pending acknowledge that relate + * to msg-A so broker can not know how many single messages in the batched message. + * Solve: to get a precise messages number, this "synchronized" block is needed. + */ + synchronized (dispatcher0) { + updatePendingAckMessagesAfterAcknowledged0(ackFrom, ackedPositions); + } + } else { + updatePendingAckMessagesAfterAcknowledged0(ackFrom, ackedPositions); + } + } + + private void updatePendingAckMessagesAfterAcknowledged0(Consumer ackFrom, List ackedPositions) { + int totalMsgAcked = 0; + for (PositionAckState ackState : ackedPositions) { + Position position = ackState.getPosition(); + final long ledgerId = position.getLedgerId(); + final long entryId = position.getEntryId(); + final boolean positionRemovedFromCursor = + ackState.getBatchMsgAckResType() == BatchMsgAckResType.AckAllAtOnce + || ackState.getBatchMsgAckResType() == BatchMsgAckResType.LatestPartialAck; + // Find the messages' owner and update un-acknowledged messages. + Consumer owner = null; + IntIntPair batchSizeAndHashPair = ackFrom == null ? null + : (positionRemovedFromCursor + ? ackFrom.getPendingAcks().removeAndReturn(ledgerId, entryId) + : ackFrom.getPendingAcks().get(ledgerId, entryId)); + if (batchSizeAndHashPair != null) { + owner = ackFrom; + } else { + for (Consumer consumer : getConsumers()) { + if (consumer == ackFrom) { + continue; + } + batchSizeAndHashPair = positionRemovedFromCursor + ? consumer.getPendingAcks().removeAndReturn(ledgerId, entryId) + : consumer.getPendingAcks().get(ledgerId, entryId); + if (batchSizeAndHashPair != null) { + // Continue find the owner + owner = consumer; + break; + } + } + } + if (owner == null) { + // Since we can not get how many msgs that were attempted to ack, just plus 1 into the + // "attemptAckMsgs". + totalMsgAcked++; + log.info("[{}][{}]{}-{}-{} skipped to reduce un-ack-msgs for {}:{}, because could not find the" + + " message's owner. consumer size: {}. It may caused by a concurrency acknowledging" + + " and reconnection", + topicName, subName, + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + ledgerId, entryId, getConsumers().size()); + continue; + } + // Calculate messages actually acked in batch. + int actualAcked = 0; + if (ackState.getBatchMsgAckResType() == BatchMsgAckResType.AckAllAtOnce) { + // All messages in batch were acked at once. + actualAcked = Math.max(batchSizeAndHashPair.firstInt(), 1); + } else if (ackState.getBatchMsgAckResType() == BatchMsgAckResType.FirstPartialAck) { + // First part of batch message acked. + // Regarding this case, only consumer knows how many messages in batch were acked, because + // the cursor do not know how many messages in the batch, but "consumer.pendingAcks" knows. + long[] ackSetWords = AckSetStateUtil.getAckSetArrayOrNull(position); + if (ackSetWords != null) { + BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(ackSetWords); + actualAcked = Math.max(batchSizeAndHashPair.firstInt() - ackSet.cardinality(), 0); + ackSet.recycle(); + } + } else { + // Following part of batch message acked. + // Regarding this case, only cursor know how many messages in batch were acked, because + // "consumer.pendingAcks" does not know how many messages were acked count before, only "cursor" + // knows. + actualAcked = ackState.getBatchMessageAckCount(); + } + totalMsgAcked += actualAcked; + // Reduce un-acknowledged messages. + owner.addAndGetUnAckedMsgs(owner, -actualAcked); + owner.updateBlockedConsumerOnUnackedMsgs(owner); + if (log.isDebugEnabled()) { + log.debug("[{}][{}] {}-{}-{} {}-{}-{} acknowledged {} messages, un-ack-msg: {}, position: {}:{}" + + " ack state: {}", + topicName, subName, owner.cnx(), owner.consumerId(), owner.consumerName(), + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + actualAcked, owner.getUnackedMessages(), ledgerId, entryId, ackState); + } + } + // Consumer metrics. + if (ackFrom != null) { + ackFrom.ackMetricRecord(totalMsgAcked); + } + } + private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position oldPosition) { Position oldMD = oldPosition; Position newMD = cursor.getMarkDeletedPosition(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java index fe13aeb572e2e..4237f74a74ccc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; @@ -60,7 +61,8 @@ public PulsarCompactorSubscription(PersistentTopic topic, CompactedTopic compact } @Override - public void acknowledgeMessage(List positions, AckType ackType, Map properties) { + public void acknowledgeMessage(List positions, AckType ackType, Map properties, + Consumer ackFrom, boolean triggerByTxnCommit) { checkArgument(ackType == AckType.Cumulative); checkArgument(positions.size() == 1); checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index b21fe7acfdb6f..33c46ca8542d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -204,7 +204,8 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName()); if (sub != null) { - sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); + sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, + Collections.emptyMap(), null, false); } else { // Subscription doesn't exist. We need to force the creation of the subscription in this cluster. log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription", @@ -213,7 +214,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { true /* replicateSubscriptionState */, Collections.emptyMap()) .thenAccept(subscriptionCreated -> { subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos), - AckType.Cumulative, Collections.emptyMap()); + AckType.Cumulative, Collections.emptyMap(), null, false); }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 591842927f35b..5a3af0a2c750f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -465,7 +465,7 @@ private void internalCommitTxn(TxnID txnID, Map properties, long l } persistentSubscription.acknowledgeMessage( Collections.singletonList(cumulativeAckOfTransaction.getValue()), - AckType.Cumulative, properties); + AckType.Cumulative, properties, null, true); cumulativeAckOfTransaction = null; commitFuture.complete(null); }).exceptionally(e -> { @@ -755,7 +755,7 @@ protected void handleCommit(TxnID txnID, AckType ackType, Map prop if (this.cumulativeAckOfTransaction != null) { persistentSubscription.acknowledgeMessage( Collections.singletonList(this.cumulativeAckOfTransaction.getValue()), - AckType.Cumulative, properties); + AckType.Cumulative, properties, null, true); } this.cumulativeAckOfTransaction = null; } else { @@ -774,7 +774,7 @@ private void individualAckCommitCommon(TxnID txnID, Map properties) { if (currentTxn != null) { persistentSubscription.acknowledgeMessage(new ArrayList<>(currentTxn.values()), - AckType.Individual, properties); + AckType.Individual, properties, null, true); individualAckOfTransaction.remove(txnID); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerConcurrencyAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerConcurrencyAckTest.java new file mode 100644 index 0000000000000..8e8dd3c64b153 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerConcurrencyAckTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ConsumerConcurrencyAckTest extends BrokerTestBase { + + @DataProvider(name = "brokerParams") + public static Object[][] brokerParams() { + return new Object[][]{ + // subscriptionSharedUseClassicPersistentImplementation, acknowledgmentAtBatchIndexLevelEnabled + {false, true}, + {false, false}, + {true, true}, + {true, false} + }; + } + + @Factory(dataProvider = "brokerParams") + public ConsumerConcurrencyAckTest(boolean subscriptionSharedUseClassicPersistentImplementation, + boolean acknowledgmentAtBatchIndexLevelEnabled) { + conf.setSubscriptionSharedUseClassicPersistentImplementation( + subscriptionSharedUseClassicPersistentImplementation); + conf.setAcknowledgmentAtBatchIndexLevelEnabled(acknowledgmentAtBatchIndexLevelEnabled); + } + + @BeforeClass + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider + public Object[][] argsOfTestAcknowledgeConcurrently() { + // enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2 + return new Object[][] { + {true, true, true}, + {true, false, false}, + {true, true, false}, + {true, false, true}, + {false, true, true}, + {false, false, false}, + {false, true, false}, + {false, false, true}, + }; + } + + /** + * Test: one message may be acknowledged by two consumers at the same time. + * Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. + * 1. Consumer-1 received messages. + * 2. Unload the topic. + * 3. The message may be sent to consumer-2, but the consumption of consumer-1 is still in-progress now. + * 4. Consumer-1 and consumer-2 acknowledge the message concurrently. + */ + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + log.info("start test. classic dispatcher: {}, broker enabled batch ack: {}, enableBatchSend: {}," + + " enableBatchIndexAcknowledgment1: {}, enableBatchIndexAcknowledgment2: {}", + pulsar.getConfig().isSubscriptionSharedUseClassicPersistentImplementation(), + pulsar.getConfig().isAcknowledgmentAtBatchIndexLevelEnabled(), + enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2 + ); + PulsarClientImpl client1 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + PulsarClientImpl client2 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + final int msgCount = 6; + final CountDownLatch acknowledgeSignal1 = new CountDownLatch(1); + final CountDownLatch acknowledgeSignal2 = new CountDownLatch(1); + final CountDownLatch acknowledgeSignal3 = new CountDownLatch(1); + final CountDownLatch acknowledgeFinishedSignal = new CountDownLatch(3); + final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + final String subscription1 = "s1"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subscription1, MessageId.earliest); + Producer producer = client1.newProducer(Schema.STRING).topic(topic) + .enableBatching(enableBatchSend).batchingMaxMessages(4).create(); + for (int i = 0; i < msgCount; i++) { + producer.sendAsync(i + ""); + } + + // Consumer-1 using user threads to consume asynchronously, it will not acknowledge messages one by one. + ConsumerImpl consumer1 = (ConsumerImpl) client1.newConsumer(Schema.STRING) + .topic(topic).subscriptionName(subscription1) + .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment1) + .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c1") + .subscriptionType(SubscriptionType.Shared).subscribe(); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer1.numMessagesInQueue(), msgCount); + }); + consumer1.pause(); + List msgReceivedC11 = new ArrayList<>(); + List msgReceivedC12 = new ArrayList<>(); + List msgReceivedC13 = new ArrayList<>(); + for (int i = 0; i < msgCount; i++) { + Message msg = consumer1.receive(); + assertNotNull(msg); + if (i % 4 == 0) { + msgReceivedC11.add(msg.getMessageId()); + } else if (i % 3 == 0) { + msgReceivedC12.add(msg.getMessageId()); + } else if (i % 2 == 0) { + msgReceivedC13.add(msg.getMessageId()); + } else { + msgReceivedC13.add(msg.getMessageId()); + } + } + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer1.acknowledge(msgReceivedC11); + acknowledgeSignal2.await(); + consumer1.acknowledge(msgReceivedC12); + acknowledgeSignal3.await(); + consumer1.acknowledge(msgReceivedC13); + consumer1.resume(); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-1 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + + // After a topic unloading, the messages will be resent to consumer-2. + // Consumer-2 using user threads to consume asynchronously, it will not acknowledge messages one by one. + ConsumerImpl consumer2 = (ConsumerImpl) client2.newConsumer(Schema.STRING) + .topic(topic).subscriptionName(subscription1) + .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment2) + .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c2") + .subscriptionType(SubscriptionType.Shared).subscribe(); + admin.topics().unload(topic); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer2.numMessagesInQueue(), msgCount); + }); + List msgReceivedC21 = new ArrayList<>(); + List msgReceivedC22 = new ArrayList<>(); + List msgReceivedC23 = new ArrayList<>(); + for (int i = 0; i < msgCount; i++) { + Message msg = consumer2.receive(); + assertNotNull(msg); + if (i % 4 == 0) { + msgReceivedC21.add(msg.getMessageId()); + } else if (i % 3 == 0) { + msgReceivedC22.add(msg.getMessageId()); + } else if (i % 2 == 0) { + msgReceivedC23.add(msg.getMessageId()); + } else { + msgReceivedC23.add(msg.getMessageId()); + } + } + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer2.acknowledge(msgReceivedC21); + acknowledgeSignal2.await(); + consumer2.acknowledge(msgReceivedC22); + acknowledgeSignal3.await(); + consumer2.acknowledge(msgReceivedC23); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-2 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + // Start another thread to mock a consumption repeatedly. + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer2.acknowledge(msgReceivedC21); + acknowledgeSignal2.await(); + consumer2.acknowledge(msgReceivedC22); + acknowledgeSignal3.await(); + consumer2.acknowledge(msgReceivedC23); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-2 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + + // Trigger concurrently acknowledge. + acknowledgeSignal1.countDown(); + Thread.sleep(1000); + acknowledgeSignal2.countDown(); + Thread.sleep(1000); + acknowledgeSignal3.countDown(); + + // Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. + acknowledgeFinishedSignal.await(); + Awaitility.await().untilAsserted(() -> { + SubscriptionStats stats = admin.topics().getStats(topic).getSubscriptions().get(subscription1); + log.info("backlog: {}, unack: {}", stats.getMsgBacklog(), stats.getUnackedMessages()); + assertEquals(stats.getMsgBacklog(), 0); + assertEquals(stats.getUnackedMessages(), 0); + for (ConsumerStats consumerStats : stats.getConsumers()) { + assertEquals(consumerStats.getUnackedMessages(), 0); + } + }); + + // cleanup. + consumer1.close(); + consumer2.close(); + producer.close(); + client1.close(); + client2.close(); + admin.topics().delete(topic, false); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 31a9b7f95d676..ee6a7173a22c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -26,6 +26,7 @@ import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -84,7 +85,7 @@ public void setup() throws Exception { doReturn(Codec.encode("sub-1")).when(cursor).getName(); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", cursor, false)); - doNothing().when(sub).acknowledgeMessage(any(), any(), any()); + doNothing().when(sub).acknowledgeMessage(any(), any(), any(), any(), anyBoolean()); } @AfterMethod(alwaysRun = true) @@ -124,7 +125,7 @@ public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throw commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any()); + verify(sub, never()).acknowledgeMessage(any(), any(), any(), any(), anyBoolean()); } @Test(timeOut = 5000, dataProvider = "notIndividualAckModes") @@ -139,7 +140,7 @@ public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) th commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, times(1)).acknowledgeMessage(any(), any(), any()); + verify(sub, times(1)).acknowledgeMessage(any(), any(), any(), any(), anyBoolean()); } @Test(timeOut = 5000) @@ -155,6 +156,6 @@ public void testAckWithMoreThanNoneMessageIds() throws Exception { commandAck.addMessageId().setEntryId(0L).setLedgerId(2L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any()); + verify(sub, never()).acknowledgeMessage(any(), any(), any(), any(), anyBoolean()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index e36a940b4ea5c..a4f873339c6a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1776,7 +1776,7 @@ public void testCompactorSubscription() { Position position = PositionFactory.create(1, 1); long ledgerId = 0xc0bfefeL; sub.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, - Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId)); + Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId), null, false); verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java index b945f5abcbc4d..7bf4621d055f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java @@ -86,7 +86,7 @@ public void testMarkerDeleteTimes() throws Exception { spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false); Position position = managedLedger.addEntry("test".getBytes()); persistentSubscription.acknowledgeMessage(Collections.singletonList(position), - AckType.Individual, Collections.emptyMap()); + AckType.Individual, Collections.emptyMap(), null, false); verify(managedLedger, times(0)).asyncReadEntry(any(), any(), any()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 360be2e435ab1..058d54847c06f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -191,7 +191,8 @@ public void testCanAcknowledgeAndAbortForTransaction() throws Exception { positionList.add(PositionFactory.create(3, 5)); // Acknowledge from normal consumer will succeed ignoring message acked by ongoing transaction. - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap()); + persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, + Collections.emptyMap(), null, false); //Abort txn. persistentSubscription.endTxn(txnID1.getMostSigBits(), txnID2.getLeastSigBits(), TxnAction.ABORT_VALUE, -1); @@ -223,7 +224,7 @@ public void testAcknowledgeUpdateCursorLastActive() throws Exception { positionList.add(PositionFactory.create(1, 1)); long beforeAcknowledgeTimestamp = System.currentTimeMillis(); Thread.sleep(1); - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap()); + persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null, true); // `acknowledgeMessage` should update cursor last active assertTrue(persistentSubscription.cursor.getLastActive() > beforeAcknowledgeTimestamp); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java index 4f773e8a124b0..b9e9d532a2945 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java @@ -246,7 +246,7 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception { // and it won't clear the last message in cursor batch index ack set consumer.acknowledgeAsync(messageIds[1], commitTwice).get(); assertEquals(batchDeletedIndexes.size(), 1); - assertEquals(testPersistentSubscription.getConsumers().get(0).getPendingAcks().size(), 0); + assertEquals(testPersistentSubscription.getConsumers().get(0).getPendingAcks().size(), 1); // the messages has been produced were all acked, the memory in broker for the messages has been cleared. commitTwice.commit().get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index f43e2a1c672c7..ecd94a452c382 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doAnswer; @@ -2136,7 +2137,7 @@ public void testDeleteCompactedLedgerWithSlowAck() throws Exception { Thread.sleep(500); } }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq( - CommandAck.AckType.Cumulative), Mockito.any()); + CommandAck.AckType.Cumulative), Mockito.any(), any(), anyBoolean()); // trigger compaction admin.topics().triggerCompaction(topicName);