Changes from all commits
@@ -2837,31 +2837,16 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestampAsync(long
                     "Get message ID by timestamp on a non-persistent topic is not allowed");
         }
         final PersistentTopic persistentTopic = (PersistentTopic) topic;
+        final var compactionService = persistentTopic.getTopicCompactionService();
 
-        return persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry -> {
-            if (lastEntry == null) {
-                return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
-            }
-            MessageMetadata metadata;
-            Position position = lastEntry.getPosition();
-            try {
-                metadata = Commands.parseMessageMetadata(lastEntry.getDataBuffer());
-            } finally {
-                lastEntry.release();
-            }
-            if (timestamp == metadata.getPublishTime()) {
-                return CompletableFuture.completedFuture(new MessageIdImpl(position.getLedgerId(),
-                        position.getEntryId(), topicName.getPartitionIndex()));
-            } else if (timestamp < metadata.getPublishTime()) {
+        return compactionService.getLastMessagePosition().thenCompose(messagePosition -> {
+            if (timestamp == messagePosition.publishTime()) {
+                return CompletableFuture.completedFuture(new MessageIdImpl(messagePosition.ledgerId(),
+                        messagePosition.entryId(), topicName.getPartitionIndex()));
+            } else if (timestamp < messagePosition.publishTime()) {
                 return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
-                        .thenApply(compactedEntry -> {
-                            try {
-                                return new MessageIdImpl(compactedEntry.getLedgerId(),
-                                        compactedEntry.getEntryId(), topicName.getPartitionIndex());
-                            } finally {
-                                compactedEntry.release();
-                            }
-                        });
+                        .thenApply(__ -> new MessageIdImpl(__.getLedgerId(), __.getEntryId(),
+                                topicName.getPartitionIndex()));
            } else {
                return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
            }
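
The hunk above reroutes internalGetMessageIdByTimestampAsync through the new
TopicCompactionService.getLastMessagePosition() call instead of reading and
parsing the last compacted entry inline. The return type of that call is not
shown anywhere in this diff; judging from the accessors used here and in the
broker hunks below (ledgerId(), entryId(), batchIndex(), publishTime()), it
is presumably a small record along these lines (a sketch, not the PR's actual
declaration):

    // Assumed shape of the value returned by getLastMessagePosition();
    // the field names are taken from the call sites in this diff.
    record MessagePosition(long ledgerId, long entryId, int batchIndex, long publishTime) {}
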
@@ -42,7 +42,6 @@
 import io.netty.util.concurrent.FastThreadLocal;
 import io.netty.util.concurrent.Promise;
 import io.netty.util.concurrent.ScheduledFuture;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -138,7 +137,6 @@
 import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
-import org.apache.pulsar.common.api.proto.CompressionType;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -149,10 +147,7 @@
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.Schema;
 import org.apache.pulsar.common.api.proto.ServerError;
-import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.Metadata;
@@ -2283,7 +2278,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
                 .thenApply(lastPosition -> {
                     int partitionIndex = TopicName.getPartitionIndex(topic.getName());
 
-                    Position markDeletePosition = null;
+                    Position markDeletePosition = PositionFactory.EARLIEST;
                     if (consumer.getSubscription() instanceof PersistentSubscription) {
                         markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
                                 .getMarkDeletedPosition();
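
Initializing markDeletePosition to PositionFactory.EARLIEST instead of null
lets every response site below pass the position straight through, dropping
the repeated null ternaries. A minimal sketch of why the wire behavior stays
the same, assuming PositionFactory.EARLIEST is the sentinel position (-1, -1)
as in the managed-ledger API:

    // EARLIEST carries the same -1/-1 defaults the old null checks produced.
    Position markDeletePosition = PositionFactory.EARLIEST;
    assert markDeletePosition.getLedgerId() == -1;
    assert markDeletePosition.getEntryId() == -1;
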
@@ -2344,8 +2339,7 @@ private void getLargestBatchIndexWhenPossible(
                     } else {
                         // if readCompacted is false, we need to return MessageId.earliest
                         writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1,
-                                markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
-                                markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                                markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()));
                     }
                     return;
                 }
@@ -2404,47 +2398,19 @@ public String toString() {
 
                 writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
                         lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
-                        markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
-                        markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                        markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()));
                 }
             });
         });
     }
 
     private void handleLastMessageIdFromCompactionService(PersistentTopic persistentTopic, long requestId,
                                                           int partitionIndex, Position markDeletePosition) {
-        persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> {
-            if (entry != null) {
-                try {
-                    // in this case, all the data has been compacted, so return the last position
-                    // in the compacted ledger to the client
-                    ByteBuf payload = entry.getDataBuffer();
-                    MessageMetadata metadata = Commands.parseMessageMetadata(payload);
-                    int largestBatchIndex;
-                    try {
-                        largestBatchIndex = calculateTheLastBatchIndexInBatch(metadata, payload);
-                    } catch (IOException ioEx) {
-                        writeAndFlush(Commands.newError(requestId, ServerError.MetadataError,
-                                "Failed to deserialize batched message from the last entry of the compacted Ledger: "
-                                        + ioEx.getMessage()));
-                        return;
-                    }
-                    writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                            entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex,
-                            markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
-                            markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
-                } finally {
-                    entry.release();
-                }
-            } else {
-                // in this case, the ledgers been removed except the current ledger
-                // and current ledger without any data
-                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                        -1, -1, partitionIndex, -1,
-                        markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
-                        markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
-            }
-        }).exceptionally(ex -> {
+        persistentTopic.getTopicCompactionService().getLastMessagePosition().thenAccept(position ->
+                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, position.ledgerId(), position.entryId(),
+                        partitionIndex, position.batchIndex(), markDeletePosition.getLedgerId(),
+                        markDeletePosition.getEntryId()))
+        ).exceptionally(ex -> {
             writeAndFlush(Commands.newError(
                     requestId, ServerError.MetadataError,
                     "Failed to read last entry of the compacted Ledger "
@@ -2453,33 +2419,6 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent
         });
     }
 
-    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
-        int batchSize = metadata.getNumMessagesInBatch();
-        if (batchSize <= 1){
-            return -1;
-        }
-        if (metadata.hasCompression()) {
-            var tmp = payload;
-            CompressionType compressionType = metadata.getCompression();
-            CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
-            int uncompressedSize = metadata.getUncompressedSize();
-            payload = codec.decode(payload, uncompressedSize);
-            tmp.release();
-        }
-        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
-        int lastBatchIndexInBatch = -1;
-        for (int i = 0; i < batchSize; i++){
-            ByteBuf singleMessagePayload =
-                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
-            singleMessagePayload.release();
-            if (singleMessageMetadata.isCompactedOut()){
-                continue;
-            }
-            lastBatchIndexInBatch = i;
-        }
-        return lastBatchIndexInBatch;
-    }
-
     private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName,
                                                                    NamespaceOperation operation) {
         if (!service.isAuthorizationEnabled()) {
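
With getLastMessagePosition() also supplying the batch index, the broker no
longer reads the last compacted entry, decompresses it, and walks the single
messages itself, which is what calculateTheLastBatchIndexInBatch did and why
the IOException, CompressionType, CompressionCodec, CompressionCodecProvider,
and SingleMessageMetadata imports are dropped. The diff does not show how the
service computes that index; a plausible sketch, reusing the MessagePosition
record assumed earlier, the compactedBatchIndexes field added in the next
file, and light-proto's usual generated accessors for a repeated field (only
clearCompactedBatchIndexes/addCompactedBatchIndexe actually appear in this
diff):

    // Sketch only: derive the last surviving batch index from metadata
    // recorded at compaction time, with no payload decompression.
    CompletableFuture<MessagePosition> getLastMessagePosition() {
        return readLastCompactedEntry().thenApply(entry -> {
            if (entry == null) {
                // Nothing compacted; -1/-1 mirrors the old empty response.
                return new MessagePosition(-1, -1, -1, -1);
            }
            try {
                MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
                int n = md.getCompactedBatchIndexesCount(); // assumed generated getter
                int batchIndex = n > 0 ? md.getCompactedBatchIndexeAt(n - 1) : -1;
                return new MessagePosition(entry.getLedgerId(), entry.getEntryId(),
                        batchIndex, md.getPublishTime());
            } finally {
                entry.release();
            }
        });
    }
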
@@ -151,7 +151,7 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
         ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
         try {
             int batchSize = metadata.getNumMessagesInBatch();
-            int messagesRetained = 0;
+            final var retainedBatchIndexes = new ArrayList<Integer>();
 
             SingleMessageMetadata emptyMetadata = new SingleMessageMetadata().setCompactedOut(true);
             SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
@@ -169,7 +169,7 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
                             Unpooled.EMPTY_BUFFER, batchBuffer);
                 } else if (!singleMessageMetadata.hasPartitionKey()) {
                     if (retainNullKey) {
-                        messagesRetained++;
+                        retainedBatchIndexes.add(i);
                         Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
                                 singleMessagePayload, batchBuffer);
                     } else {
@@ -178,7 +178,7 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
                     }
                 } else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
                         && singleMessagePayload.readableBytes() > 0) {
-                    messagesRetained++;
+                    retainedBatchIndexes.add(i);
                     Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
                             singleMessagePayload, batchBuffer);
                 } else {
@@ -189,10 +189,14 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
                 singleMessagePayload.release();
             }
 
-            if (messagesRetained > 0) {
+            if (!retainedBatchIndexes.isEmpty()) {
                 int newUncompressedSize = batchBuffer.readableBytes();
                 ByteBuf compressedPayload = codec.encode(batchBuffer);
 
+                metadata.clearCompactedBatchIndexes();
+                for (int index : retainedBatchIndexes) {
+                    metadata.addCompactedBatchIndexe(index);
+                }
                 metadata.setUncompressedSize(newUncompressedSize);
 
                 ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
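
rebatchMessage now records which of the original batch slots survive
compaction in the batch-level metadata, rather than merely counting them.
Because each recorded value is the index in the original batch, a reader can
recover the retained positions, and in particular the largest one, without
decompressing the payload. A usage sketch, with the getter names assumed from
light-proto's generated pattern for the repeated field:

    // Sketch: list the batch indexes that survived compaction.
    List<Integer> kept = new ArrayList<>();
    for (int i = 0; i < metadata.getCompactedBatchIndexesCount(); i++) {
        kept.add(metadata.getCompactedBatchIndexeAt(i));
    }
    // The last element, if any, is the largest retained batch index.
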
@@ -345,18 +345,23 @@ CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry> predicate) {
         var compactedTopicContextFuture = this.getCompactedTopicContextFuture();
 
         if (compactedTopicContextFuture == null) {
-            return CompletableFuture.completedFuture(null);
+            return CompletableFuture.failedFuture(new IllegalStateException(
+                    "CompactedTopicContext is not initialized"));
         }
         return compactedTopicContextFuture.thenCompose(compactedTopicContext -> {
             LedgerHandle lh = compactedTopicContext.getLedger();
             CompletableFuture<Long> promise = new CompletableFuture<>();
             findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh);
-            return promise.thenCompose(index -> {
-                if (index == null) {
-                    return CompletableFuture.completedFuture(null);
+            return promise.thenCompose(index -> readEntries(lh, index, index).thenApply(entries -> {
+                if (entries.size() != 1) {
+                    for (final var entry : entries) {
+                        entry.release();
+                    }
+                    throw new IllegalStateException("Read " + entries.size() + " entries from the compacted ledger "
+                            + lh + " entry " + index);
                 }
-                return readEntries(lh, index, index).thenApply(entries -> entries.get(0));
-            });
+                return entries.get(0);
+            }));
         });
     }
     private static void findFirstMatchIndexLoop(final Predicate<Entry> predicate,
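
findFirstMatchEntry previously signaled both "context not initialized" and
"nothing found" with a completed null future; it now fails the future instead
and asserts that exactly one entry comes back from the ledger read. Callers
therefore move their fallback logic from null checks into the exceptional
path. A minimal caller sketch, assuming a predicate, a partition index, and a
logger in scope (not code from this PR):

    // Sketch: resolve a MessageId from the compacted ledger, treating any
    // failure (uninitialized context, lookup error) as "not found".
    findFirstMatchEntry(predicate)
            .thenApply(entry -> {
                try {
                    return new MessageIdImpl(entry.getLedgerId(), entry.getEntryId(), partitionIndex);
                } finally {
                    entry.release();
                }
            })
            .exceptionally(ex -> {
                log.warn("Failed to find entry in the compacted ledger", ex);
                return null;
            });
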