Skip to content

Commit d272825

Browse files
[improve][broker] PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload Buffer Parsing (#24523)
1 parent: 1e57827 · commit: d272825

File tree

10 files changed

+177
-165
lines changed

10 files changed

+177
-165
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 8 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -2837,31 +2837,16 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestampAsync(long
28372837
"Get message ID by timestamp on a non-persistent topic is not allowed");
28382838
}
28392839
final PersistentTopic persistentTopic = (PersistentTopic) topic;
2840+
final var compactionService = persistentTopic.getTopicCompactionService();
28402841

2841-
return persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry -> {
2842-
if (lastEntry == null) {
2843-
return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
2844-
}
2845-
MessageMetadata metadata;
2846-
Position position = lastEntry.getPosition();
2847-
try {
2848-
metadata = Commands.parseMessageMetadata(lastEntry.getDataBuffer());
2849-
} finally {
2850-
lastEntry.release();
2851-
}
2852-
if (timestamp == metadata.getPublishTime()) {
2853-
return CompletableFuture.completedFuture(new MessageIdImpl(position.getLedgerId(),
2854-
position.getEntryId(), topicName.getPartitionIndex()));
2855-
} else if (timestamp < metadata.getPublishTime()) {
2842+
return compactionService.getLastMessagePosition().thenCompose(messagePosition -> {
2843+
if (timestamp == messagePosition.publishTime()) {
2844+
return CompletableFuture.completedFuture(new MessageIdImpl(messagePosition.ledgerId(),
2845+
messagePosition.entryId(), topicName.getPartitionIndex()));
2846+
} else if (timestamp < messagePosition.publishTime()) {
28562847
return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
2857-
.thenApply(compactedEntry -> {
2858-
try {
2859-
return new MessageIdImpl(compactedEntry.getLedgerId(),
2860-
compactedEntry.getEntryId(), topicName.getPartitionIndex());
2861-
} finally {
2862-
compactedEntry.release();
2863-
}
2864-
});
2848+
.thenApply(__ -> new MessageIdImpl(__.getLedgerId(), __.getEntryId(),
2849+
topicName.getPartitionIndex()));
28652850
} else {
28662851
return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
28672852
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 8 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,6 @@
4141
import io.netty.util.concurrent.FastThreadLocal;
4242
import io.netty.util.concurrent.Promise;
4343
import io.netty.util.concurrent.ScheduledFuture;
44-
import java.io.IOException;
4544
import java.net.InetSocketAddress;
4645
import java.net.SocketAddress;
4746
import java.util.ArrayList;
@@ -137,7 +136,6 @@
137136
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
138137
import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
139138
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
140-
import org.apache.pulsar.common.api.proto.CompressionType;
141139
import org.apache.pulsar.common.api.proto.FeatureFlags;
142140
import org.apache.pulsar.common.api.proto.KeySharedMeta;
143141
import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -148,10 +146,7 @@
148146
import org.apache.pulsar.common.api.proto.ProtocolVersion;
149147
import org.apache.pulsar.common.api.proto.Schema;
150148
import org.apache.pulsar.common.api.proto.ServerError;
151-
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
152149
import org.apache.pulsar.common.api.proto.TxnAction;
153-
import org.apache.pulsar.common.compression.CompressionCodec;
154-
import org.apache.pulsar.common.compression.CompressionCodecProvider;
155150
import org.apache.pulsar.common.intercept.InterceptException;
156151
import org.apache.pulsar.common.lookup.data.LookupData;
157152
import org.apache.pulsar.common.naming.Metadata;
@@ -2275,7 +2270,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
22752270
.thenApply(lastPosition -> {
22762271
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
22772272

2278-
Position markDeletePosition = null;
2273+
Position markDeletePosition = PositionFactory.EARLIEST;
22792274
if (consumer.getSubscription() instanceof PersistentSubscription) {
22802275
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
22812276
.getMarkDeletedPosition();
@@ -2336,8 +2331,7 @@ private void getLargestBatchIndexWhenPossible(
23362331
} else {
23372332
// if readCompacted is false, we need to return MessageId.earliest
23382333
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1,
2339-
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
2340-
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
2334+
markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()));
23412335
}
23422336
return;
23432337
}
@@ -2396,47 +2390,19 @@ public String toString() {
23962390

23972391
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
23982392
lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
2399-
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
2400-
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
2393+
markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()));
24012394
}
24022395
});
24032396
});
24042397
}
24052398

