@@ -579,8 +579,7 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
579579
580580 return policyCacheInitMap .computeIfAbsent (namespace , (k ) -> {
581581 final CompletableFuture <SystemTopicClient .Reader <PulsarEvent >> readerCompletableFuture =
582- createSystemTopicClient (namespace );
583- readerCaches .put (namespace , readerCompletableFuture );
582+ newReader (namespace );
584583 final CompletableFuture <Void > initFuture = readerCompletableFuture
585584 .thenCompose (reader -> {
586585 final CompletableFuture <Void > stageFuture = new CompletableFuture <>();
@@ -594,9 +593,8 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
594593 if (closed .get ()) {
595594 return null ;
596595 }
597- log .error ("[{}] Failed to create reader on __change_events topic" ,
598- namespace , ex );
599- cleanCacheAndCloseReader (namespace , false );
596+ cleanPoliciesCacheInitMap (
597+ namespace , readerCompletableFuture .isCompletedExceptionally ());
600598 } catch (Throwable cleanupEx ) {
601599 // Adding this catch to avoid break callback chain
602600 log .error ("[{}] Failed to cleanup reader on __change_events topic" ,
@@ -610,6 +608,20 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
610608 });
611609 }
612610
611+ private CompletableFuture <SystemTopicClient .Reader <PulsarEvent >> newReader (NamespaceName ns ) {
612+ return readerCaches .compute (ns , (__ , existingFuture ) -> {
613+ if (existingFuture == null ) {
614+ return createSystemTopicClient (ns );
615+ }
616+
617+ if (existingFuture .isDone () && existingFuture .isCompletedExceptionally ()) {
618+ return existingFuture .exceptionallyCompose (ex ->
619+ isAlreadyClosedException (ex ) ? existingFuture : createSystemTopicClient (ns ));
620+ }
621+ return existingFuture ;
622+ });
623+ }
624+
613625 protected CompletableFuture <SystemTopicClient .Reader <PulsarEvent >> createSystemTopicClient (
614626 NamespaceName namespace ) {
615627 if (closed .get ()) {
@@ -633,7 +645,9 @@ private void removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
633645 }
634646 AtomicInteger bundlesCount = ownedBundlesCountPerNamespace .get (namespace );
635647 if (bundlesCount == null || bundlesCount .decrementAndGet () <= 0 ) {
636- cleanCacheAndCloseReader (namespace , true , true );
648+ cleanPoliciesCacheInitMap (namespace , true );
649+ cleanWriterCache (namespace );
650+ cleanOwnedBundlesCount (namespace );
637651 }
638652 }
639653
@@ -665,15 +679,16 @@ public boolean test(NamespaceBundle namespaceBundle) {
665679 private void initPolicesCache (SystemTopicClient .Reader <PulsarEvent > reader , CompletableFuture <Void > future ) {
666680 if (closed .get ()) {
667681 future .completeExceptionally (new BrokerServiceException (getClass ().getName () + " is closed." ));
668- cleanCacheAndCloseReader (reader .getSystemTopic ().getTopicName ().getNamespaceObject (), false );
682+ cleanPoliciesCacheInitMap (reader .getSystemTopic ().getTopicName ().getNamespaceObject (), true );
669683 return ;
670684 }
671685 reader .hasMoreEventsAsync ().whenComplete ((hasMore , ex ) -> {
672686 if (ex != null ) {
673687 log .error ("[{}] Failed to check the move events for the system topic" ,
674688 reader .getSystemTopic ().getTopicName (), ex );
675689 future .completeExceptionally (ex );
676- cleanCacheAndCloseReader (reader .getSystemTopic ().getTopicName ().getNamespaceObject (), false );
690+ cleanPoliciesCacheInitMap (reader .getSystemTopic ().getTopicName ().getNamespaceObject (),
691+ isAlreadyClosedException (ex ));
677692 return ;
678693 }
679694 if (hasMore ) {
@@ -692,7 +707,8 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
692707 log .error ("[{}] Failed to read event from the system topic." ,
693708 reader .getSystemTopic ().getTopicName (), e );
694709 future .completeExceptionally (e );
695- cleanCacheAndCloseReader (reader .getSystemTopic ().getTopicName ().getNamespaceObject (), false );
710+ cleanPoliciesCacheInitMap (reader .getSystemTopic ().getTopicName ().getNamespaceObject (),
711+ isAlreadyClosedException (ex ));
696712 return null ;
697713 });
698714 } else {
@@ -718,10 +734,45 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
718734 });
719735 }
720736
721- private void cleanCacheAndCloseReader (@ NonNull NamespaceName namespace , boolean cleanOwnedBundlesCount ) {
722- cleanCacheAndCloseReader (namespace , cleanOwnedBundlesCount , false );
737+
738+ private void cleanPoliciesCacheInitMap (@ NonNull NamespaceName namespace , boolean closeReader ) {
739+ if (!closeReader ) {
740+ policyCacheInitMap .remove (namespace );
741+ return ;
742+ }
743+
744+ TopicPolicyMessageHandlerTracker topicPolicyMessageHandlerTracker =
745+ topicPolicyMessageHandlerTrackers .remove (namespace );
746+ if (topicPolicyMessageHandlerTracker != null ) {
747+ topicPolicyMessageHandlerTracker .close ();
748+ }
749+
750+ CompletableFuture <SystemTopicClient .Reader <PulsarEvent >> readerFuture = readerCaches .remove (namespace );
751+ policyCacheInitMap .compute (namespace , (k , v ) -> {
752+ policiesCache .entrySet ().removeIf (entry -> Objects .equals (entry .getKey ().getNamespaceObject (), namespace ));
753+ globalPoliciesCache .entrySet ()
754+ .removeIf (entry -> Objects .equals (entry .getKey ().getNamespaceObject (), namespace ));
755+ return null ;
756+ });
757+ if (readerFuture != null && !readerFuture .isCompletedExceptionally ()) {
758+ readerFuture
759+ .thenCompose (SystemTopicClient .Reader ::closeAsync )
760+ .exceptionally (ex -> {
761+ log .warn ("[{}] Close change_event reader fail." , namespace , ex );
762+ return null ;
763+ });
764+ }
765+ }
766+
767+ private void cleanWriterCache (@ NonNull NamespaceName namespace ) {
768+ writerCaches .synchronous ().invalidate (namespace );
723769 }
724770
771+ private void cleanOwnedBundlesCount (@ NonNull NamespaceName namespace ) {
772+ ownedBundlesCountPerNamespace .remove (namespace );
773+ }
774+
775+
725776 private void cleanCacheAndCloseReader (@ NonNull NamespaceName namespace , boolean cleanOwnedBundlesCount ,
726777 boolean cleanWriterCache ) {
727778 if (cleanWriterCache ) {
@@ -754,6 +805,9 @@ private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace, boolean
754805 });
755806 }
756807
808+
809+
810+
757811 /**
758812 * This is an async method for the background reader to continue syncing new messages.
759813 *
@@ -763,7 +817,7 @@ private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace, boolean
763817 private void readMorePoliciesAsync (SystemTopicClient .Reader <PulsarEvent > reader ) {
764818 NamespaceName namespaceObject = reader .getSystemTopic ().getTopicName ().getNamespaceObject ();
765819 if (closed .get ()) {
766- cleanCacheAndCloseReader (namespaceObject , false );
820+ cleanPoliciesCacheInitMap (namespaceObject , true );
767821 return ;
768822 }
769823 reader .readNextAsync ()
@@ -784,11 +838,10 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader)
784838 if (ex == null ) {
785839 readMorePoliciesAsync (reader );
786840 } else {
787- Throwable cause = FutureUtil .unwrapCompletionException (ex );
788- if (cause instanceof PulsarClientException .AlreadyClosedException ) {
841+ if (isAlreadyClosedException (ex )) {
789842 log .info ("Closing the topic policies reader for {}" ,
790843 reader .getSystemTopic ().getTopicName ());
791- cleanCacheAndCloseReader (namespaceObject , false );
844+ cleanPoliciesCacheInitMap (namespaceObject , true );
792845 } else {
793846 log .warn ("Read more topic polices exception, read again." , ex );
794847 readMorePoliciesAsync (reader );
@@ -797,6 +850,11 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader)
797850 });
798851 }
799852
853+ private boolean isAlreadyClosedException (Throwable ex ) {
854+ Throwable cause = FutureUtil .unwrapCompletionException (ex );
855+ return cause instanceof PulsarClientException .AlreadyClosedException ;
856+ }
857+
800858 private void refreshTopicPoliciesCache (Message <PulsarEvent > msg ) {
801859 // delete policies
802860 if (msg .getValue () == null ) {
@@ -884,6 +942,11 @@ NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() {
884942 }
885943
886944
945+ @ VisibleForTesting
946+ public Map <NamespaceName , CompletableFuture <SystemTopicClient .Reader <PulsarEvent >>> getReaderCaches () {
947+ return readerCaches ;
948+ }
949+
887950 @ VisibleForTesting
888951 long getPoliciesCacheSize () {
889952 return policiesCache .size ();
0 commit comments