4141import io .netty .util .concurrent .FastThreadLocal ;
4242import io .netty .util .concurrent .Promise ;
4343import io .netty .util .concurrent .ScheduledFuture ;
44- import java .io .IOException ;
4544import java .net .InetSocketAddress ;
4645import java .net .SocketAddress ;
4746import java .util .ArrayList ;
137136import org .apache .pulsar .common .api .proto .CommandUnsubscribe ;
138137import org .apache .pulsar .common .api .proto .CommandWatchTopicList ;
139138import org .apache .pulsar .common .api .proto .CommandWatchTopicListClose ;
140- import org .apache .pulsar .common .api .proto .CompressionType ;
141139import org .apache .pulsar .common .api .proto .FeatureFlags ;
142140import org .apache .pulsar .common .api .proto .KeySharedMeta ;
143141import org .apache .pulsar .common .api .proto .KeySharedMode ;
148146import org .apache .pulsar .common .api .proto .ProtocolVersion ;
149147import org .apache .pulsar .common .api .proto .Schema ;
150148import org .apache .pulsar .common .api .proto .ServerError ;
151- import org .apache .pulsar .common .api .proto .SingleMessageMetadata ;
152149import org .apache .pulsar .common .api .proto .TxnAction ;
153- import org .apache .pulsar .common .compression .CompressionCodec ;
154- import org .apache .pulsar .common .compression .CompressionCodecProvider ;
155150import org .apache .pulsar .common .intercept .InterceptException ;
156151import org .apache .pulsar .common .lookup .data .LookupData ;
157152import org .apache .pulsar .common .naming .Metadata ;
@@ -2275,7 +2270,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
22752270 .thenApply (lastPosition -> {
22762271 int partitionIndex = TopicName .getPartitionIndex (topic .getName ());
22772272
2278- Position markDeletePosition = null ;
2273+ Position markDeletePosition = PositionFactory . EARLIEST ;
22792274 if (consumer .getSubscription () instanceof PersistentSubscription ) {
22802275 markDeletePosition = ((PersistentSubscription ) consumer .getSubscription ()).getCursor ()
22812276 .getMarkDeletedPosition ();
@@ -2336,8 +2331,7 @@ private void getLargestBatchIndexWhenPossible(
23362331 } else {
23372332 // if readCompacted is false, we need to return MessageId.earliest
23382333 writeAndFlush (Commands .newGetLastMessageIdResponse (requestId , -1 , -1 , partitionIndex , -1 ,
2339- markDeletePosition != null ? markDeletePosition .getLedgerId () : -1 ,
2340- markDeletePosition != null ? markDeletePosition .getEntryId () : -1 ));
2334+ markDeletePosition .getLedgerId (), markDeletePosition .getEntryId ()));
23412335 }
23422336 return ;
23432337 }
@@ -2396,47 +2390,19 @@ public String toString() {
23962390
23972391 writeAndFlush (Commands .newGetLastMessageIdResponse (requestId , lastPosition .getLedgerId (),
23982392 lastPosition .getEntryId (), partitionIndex , largestBatchIndex ,
2399- markDeletePosition != null ? markDeletePosition .getLedgerId () : -1 ,
2400- markDeletePosition != null ? markDeletePosition .getEntryId () : -1 ));
2393+ markDeletePosition .getLedgerId (), markDeletePosition .getEntryId ()));
24012394 }
24022395 });
24032396 });
24042397 }
24052398
24062399 private void handleLastMessageIdFromCompactionService (PersistentTopic persistentTopic , long requestId ,
24072400 int partitionIndex , Position markDeletePosition ) {
2408- persistentTopic .getTopicCompactionService ().readLastCompactedEntry ().thenAccept (entry -> {
2409- if (entry != null ) {
2410- try {
2411- // in this case, all the data has been compacted, so return the last position
2412- // in the compacted ledger to the client
2413- ByteBuf payload = entry .getDataBuffer ();
2414- MessageMetadata metadata = Commands .parseMessageMetadata (payload );
2415- int largestBatchIndex ;
2416- try {
2417- largestBatchIndex = calculateTheLastBatchIndexInBatch (metadata , payload );
2418- } catch (IOException ioEx ) {
2419- writeAndFlush (Commands .newError (requestId , ServerError .MetadataError ,
2420- "Failed to deserialize batched message from the last entry of the compacted Ledger: "
2421- + ioEx .getMessage ()));
2422- return ;
2423- }
2424- writeAndFlush (Commands .newGetLastMessageIdResponse (requestId ,
2425- entry .getLedgerId (), entry .getEntryId (), partitionIndex , largestBatchIndex ,
2426- markDeletePosition != null ? markDeletePosition .getLedgerId () : -1 ,
2427- markDeletePosition != null ? markDeletePosition .getEntryId () : -1 ));
2428- } finally {
2429- entry .release ();
2430- }
2431- } else {
2432- // in this case, the ledgers been removed except the current ledger
2433- // and current ledger without any data
2434- writeAndFlush (Commands .newGetLastMessageIdResponse (requestId ,
2435- -1 , -1 , partitionIndex , -1 ,
2436- markDeletePosition != null ? markDeletePosition .getLedgerId () : -1 ,
2437- markDeletePosition != null ? markDeletePosition .getEntryId () : -1 ));
2438- }
2439- }).exceptionally (ex -> {
2401+ persistentTopic .getTopicCompactionService ().getLastMessagePosition ().thenAccept (position ->
2402+ writeAndFlush (Commands .newGetLastMessageIdResponse (requestId , position .ledgerId (), position .entryId (),
2403+ partitionIndex , position .batchIndex (), markDeletePosition .getLedgerId (),
2404+ markDeletePosition .getEntryId ()))
2405+ ).exceptionally (ex -> {
24402406 writeAndFlush (Commands .newError (
24412407 requestId , ServerError .MetadataError ,
24422408 "Failed to read last entry of the compacted Ledger "
@@ -2445,33 +2411,6 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent
24452411 });
24462412 }
24472413
2448- private int calculateTheLastBatchIndexInBatch (MessageMetadata metadata , ByteBuf payload ) throws IOException {
2449- int batchSize = metadata .getNumMessagesInBatch ();
2450- if (batchSize <= 1 ){
2451- return -1 ;
2452- }
2453- if (metadata .hasCompression ()) {
2454- var tmp = payload ;
2455- CompressionType compressionType = metadata .getCompression ();
2456- CompressionCodec codec = CompressionCodecProvider .getCompressionCodec (compressionType );
2457- int uncompressedSize = metadata .getUncompressedSize ();
2458- payload = codec .decode (payload , uncompressedSize );
2459- tmp .release ();
2460- }
2461- SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata ();
2462- int lastBatchIndexInBatch = -1 ;
2463- for (int i = 0 ; i < batchSize ; i ++){
2464- ByteBuf singleMessagePayload =
2465- Commands .deSerializeSingleMessageInBatch (payload , singleMessageMetadata , i , batchSize );
2466- singleMessagePayload .release ();
2467- if (singleMessageMetadata .isCompactedOut ()){
2468- continue ;
2469- }
2470- lastBatchIndexInBatch = i ;
2471- }
2472- return lastBatchIndexInBatch ;
2473- }
2474-
24752414 private CompletableFuture <Boolean > isNamespaceOperationAllowed (NamespaceName namespaceName ,
24762415 NamespaceOperation operation ) {
24772416 if (!service .isAuthorizationEnabled ()) {
0 commit comments