24062399
private void handleLastMessageIdFromCompactionService(PersistentTopic persistentTopic, long requestId,
24072400
int partitionIndex, Position markDeletePosition) {
2408-
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> {
2409-
if (entry != null) {
2410-
try {
2411-
// in this case, all the data has been compacted, so return the last position
2412-
// in the compacted ledger to the client
2413-
ByteBuf payload = entry.getDataBuffer();
2414-
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
2415-
int largestBatchIndex;
2416-
try {
2417-
largestBatchIndex = calculateTheLastBatchIndexInBatch(metadata, payload);
2418-
} catch (IOException ioEx) {
2419-
writeAndFlush(Commands.newError(requestId, ServerError.MetadataError,
2420-
"Failed to deserialize batched message from the last entry of the compacted Ledger: "
2421-
+ ioEx.getMessage()));
2422-
return;
2423-
}
2424-
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
2425-
entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex,
2426-
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
2427-
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
2428-
} finally {
2429-
entry.release();
2430-
}
2431-
} else {
2432-
// in this case, the ledgers been removed except the current ledger
2433-
// and current ledger without any data
2434-
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
2435-
-1, -1, partitionIndex, -1,
2436-
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
2437-
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
2438-
}
2439-
}).exceptionally(ex -> {
2401+
persistentTopic.getTopicCompactionService().getLastMessagePosition().thenAccept(position ->
2402+
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, position.ledgerId(), position.entryId(),
2403+
partitionIndex, position.batchIndex(), markDeletePosition.getLedgerId(),
2404+
markDeletePosition.getEntryId()))
2405+
).exceptionally(ex -> {
24402406
writeAndFlush(Commands.newError(
24412407
requestId, ServerError.MetadataError,
24422408
"Failed to read last entry of the compacted Ledger "
@@ -2445,33 +2411,6 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent
24452411
});
24462412
}
24472413

2448-
private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
2449-
int batchSize = metadata.getNumMessagesInBatch();
2450-
if (batchSize <= 1){
2451-
return -1;
2452-
}
2453-
if (metadata.hasCompression()) {
2454-
var tmp = payload;
2455-
CompressionType compressionType = metadata.getCompression();
2456-
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
2457-
int uncompressedSize = metadata.getUncompressedSize();
2458-
payload = codec.decode(payload, uncompressedSize);
2459-
tmp.release();
2460-
}
2461-
SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
2462-
int lastBatchIndexInBatch = -1;
2463-
for (int i = 0; i < batchSize; i++){
2464-
ByteBuf singleMessagePayload =
2465-
Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
2466-
singleMessagePayload.release();
2467-
if (singleMessageMetadata.isCompactedOut()){
2468-
continue;
2469-
}
2470-
lastBatchIndexInBatch = i;
2471-
}
2472-
return lastBatchIndexInBatch;
2473-
}
2474-
24752414
private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName,
24762415
NamespaceOperation operation) {
24772416
if (!service.isAuthorizationEnabled()) {

pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -151,7 +151,7 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
151151
ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
152152
try {
153153
int batchSize = metadata.getNumMessagesInBatch();
154-
int messagesRetained = 0;
154+
final var retainedBatchIndexes = new ArrayList<Integer>();
155155

156156
SingleMessageMetadata emptyMetadata = new SingleMessageMetadata().setCompactedOut(true);
157157
SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
@@ -169,7 +169,7 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
169169
Unpooled.EMPTY_BUFFER, batchBuffer);
170170
} else if (!singleMessageMetadata.hasPartitionKey()) {
171171
if (retainNullKey) {
172-
messagesRetained++;
172+
retainedBatchIndexes.add(i);
173173
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
174174
singleMessagePayload, batchBuffer);
175175
} else {
@@ -178,7 +178,7 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
178178
}
179179
} else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
180180
&& singleMessagePayload.readableBytes() > 0) {
181-
messagesRetained++;
181+
retainedBatchIndexes.add(i);
182182
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
183183
singleMessagePayload, batchBuffer);
184184
} else {
@@ -189,10 +189,14 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
189189
singleMessagePayload.release();
190190
}
191191

192-
if (messagesRetained > 0) {
192+
if (!retainedBatchIndexes.isEmpty()) {
193193
int newUncompressedSize = batchBuffer.readableBytes();
194194
ByteBuf compressedPayload = codec.encode(batchBuffer);
195195

196+
metadata.clearCompactedBatchIndexes();
197+
for (int index : retainedBatchIndexes) {
198+
metadata.addCompactedBatchIndexe(index);
199+
}
196200
metadata.setUncompressedSize(newUncompressedSize);
197201

198202
ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -345,18 +345,23 @@ CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry> predicate) {
345345
var compactedTopicContextFuture = this.getCompactedTopicContextFuture();
346346

347347
if (compactedTopicContextFuture == null) {
348-
return CompletableFuture.completedFuture(null);
348+
return CompletableFuture.failedFuture(new IllegalStateException(
349+
"CompactedTopicContext is not initialized"));
349350
}
350351
return compactedTopicContextFuture.thenCompose(compactedTopicContext -> {
351352
LedgerHandle lh = compactedTopicContext.getLedger();
352353
CompletableFuture<Long> promise = new CompletableFuture<>();
353354
findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh);
354-
return promise.thenCompose(index -> {
355-
if (index == null) {
356-
return CompletableFuture.completedFuture(null);
355+
return promise.thenCompose(index -> readEntries(lh, index, index).thenApply(entries -> {
356+
if (entries.size() != 1) {
357+
for (final var entry : entries) {
358+
entry.release();
359+
}
360+
throw new IllegalStateException("Read " + entries.size() + " entries from the compacted ledger "
361+
+ lh + " entry " + index);
357362
}
358-
return readEntries(lh, index, index).thenApply(entries -> entries.get(0));
359-
});
363+
return entries.get(0);
364+
}));
360365
});
361366
}
362367
private static void findFirstMatchIndexLoop(final Predicate<Entry> predicate,

0 commit comments

Comments
 (0)