From 4e9acdab2b8441bd84741a9dc954f1f6e210d36b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 16 Jul 2025 16:42:44 +0800 Subject: [PATCH 1/2] [improve][broker] PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload Buffer Parsing --- .../admin/impl/PersistentTopicsBase.java | 31 ++----- .../pulsar/broker/service/ServerCnx.java | 77 ++--------------- .../pulsar/client/impl/RawBatchConverter.java | 12 ++- .../pulsar/compaction/CompactedTopicImpl.java | 17 ++-- .../PulsarTopicCompactionService.java | 83 +++++++++++++++---- .../compaction/TopicCompactionService.java | 36 ++++---- .../pulsar/compaction/CompactorTest.java | 6 +- .../GetLastMessageIdCompactedTest.java | 41 +++++++++ .../TopicCompactionServiceTest.java | 24 ------ pulsar-common/src/main/proto/PulsarApi.proto | 10 +++ 10 files changed, 177 insertions(+), 160 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 604af932cca0a..e67f91148b527 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2837,31 +2837,16 @@ protected CompletableFuture internalGetMessageIdByTimestampAsync(long "Get message ID by timestamp on a non-persistent topic is not allowed"); } final PersistentTopic persistentTopic = (PersistentTopic) topic; + final var compactionService = persistentTopic.getTopicCompactionService(); - return persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry -> { - if (lastEntry == null) { - return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger()); - } - MessageMetadata metadata; - Position position = lastEntry.getPosition(); - try { - metadata = Commands.parseMessageMetadata(lastEntry.getDataBuffer()); - } finally { - 
lastEntry.release(); - } - if (timestamp == metadata.getPublishTime()) { - return CompletableFuture.completedFuture(new MessageIdImpl(position.getLedgerId(), - position.getEntryId(), topicName.getPartitionIndex())); - } else if (timestamp < metadata.getPublishTime()) { + return compactionService.getLastMessagePosition().thenCompose(messagePosition -> { + if (timestamp == messagePosition.publishTime()) { + return CompletableFuture.completedFuture(new MessageIdImpl(messagePosition.ledgerId(), + messagePosition.entryId(), topicName.getPartitionIndex())); + } else if (timestamp < messagePosition.publishTime()) { return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp) - .thenApply(compactedEntry -> { - try { - return new MessageIdImpl(compactedEntry.getLedgerId(), - compactedEntry.getEntryId(), topicName.getPartitionIndex()); - } finally { - compactedEntry.release(); - } - }); + .thenApply(__ -> new MessageIdImpl(__.getLedgerId(), __.getEntryId(), + topicName.getPartitionIndex())); } else { return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3aa365323ec7c..f029d922db9d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -42,7 +42,6 @@ import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; @@ -138,7 +137,6 @@ import org.apache.pulsar.common.api.proto.CommandUnsubscribe; import org.apache.pulsar.common.api.proto.CommandWatchTopicList; import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; -import 
org.apache.pulsar.common.api.proto.CompressionType; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; @@ -149,10 +147,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.Schema; import org.apache.pulsar.common.api.proto.ServerError; -import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.api.proto.TxnAction; -import org.apache.pulsar.common.compression.CompressionCodec; -import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.Metadata; @@ -2283,7 +2278,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) .thenApply(lastPosition -> { int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - Position markDeletePosition = null; + Position markDeletePosition = PositionFactory.EARLIEST; if (consumer.getSubscription() instanceof PersistentSubscription) { markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() .getMarkDeletedPosition(); @@ -2344,8 +2339,7 @@ private void getLargestBatchIndexWhenPossible( } else { // if readCompacted is false, we need to return MessageId.earliest writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + markDeletePosition.getLedgerId(), markDeletePosition.getEntryId())); } return; } @@ -2404,8 +2398,7 @@ public String toString() { writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(), lastPosition.getEntryId(), partitionIndex, largestBatchIndex, - markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + markDeletePosition.getLedgerId(), markDeletePosition.getEntryId())); } }); }); @@ -2413,38 +2406,11 @@ public String toString() { private void handleLastMessageIdFromCompactionService(PersistentTopic persistentTopic, long requestId, int partitionIndex, Position markDeletePosition) { - persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> { - if (entry != null) { - try { - // in this case, all the data has been compacted, so return the last position - // in the compacted ledger to the client - ByteBuf payload = entry.getDataBuffer(); - MessageMetadata metadata = Commands.parseMessageMetadata(payload); - int largestBatchIndex; - try { - largestBatchIndex = calculateTheLastBatchIndexInBatch(metadata, payload); - } catch (IOException ioEx) { - writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, - "Failed to deserialize batched message from the last entry of the compacted Ledger: " - + ioEx.getMessage())); - return; - } - writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, - entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); - } finally { - entry.release(); - } - } else { - // in this case, the ledgers been removed except the current ledger - // and current ledger without any data - writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, - -1, -1, partitionIndex, -1, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1)); - } - }).exceptionally(ex -> { + persistentTopic.getTopicCompactionService().getLastMessagePosition().thenAccept(position -> + writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, position.ledgerId(), position.entryId(), + partitionIndex, position.batchIndex(), markDeletePosition.getLedgerId(), + markDeletePosition.getEntryId())) + ).exceptionally(ex -> { writeAndFlush(Commands.newError( requestId, ServerError.MetadataError, "Failed to read last entry of the compacted Ledger " @@ -2453,33 +2419,6 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent }); } - private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException { - int batchSize = metadata.getNumMessagesInBatch(); - if (batchSize <= 1){ - return -1; - } - if (metadata.hasCompression()) { - var tmp = payload; - CompressionType compressionType = metadata.getCompression(); - CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); - int uncompressedSize = metadata.getUncompressedSize(); - payload = codec.decode(payload, uncompressedSize); - tmp.release(); - } - SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); - int lastBatchIndexInBatch = -1; - for (int i = 0; i < batchSize; i++){ - ByteBuf singleMessagePayload = - Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize); - singleMessagePayload.release(); - if (singleMessageMetadata.isCompactedOut()){ - continue; - } - lastBatchIndexInBatch = i; - } - return lastBatchIndexInBatch; - } - private CompletableFuture isNamespaceOperationAllowed(NamespaceName namespaceName, NamespaceOperation operation) { if (!service.isAuthorizationEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index d8c491dab2906..64ddea3ec6ab1 
100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -151,7 +151,7 @@ public static Optional rebatchMessage(RawMessage msg, ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize); try { int batchSize = metadata.getNumMessagesInBatch(); - int messagesRetained = 0; + final var retainedBatchIndexes = new ArrayList(); SingleMessageMetadata emptyMetadata = new SingleMessageMetadata().setCompactedOut(true); SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); @@ -169,7 +169,7 @@ public static Optional rebatchMessage(RawMessage msg, Unpooled.EMPTY_BUFFER, batchBuffer); } else if (!singleMessageMetadata.hasPartitionKey()) { if (retainNullKey) { - messagesRetained++; + retainedBatchIndexes.add(i); Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, singleMessagePayload, batchBuffer); } else { @@ -178,7 +178,7 @@ public static Optional rebatchMessage(RawMessage msg, } } else if (filter.test(singleMessageMetadata.getPartitionKey(), id) && singleMessagePayload.readableBytes() > 0) { - messagesRetained++; + retainedBatchIndexes.add(i); Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata, singleMessagePayload, batchBuffer); } else { @@ -189,10 +189,14 @@ public static Optional rebatchMessage(RawMessage msg, singleMessagePayload.release(); } - if (messagesRetained > 0) { + if (!retainedBatchIndexes.isEmpty()) { int newUncompressedSize = batchBuffer.readableBytes(); ByteBuf compressedPayload = codec.encode(batchBuffer); + metadata.clearCompactedBatchIndexes(); + for (int index : retainedBatchIndexes) { + metadata.addCompactedBatchIndexe(index); + } metadata.setUncompressedSize(newUncompressedSize); ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index cc317fa99f630..db1aecf3887b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -345,18 +345,23 @@ CompletableFuture findFirstMatchEntry(final Predicate predicate) { var compactedTopicContextFuture = this.getCompactedTopicContextFuture(); if (compactedTopicContextFuture == null) { - return CompletableFuture.completedFuture(null); + return CompletableFuture.failedFuture(new IllegalStateException( + "CompactedTopicContext is not initialized")); } return compactedTopicContextFuture.thenCompose(compactedTopicContext -> { LedgerHandle lh = compactedTopicContext.getLedger(); CompletableFuture promise = new CompletableFuture<>(); findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh); - return promise.thenCompose(index -> { - if (index == null) { - return CompletableFuture.completedFuture(null); + return promise.thenCompose(index -> readEntries(lh, index, index).thenApply(entries -> { + if (entries.size() != 1) { + for (final var entry : entries) { + entry.release(); + } + throw new IllegalStateException("Read " + entries.size() + " entries from the compacted ledger " + + lh + " entry " + index); } - return readEntries(lh, index, index).thenApply(entries -> entries.get(0)); - }); + return entries.get(0); + })); }); } private static void findFirstMatchIndexLoop(final Predicate predicate, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java index e123c11fd79c2..ea724cb6480e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java @@ -22,18 +22,22 @@ import static org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY; import static org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED; import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; +import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.FutureUtil; import org.jspecify.annotations.NonNull; @@ -98,34 +102,81 @@ public CompletableFuture> readCompactedEntries(@NonNull Position sta return resultFuture; } - @Override - public CompletableFuture readLastCompactedEntry() { - return compactedTopic.readLastEntryOfCompactedLedger(); - } - @Override public CompletableFuture getLastCompactedPosition() { return CompletableFuture.completedFuture(compactedTopic.getCompactionHorizon().orElse(null)); } @Override - public CompletableFuture findEntryByPublishTime(long publishTime) { + public CompletableFuture findEntryByPublishTime(long publishTime) { final Predicate predicate = entry -> { return Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >= publishTime; }; - return compactedTopic.findFirstMatchEntry(predicate); + return 
compactedTopic.findFirstMatchEntry(predicate).thenApply(entry -> { + try { + return PositionFactory.create(entry.getLedgerId(), entry.getEntryId()); + } finally { + entry.release(); + } + }); } @Override - public CompletableFuture findEntryByEntryIndex(long entryIndex) { - final Predicate predicate = entry -> { - BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); - if (brokerEntryMetadata == null || !brokerEntryMetadata.hasIndex()) { - return false; + public CompletableFuture getLastMessagePosition() { + return compactedTopic.readLastEntryOfCompactedLedger().thenApply(entry -> { + if (entry == null) { + return MessagePosition.EARLIEST; } - return brokerEntryMetadata.getIndex() >= entryIndex; - }; - return compactedTopic.findFirstMatchEntry(predicate); + try { + final var payload = entry.getDataBuffer(); + final var metadata = Commands.parseMessageMetadata(payload); + final var batchSize = metadata.getNumMessagesInBatch(); + final var publishTime = metadata.getPublishTime(); + if (batchSize <= 1) { + return new MessagePosition(entry.getLedgerId(), entry.getEntryId(), -1, publishTime); + } + final int compactedBatchIndexesCount = metadata.getCompactedBatchIndexesCount(); + if (compactedBatchIndexesCount > 0) { + final var batchIndex = metadata.getCompactedBatchIndexeAt(compactedBatchIndexesCount - 1); + return new MessagePosition(entry.getLedgerId(), entry.getEntryId(), batchIndex, publishTime); + } + // Encrypted messages won't be compacted + if (metadata.getEncryptionKeysCount() > 0) { + return new MessagePosition(entry.getLedgerId(), entry.getEntryId(), batchSize - 1, publishTime); + } + final ByteBuf uncompressedPayload; + if (metadata.hasCompression()) { + final var codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression()); + final var uncompressedSize = metadata.getUncompressedSize(); + uncompressedPayload = codec.decode(payload, uncompressedSize); + } else { + uncompressedPayload =
payload.retain(); + } + try { + SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); + int batchIndex = -1; + for (int i = 0; i < batchSize; i++){ + final var singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, + singleMessageMetadata, i, batchSize); + singleMessagePayload.release(); + if (singleMessageMetadata.isCompactedOut()){ + continue; + } + batchIndex = i; + } + if (batchIndex < 0) { + throw new IllegalStateException("No valid message in entry " + entry.getPosition()); + } + return new MessagePosition(entry.getLedgerId(), entry.getEntryId(), batchIndex, publishTime); + } finally { + uncompressedPayload.release(); + } + } catch (IOException e) { + throw new CompletionException(e); + } finally { + entry.release(); + } + }); } public CompactedTopicImpl getCompactedTopic() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java index 11ce916f3ea9a..6153f4cfa8962 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java @@ -47,13 +47,6 @@ public interface TopicCompactionService extends AutoCloseable { */ CompletableFuture> readCompactedEntries(@NonNull Position startPosition, int numberOfEntriesToRead); - /** - * Read the last compacted entry from the TopicCompactionService. - * - * @return a future that will be completed with the compacted last entry, this entry can be null. - */ - CompletableFuture readLastCompactedEntry(); - /** * Get the last compacted position from the TopicCompactionService. * @@ -67,14 +60,27 @@ public interface TopicCompactionService extends AutoCloseable { * @param publishTime the publish time of entry. * @return the first entry metadata that greater or equal to target publishTime, this entry can be null.
*/ - CompletableFuture findEntryByPublishTime(long publishTime); + CompletableFuture findEntryByPublishTime(long publishTime); /** - * Find the first entry that greater or equal to target entryIndex, - * if an entry that broker entry metadata is missed, then it will be skipped and find the next match entry. - * - * @param entryIndex the index of entry. - * @return the first entry that greater or equal to target entryIndex, this entry can be null. - */ - CompletableFuture findEntryByEntryIndex(long entryIndex); + * Retrieve the position of the last message before compaction. + * + * @return A future that completes with the position of the last message before compaction, or + * {@link MessagePosition#EARLIEST} if no such message exists. + */ + CompletableFuture getLastMessagePosition(); + + /** + * Represents the position of a message. + *

+ * The `ledgerId` and `entryId` together specify the exact entry to which the message belongs. For batched messages, + * the `batchIndex` field indicates the index of the message within the batch. If the message is not part of a + * batch, the `batchIndex` field is set to -1. The `publishTime` field corresponds to the publishing time of the + * entry's metadata, providing a timestamp for when the entry was published. + *

+ */ + record MessagePosition(long ledgerId, long entryId, int batchIndex, long publishTime) { + + public static final MessagePosition EARLIEST = new MessagePosition(-1L, -1L, -1, 0); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index c7deab62460df..b330f56cbbc94 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -400,11 +400,11 @@ public void testCompactedWithConcurrentSend() throws Exception { }); Position lastCompactedPosition = topicCompactionService.getLastCompactedPosition().get(); - Entry lastCompactedEntry = topicCompactionService.readLastCompactedEntry().get(); + final var lastMessagePosition = topicCompactionService.getLastMessagePosition().get(); Assert.assertTrue(PositionFactory.create(lastCompactedPosition.getLedgerId(), - lastCompactedPosition.getEntryId()).compareTo(lastCompactedEntry.getLedgerId(), - lastCompactedEntry.getEntryId()) >= 0); + lastCompactedPosition.getEntryId()).compareTo(lastMessagePosition.ledgerId(), + lastMessagePosition.entryId()) >= 0); future.join(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index 252ee939aace3..c5ee7a0a77628 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -29,6 +29,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; @@ -39,6 +41,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerConsumerBase; @@ -57,6 +60,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker-impl") public class GetLastMessageIdCompactedTest extends ProducerConsumerBase { @@ -516,4 +520,41 @@ public void testReaderStuckWithCompaction(boolean enabledBatch) throws Exception assertNotEquals(message, null); } } + + @Test(timeOut = 30000) + public void testGetLastMessageIdForEncryptedMessage() throws Exception { + final var topic = BrokerTestUtil.newUniqueName("tp"); + final var ecdsaPublickeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic) + .batchingMaxBytes(Integer.MAX_VALUE) + .batchingMaxMessages(Integer.MAX_VALUE) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublickeyFile) + .create(); + producer.newMessage().key("k0").value("v0").sendAsync(); + producer.newMessage().key("k0").value("v1").sendAsync(); + producer.newMessage().key("k1").value("v0").sendAsync(); + producer.newMessage().key("k1").value(null).sendAsync(); + producer.flush(); + triggerCompactionAndWait(topic); + + @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") + .readCompacted(true).defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe(); + final var msgId = 
(MessageIdAdv) consumer.getLastMessageIds().get(0); + // Compaction does not work for encrypted messages + assertEquals(msgId.getBatchIndex(), 3); + + @Cleanup final var reader = pulsarClient.newReader(Schema.STRING).topic(topic) + .startMessageId(MessageId.earliest).topic(topic).readCompacted(true) + .defaultCryptoKeyReader(ecdsaPrivateKeyFile).create(); + MessageIdAdv readMsgId = (MessageIdAdv) MessageId.earliest; + while (reader.hasMessageAvailable()) { + final var msg = reader.readNext(); + log.info("Read key: {}, value: {}", msg.getKey(), Optional.ofNullable(msg.getValue()).orElse("(null)")); + readMsgId = (MessageIdAdv) msg.getMessageId(); + } + assertEquals(readMsgId, msgId); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java index 8654e595c2a97..2ce56b4351f71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java @@ -173,29 +173,5 @@ public void test() throws Exception { List entries2 = service.readCompactedEntries(PositionFactory.EARLIEST, 1).join(); assertEquals(entries2.size(), 1); - - Entry entry = service.findEntryByEntryIndex(0).join(); - BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); - assertNotNull(brokerEntryMetadata); - assertEquals(brokerEntryMetadata.getIndex(), 2); - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - assertEquals(metadata.getPartitionKey(), "a"); - entry.release(); - - entry = service.findEntryByEntryIndex(3).join(); - brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); - assertNotNull(brokerEntryMetadata); - assertEquals(brokerEntryMetadata.getIndex(), 4); - metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer()); - assertEquals(metadata.getPartitionKey(), "b"); - entry.release(); - - entry = service.findEntryByPublishTime(startTime).join(); - brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); - assertNotNull(brokerEntryMetadata); - assertEquals(brokerEntryMetadata.getIndex(), 2); - metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - assertEquals(metadata.getPartitionKey(), "a"); - entry.release(); } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index eacec33169e34..a03daf05f997c 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -163,6 +163,16 @@ message MessageMetadata { // Indicate if the message partition key is set optional bool null_partition_key = 30 [default = false]; + + // Indicates the indexes of messages retained in the batch after compaction. When a batch is compacted, + // some messages may be removed (compacted out). For example, if the original batch contains: + // `k0 => v0, k1 => v1, k2 => v2, k1 => null`, the compacted batch will retain only `k0 => v0` and `k2 => v2`. + // In this case, this field will be set to `[0, 2]`, and the payload buffer will only include the retained messages. + // + // Note: Batches compacted by older versions of the compaction service do not include this field. For such batches, + // the `compacted_out` field in `SingleMessageMetadata` must be checked to identify and filter out compacted + // messages (e.g., `k1 => v1` and `k1 => null` in the example above). 
+ repeated int32 compacted_batch_indexes = 31; } message SingleMessageMetadata { From 609a97285e9853d093d3ba7a1702acaf9bf1ad9e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 16 Jul 2025 17:12:47 +0800 Subject: [PATCH 2/2] Fix checkstyle --- .../test/java/org/apache/pulsar/compaction/CompactorTest.java | 1 - .../apache/pulsar/compaction/TopicCompactionServiceTest.java | 4 ---- 2 files changed, 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index b330f56cbbc94..0aeb49312be57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -45,7 +45,6 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.BrokerTestUtil; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java index 2ce56b4351f71..da4ce33db4003 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java @@ -21,7 +21,6 @@ import static org.apache.pulsar.compaction.Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.testng.Assert.assertEquals; -import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.fail; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; @@ -40,11 
+39,8 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; -import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.protocol.Commands; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test;