4747import java .util .Map ;
4848import java .util .Optional ;
4949import java .util .Set ;
50+ import java .util .concurrent .CompletableFuture ;
5051import java .util .concurrent .ConcurrentSkipListMap ;
5152import java .util .concurrent .CountDownLatch ;
5253import java .util .concurrent .TimeUnit ;
9293import org .apache .bookkeeper .mledger .proto .MLDataFormats .ManagedLedgerInfo .LedgerInfo ;
9394import org .apache .bookkeeper .mledger .proto .MLDataFormats .MessageRange ;
9495import org .apache .bookkeeper .mledger .proto .MLDataFormats .PositionInfo ;
96+ import org .apache .bookkeeper .mledger .proto .MLDataFormats .StringProperty ;
9597import org .apache .commons .lang3 .tuple .Pair ;
9698import org .apache .pulsar .common .util .collections .BitSetRecyclable ;
9799import org .apache .pulsar .common .util .collections .ConcurrentOpenLongPairRangeSet ;
@@ -108,6 +110,7 @@ public class ManagedCursorImpl implements ManagedCursor {
108110 protected final ManagedLedgerConfig config ;
109111 protected final ManagedLedgerImpl ledger ;
110112 private final String name ;
113+ private volatile Map <String , String > cursorProperties ;
111114 private final BookKeeper .DigestType digestType ;
112115
113116 protected volatile PositionImpl markDeletePosition ;
@@ -278,6 +281,7 @@ public interface VoidCallback {
278281
279282 ManagedCursorImpl (BookKeeper bookkeeper , ManagedLedgerConfig config , ManagedLedgerImpl ledger , String cursorName ) {
280283 this .bookkeeper = bookkeeper ;
284+ this .cursorProperties = Collections .emptyMap ();
281285 this .config = config ;
282286 this .ledger = ledger ;
283287 this .name = cursorName ;
@@ -313,6 +317,52 @@ public Map<String, Long> getProperties() {
313317 return lastMarkDeleteEntry != null ? lastMarkDeleteEntry .properties : Collections .emptyMap ();
314318 }
315319
320+ @ Override
321+ public Map <String , String > getCursorProperties () {
322+ return cursorProperties ;
323+ }
324+
325+ @ Override
326+ public CompletableFuture <Void > setCursorProperties (Map <String , String > cursorProperties ) {
327+ CompletableFuture <Void > updateCursorPropertiesResult = new CompletableFuture <>();
328+ ledger .getStore ().asyncGetCursorInfo (ledger .getName (), name , new MetaStoreCallback <ManagedCursorInfo >() {
329+ @ Override
330+ public void operationComplete (ManagedCursorInfo info , Stat stat ) {
331+ ManagedCursorInfo copy = ManagedCursorInfo
332+ .newBuilder (info )
333+ .clearCursorProperties ()
334+ .addAllCursorProperties (buildStringPropertiesMap (cursorProperties ))
335+ .build ();
336+ ledger .getStore ().asyncUpdateCursorInfo (ledger .getName (),
337+ name , copy , stat , new MetaStoreCallback <Void >() {
338+ @ Override
339+ public void operationComplete (Void result , Stat stat ) {
340+ log .info ("[{}] Updated ledger cursor: {} properties {}" , ledger .getName (),
341+ name , cursorProperties );
342+ ManagedCursorImpl .this .cursorProperties = cursorProperties ;
343+ cursorLedgerStat = stat ;
344+ updateCursorPropertiesResult .complete (result );
345+ }
346+
347+ @ Override
348+ public void operationFailed (MetaStoreException e ) {
349+ log .error ("[{}] Error while updating ledger cursor: {} properties {}" , ledger .getName (),
350+ name , cursorProperties , e );
351+ updateCursorPropertiesResult .completeExceptionally (e );
352+ }
353+ });
354+ }
355+
356+ @ Override
357+ public void operationFailed (MetaStoreException e ) {
358+ log .error ("[{}] Error while updating ledger cursor: {} properties {}" , ledger .getName (),
359+ name , cursorProperties , e );
360+ updateCursorPropertiesResult .completeExceptionally (e );
361+ }
362+ });
363+ return updateCursorPropertiesResult ;
364+ }
365+
316366 @ Override
317367 public boolean putProperty (String key , Long value ) {
318368 if (lastMarkDeleteEntry != null ) {
@@ -361,6 +411,18 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
361411 cursorLedgerStat = stat ;
362412 lastActive = info .getLastActive () != 0 ? info .getLastActive () : lastActive ;
363413
414+
415+ Map <String , String > recoveredCursorProperties = Collections .emptyMap ();
416+ if (info .getCursorPropertiesCount () > 0 ) {
417+ // Recover properties map
418+ recoveredCursorProperties = Maps .newHashMap ();
419+ for (int i = 0 ; i < info .getCursorPropertiesCount (); i ++) {
420+ StringProperty property = info .getCursorProperties (i );
421+ recoveredCursorProperties .put (property .getName (), property .getValue ());
422+ }
423+ }
424+ cursorProperties = recoveredCursorProperties ;
425+
364426 if (info .getCursorsLedgerId () == -1L ) {
365427 // There is no cursor ledger to read the last position from. It means the cursor has been properly
366428 // closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
@@ -380,7 +442,7 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
380442 }
381443 }
382444
383- recoveredCursor (recoveredPosition , recoveredProperties , null );
445+ recoveredCursor (recoveredPosition , recoveredProperties , recoveredCursorProperties , null );
384446 callback .operationComplete ();
385447 } else {
386448 // Need to proceed and read the last entry in the specified ledger to find out the last position
@@ -410,7 +472,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
410472 log .error ("[{}] Error opening metadata ledger {} for consumer {}: {}" , ledger .getName (), ledgerId , name ,
411473 BKException .getMessage (rc ));
412474 // Rewind to oldest entry available
413- initialize (getRollbackPosition (info ), Collections .emptyMap (), callback );
475+ initialize (getRollbackPosition (info ), Collections .emptyMap (), Collections . emptyMap (), callback );
414476 return ;
415477 } else if (rc != BKException .Code .OK ) {
416478 log .warn ("[{}] Error opening metadata ledger {} for consumer {}: {}" , ledger .getName (), ledgerId , name ,
@@ -426,7 +488,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
426488 log .warn ("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger" ,
427489 ledger .getName (), ledgerId , name );
428490 // Rewind to last cursor snapshot available
429- initialize (getRollbackPosition (info ), Collections .emptyMap (), callback );
491+ initialize (getRollbackPosition (info ), Collections .emptyMap (), cursorProperties , callback );
430492 return ;
431493 }
432494
@@ -438,7 +500,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
438500 log .error ("[{}] Error reading from metadata ledger {} for consumer {}: {}" , ledger .getName (),
439501 ledgerId , name , BKException .getMessage (rc1 ));
440502 // Rewind to oldest entry available
441- initialize (getRollbackPosition (info ), Collections .emptyMap (), callback );
503+ initialize (getRollbackPosition (info ), Collections .emptyMap (), cursorProperties , callback );
442504 return ;
443505 } else if (rc1 != BKException .Code .OK ) {
444506 log .warn ("[{}] Error reading from metadata ledger {} for consumer {}: {}" , ledger .getName (),
@@ -476,7 +538,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
476538 && positionInfo .getBatchedEntryDeletionIndexInfoCount () > 0 ) {
477539 recoverBatchDeletedIndexes (positionInfo .getBatchedEntryDeletionIndexInfoList ());
478540 }
479- recoveredCursor (position , recoveredProperties , lh );
541+ recoveredCursor (position , recoveredProperties , cursorProperties , lh );
480542 callback .operationComplete ();
481543 }, null );
482544 };
@@ -547,6 +609,7 @@ private void recoverBatchDeletedIndexes (
547609 }
548610
549611 private void recoveredCursor (PositionImpl position , Map <String , Long > properties ,
612+ Map <String , String > cursorProperties ,
550613 LedgerHandle recoveredFromCursorLedger ) {
551614 // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
552615 // we need to move to the next existing ledger
@@ -564,7 +627,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
564627 position = ledger .getLastPosition ();
565628 }
566629 log .info ("[{}] Cursor {} recovered to position {}" , ledger .getName (), name , position );
567-
630+ this . cursorProperties = cursorProperties ;
568631 messagesConsumedCounter = -getNumberOfEntries (Range .openClosed (position , ledger .getLastPosition ()));
569632 markDeletePosition = position ;
570633 persistentMarkDeletePosition = position ;
@@ -577,8 +640,9 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
577640 STATE_UPDATER .set (this , State .NoLedger );
578641 }
579642
580- void initialize (PositionImpl position , Map <String , Long > properties , final VoidCallback callback ) {
581- recoveredCursor (position , properties , null );
643+ void initialize (PositionImpl position , Map <String , Long > properties , Map <String , String > cursorProperties ,
644+ final VoidCallback callback ) {
645+ recoveredCursor (position , properties , cursorProperties , null );
582646 if (log .isDebugEnabled ()) {
583647 log .debug ("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}" ,
584648 ledger .getName (), name , messagesConsumedCounter , markDeletePosition , readPosition );
@@ -2383,6 +2447,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
23832447 .setLastActive (lastActive ); //
23842448
23852449 info .addAllProperties (buildPropertiesMap (properties ));
2450+ info .addAllCursorProperties (buildStringPropertiesMap (cursorProperties ));
23862451 if (persistIndividualDeletedMessageRanges ) {
23872452 info .addAllIndividualDeletedMessages (buildIndividualDeletedMessageRanges ());
23882453 if (config .isDeletionAtBatchIndexLevelEnabled ()) {
@@ -2589,7 +2654,22 @@ public void deleteComplete(int rc, Object ctx) {
25892654 }, LedgerMetadataUtils .buildAdditionalMetadataForCursor (name ));
25902655 }
25912656
2592- private List <LongProperty > buildPropertiesMap (Map <String , Long > properties ) {
2657+ private CompletableFuture <Void > deleteLedgerAsync (LedgerHandle ledgerHandle ) {
2658+ ledger .mbean .startCursorLedgerDeleteOp ();
2659+ CompletableFuture <Void > future = new CompletableFuture <>();
2660+ bookkeeper .asyncDeleteLedger (ledgerHandle .getId (), (int rc , Object ctx ) -> {
2661+ future .complete (null );
2662+ ledger .mbean .endCursorLedgerDeleteOp ();
2663+ if (rc != BKException .Code .OK ) {
2664+ log .warn ("[{}] Failed to delete orphan ledger {}" , ledger .getName (),
2665+ ledgerHandle .getId ());
2666+ }
2667+ }, null );
2668+ return future ;
2669+ }
2670+
2671+
2672+ private static List <LongProperty > buildPropertiesMap (Map <String , Long > properties ) {
25932673 if (properties .isEmpty ()) {
25942674 return Collections .emptyList ();
25952675 }
@@ -2603,6 +2683,20 @@ private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
26032683 return longProperties ;
26042684 }
26052685
2686+ private static List <StringProperty > buildStringPropertiesMap (Map <String , String > properties ) {
2687+ if (properties == null || properties .isEmpty ()) {
2688+ return Collections .emptyList ();
2689+ }
2690+
2691+ List <StringProperty > stringProperties = Lists .newArrayList ();
2692+ properties .forEach ((name , value ) -> {
2693+ StringProperty sp = StringProperty .newBuilder ().setName (name ).setValue (value ).build ();
2694+ stringProperties .add (sp );
2695+ });
2696+
2697+ return stringProperties ;
2698+ }
2699+
26062700 private List <MLDataFormats .MessageRange > buildIndividualDeletedMessageRanges () {
26072701 lock .readLock ().lock ();
26082702 try {
0 commit comments