@@ -2304,28 +2304,56 @@ public void getReplicationClusters(@Suspended final AsyncResponse asyncResponse,
23042304 @ PathParam ("tenant" ) String tenant ,
23052305 @ PathParam ("namespace" ) String namespace ,
23062306 @ PathParam ("topic" ) @ Encoded String encodedTopic ,
2307+ @ QueryParam ("isGlobal" ) @ DefaultValue ("false" ) boolean isGlobal ,
23072308 @ QueryParam ("applied" ) @ DefaultValue ("false" ) boolean applied ,
23082309 @ ApiParam (value = "Whether leader broker redirected this call to this broker. "
23092310 + "For internal use." )
23102311 @ QueryParam ("authoritative" ) @ DefaultValue ("false" ) boolean authoritative ) {
23112312 validateTopicName (tenant , namespace , encodedTopic );
23122313 validateTopicPolicyOperationAsync (topicName , PolicyName .REPLICATION , PolicyOperation .READ )
23132314 .thenCompose (__ -> preValidation (authoritative ))
2314- .thenCompose (__ -> getTopicPoliciesAsyncWithRetry (topicName ))
2315- .thenAccept (op -> {
2316- asyncResponse .resume (op .map (TopicPolicies ::getReplicationClustersSet ).orElseGet (() -> {
2317- if (applied ) {
2318- return getNamespacePolicies (namespaceName ).replication_clusters ;
2315+ .thenCompose (__ -> {
2316+ if (applied ) {
2317+ return getAppliedReplicatedClusters ();
2318+ }
2319+ return getTopicPoliciesAsyncWithRetry (topicName , isGlobal ).thenApply (policy -> {
2320+ if (policy != null && policy .isPresent ()
2321+ && CollectionUtils .isNotEmpty (policy .get ().getReplicationClustersSet ())) {
2322+ return policy .get ().getReplicationClustersSet ();
23192323 }
23202324 return null ;
2321- })) ;
2325+ });
23222326 })
2327+ .thenAccept (asyncResponse ::resume )
23232328 .exceptionally (ex -> {
23242329 handleTopicPolicyException ("getReplicationClusters" , ex , asyncResponse );
23252330 return null ;
23262331 });
23272332 }
23282333
2334+ private CompletableFuture <Set <String >> getAppliedReplicatedClusters () {
2335+ return getTopicPoliciesAsyncWithRetry (topicName , false )
2336+ .thenCompose (localPolicy -> {
2337+ if (localPolicy != null && localPolicy .isPresent ()
2338+ && CollectionUtils .isNotEmpty (localPolicy .get ().getReplicationClustersSet ())) {
2339+ return CompletableFuture .completedFuture (localPolicy .get ().getReplicationClustersSet ());
2340+ }
2341+ return getTopicPoliciesAsyncWithRetry (topicName , true )
2342+ .thenCompose (globalPolicy -> {
2343+ if (globalPolicy != null && globalPolicy .isPresent ()
2344+ && CollectionUtils .isNotEmpty (globalPolicy .get ().getReplicationClustersSet ())) {
2345+ return CompletableFuture .completedFuture (globalPolicy .get ().getReplicationClustersSet ());
2346+ }
2347+ return getNamespacePoliciesAsync (namespaceName ).thenApply (v -> {
2348+ if (v != null ) {
2349+ return v .replication_clusters ;
2350+ }
2351+ return null ;
2352+ });
2353+ });
2354+ });
2355+ }
2356+
23292357 @ POST
23302358 @ Path ("/{tenant}/{namespace}/{topic}/replication" )
23312359 @ ApiOperation (value = "Set the replication clusters for a topic." )
@@ -2341,32 +2369,14 @@ public void setReplicationClusters(
23412369 @ Suspended final AsyncResponse asyncResponse ,
23422370 @ PathParam ("tenant" ) String tenant , @ PathParam ("namespace" ) String namespace ,
23432371 @ PathParam ("topic" ) @ Encoded String encodedTopic ,
2372+ @ QueryParam ("isGlobal" ) @ DefaultValue ("false" ) boolean isGlobal ,
23442373 @ ApiParam (value = "Whether leader broker redirected this call to this broker. For internal use." )
23452374 @ QueryParam ("authoritative" ) @ DefaultValue ("false" ) boolean authoritative ,
23462375 @ ApiParam (value = "List of replication clusters" , required = true ) List <String > clusterIds ) {
23472376 validateTopicName (tenant , namespace , encodedTopic );
23482377 validateTopicPolicyOperationAsync (topicName , PolicyName .REPLICATION , PolicyOperation .WRITE )
2349- .thenCompose (__ -> preValidation (authoritative )).thenCompose (__ -> {
2350- // Set a topic-level replicated clusters that do not contain local cluster is not meaningful, except
2351- // the following scenario: User has two clusters, which enabled Geo-Replication through a global
2352- // metadata store, the resources named partitioned topic metadata and the resource namespace-level
2353- // "replicated clusters" are shared between multi clusters. Pulsar can hardly delete a specify
2354- // partitioned topic. To support this use case, the following steps can implement it:
2355- // 1. set a global topic-level replicated clusters that do not contain local cluster.
2356- // 2. the local cluster will remove the subtopics automatically, and remove the schemas and local
2357- // topic policies. Just leave the global topic policies there, which prevents the namespace level
2358- // replicated clusters policy taking affect.
2359- // TODO But the API "pulsar-admin topics set-replication-clusters" does not support global policy,
2360- // to support this scenario, a PIP is needed.
2361- boolean clustersDoesNotContainsLocal = CollectionUtils .isEmpty (clusterIds )
2362- || !clusterIds .contains (pulsar ().getConfig ().getClusterName ());
2363- if (clustersDoesNotContainsLocal ) {
2364- return FutureUtil .failedFuture (new RestException (Response .Status .PRECONDITION_FAILED ,
2365- "Can not remove local cluster from the topic-level replication clusters policy" ));
2366- }
2367- return CompletableFuture .completedFuture (null );
2368- })
2369- .thenCompose (__ -> internalSetReplicationClusters (clusterIds ))
2378+ .thenCompose (__ -> preValidation (authoritative ))
2379+ .thenCompose (__ -> internalSetReplicationClusters (clusterIds , isGlobal ))
23702380 .thenRun (() -> asyncResponse .resume (Response .noContent ().build ()))
23712381 .exceptionally (ex -> {
23722382 handleTopicPolicyException ("setReplicationClusters" , ex , asyncResponse );
@@ -2387,12 +2397,13 @@ public void setReplicationClusters(
23872397 public void removeReplicationClusters (@ Suspended final AsyncResponse asyncResponse ,
23882398 @ PathParam ("tenant" ) String tenant , @ PathParam ("namespace" ) String namespace ,
23892399 @ PathParam ("topic" ) @ Encoded String encodedTopic ,
2400+ @ QueryParam ("isGlobal" ) @ DefaultValue ("false" ) boolean isGlobal ,
23902401 @ ApiParam (value = "Whether leader broker redirected this call to this broker. For internal use." )
23912402 @ QueryParam ("authoritative" ) @ DefaultValue ("false" ) boolean authoritative ) {
23922403 validateTopicName (tenant , namespace , encodedTopic );
23932404 validateTopicPolicyOperationAsync (topicName , PolicyName .REPLICATION , PolicyOperation .WRITE )
23942405 .thenCompose (__ -> preValidation (authoritative ))
2395- .thenCompose (__ -> internalRemoveReplicationClusters ())
2406+ .thenCompose (__ -> internalRemoveReplicationClusters (isGlobal ))
23962407 .thenRun (() -> asyncResponse .resume (Response .noContent ().build ()))
23972408 .exceptionally (ex -> {
23982409 handleTopicPolicyException ("removeReplicationClusters" , ex , asyncResponse );
0 commit comments