Skip to content

Commit 1e57827

Browse files
authored
[improve][admin] PIP-422 part 1: Support global topic-level replicated clusters policy (#24390)
1 parent e292b77 commit 1e57827

File tree

13 files changed

+813
-95
lines changed

13 files changed

+813
-95
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3360,7 +3360,7 @@ protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQu
33603360
});
33613361
}
33623362

3363-
protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
3363+
protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds, boolean isGlobal) {
33643364
if (CollectionUtils.isEmpty(clusterIds)) {
33653365
return CompletableFuture.failedFuture(new RestException(Status.PRECONDITION_FAILED,
33663366
"ClusterIds should not be null or empty"));
@@ -3373,6 +3373,47 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
33733373
"Cannot specify global in the list of replication clusters");
33743374
}
33753375
})
3376+
.thenCompose(__ -> {
3377+
// Set a topic-level replicated clusters that do not contain local cluster is not meaningful, except
3378+
// the following scenario: User has two clusters, which enabled Geo-Replication through a global
3379+
// metadata store, the resources named partitioned topic metadata and the resource namespace-level
3380+
// "replicated clusters" are shared between multi clusters. Pulsar can hardly delete a specify
3381+
// partitioned topic. To support this use case, the following steps can implement it:
3382+
// 1. set a global topic-level replicated clusters that do not contain local cluster.
3383+
// 2. the local cluster will remove the subtopics automatically, and remove the schemas and local
3384+
// topic policies. Just leave the global topic policies there, which prevents the namespace level
3385+
// replicated clusters policy taking affect.
3386+
boolean clustersDoesNotContainsLocal = CollectionUtils.isEmpty(clusterIds)
3387+
|| !clusterIds.contains(pulsar().getConfig().getClusterName());
3388+
if (clustersDoesNotContainsLocal && !isGlobal) {
3389+
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED,
3390+
"Can not remove local cluster from the local topic-level replication clusters policy"));
3391+
}
3392+
if (isGlobal) {
3393+
return getNamespacePoliciesAsync(namespaceName).thenCompose(v -> {
3394+
// Since global policies depends on namespace level replication, users only can set global
3395+
// policies when namespace level replication exists. Otherwise, the policies will never be
3396+
// copied to the remote side, which is meaningless.
3397+
if (v == null || v.replication_clusters.size() < 2) {
3398+
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED,
3399+
"Please do not use the global topic level policy when namespace-level replication"
3400+
+ " is not enabled, because the global level policy relies on namespace-level"
3401+
+ " replication"));
3402+
}
3403+
for (String clusterId : clusterIds) {
3404+
if (v.replication_clusters.contains(clusterId)) {
3405+
continue;
3406+
}
3407+
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED,
3408+
"The policies at the global topic level will only be copied to the clusters"
3409+
+ " included in the namespace level replication. Therefore, please do not set the"
3410+
+ " policies including other clusters"));
3411+
}
3412+
return CompletableFuture.completedFuture(null);
3413+
});
3414+
}
3415+
return CompletableFuture.completedFuture(null);
3416+
})
33763417
.thenCompose(__ -> clustersAsync())
33773418
.thenCompose(clusters -> {
33783419
List<CompletableFuture<Void>> futures = new ArrayList<>(replicationClusters.size());
@@ -3405,9 +3446,10 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
34053446
topicMetaOp.get().partitions).values());
34063447
});
34073448
}).thenCompose(__ ->
3408-
getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
3449+
getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(op -> {
34093450
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
34103451
topicPolicies.setReplicationClusters(clusterIds);
3452+
topicPolicies.setIsGlobal(isGlobal);
34113453
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
34123454
.thenRun(() -> {
34133455
log.info("[{}] Successfully set replication clusters for namespace={}, "
@@ -3421,12 +3463,16 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
34213463
));
34223464
}
34233465

3424-
protected CompletableFuture<Void> internalRemoveReplicationClusters() {
3466+
protected CompletableFuture<Void> internalRemoveReplicationClusters(boolean isGlobal) {
34253467
return validatePoliciesReadOnlyAccessAsync()
3426-
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
3468+
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
34273469
.thenCompose(op -> {
3470+
if (op.isEmpty()) {
3471+
return CompletableFuture.completedFuture(null);
3472+
}
34283473
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
34293474
topicPolicies.setReplicationClusters(null);
3475+
topicPolicies.setIsGlobal(isGlobal);
34303476
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
34313477
.thenRun(() -> {
34323478
log.info("[{}] Successfully set replication clusters for namespace={}, "

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2304,28 +2304,56 @@ public void getReplicationClusters(@Suspended final AsyncResponse asyncResponse,
23042304
@PathParam("tenant") String tenant,
23052305
@PathParam("namespace") String namespace,
23062306
@PathParam("topic") @Encoded String encodedTopic,
2307+
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
23072308
@QueryParam("applied") @DefaultValue("false") boolean applied,
23082309
@ApiParam(value = "Whether leader broker redirected this call to this broker. "
23092310
+ "For internal use.")
23102311
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
23112312
validateTopicName(tenant, namespace, encodedTopic);
23122313
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.READ)
23132314
.thenCompose(__ -> preValidation(authoritative))
2314-
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
2315-
.thenAccept(op -> {
2316-
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(() -> {
2317-
if (applied) {
2318-
return getNamespacePolicies(namespaceName).replication_clusters;
2315+
.thenCompose(__ -> {
2316+
if (applied) {
2317+
return getAppliedReplicatedClusters();
2318+
}
2319+
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenApply(policy -> {
2320+
if (policy != null && policy.isPresent()
2321+
&& CollectionUtils.isNotEmpty(policy.get().getReplicationClustersSet())) {
2322+
return policy.get().getReplicationClustersSet();
23192323
}
23202324
return null;
2321-
}));
2325+
});
23222326
})
2327+
.thenAccept(asyncResponse::resume)
23232328
.exceptionally(ex -> {
23242329
handleTopicPolicyException("getReplicationClusters", ex, asyncResponse);
23252330
return null;
23262331
});
23272332
}
23282333

2334+
private CompletableFuture<Set<String>> getAppliedReplicatedClusters() {
2335+
return getTopicPoliciesAsyncWithRetry(topicName, false)
2336+
.thenCompose(localPolicy -> {
2337+
if (localPolicy != null && localPolicy.isPresent()
2338+
&& CollectionUtils.isNotEmpty(localPolicy.get().getReplicationClustersSet())) {
2339+
return CompletableFuture.completedFuture(localPolicy.get().getReplicationClustersSet());
2340+
}
2341+
return getTopicPoliciesAsyncWithRetry(topicName, true)
2342+
.thenCompose(globalPolicy -> {
2343+
if (globalPolicy != null && globalPolicy.isPresent()
2344+
&& CollectionUtils.isNotEmpty(globalPolicy.get().getReplicationClustersSet())) {
2345+
return CompletableFuture.completedFuture(globalPolicy.get().getReplicationClustersSet());
2346+
}
2347+
return getNamespacePoliciesAsync(namespaceName).thenApply(v -> {
2348+
if (v != null) {
2349+
return v.replication_clusters;
2350+
}
2351+
return null;
2352+
});
2353+
});
2354+
});
2355+
}
2356+
23292357
@POST
23302358
@Path("/{tenant}/{namespace}/{topic}/replication")
23312359
@ApiOperation(value = "Set the replication clusters for a topic.")
@@ -2341,32 +2369,14 @@ public void setReplicationClusters(
23412369
@Suspended final AsyncResponse asyncResponse,
23422370
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
23432371
@PathParam("topic") @Encoded String encodedTopic,
2372+
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
23442373
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
23452374
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
23462375
@ApiParam(value = "List of replication clusters", required = true) List<String> clusterIds) {
23472376
validateTopicName(tenant, namespace, encodedTopic);
23482377
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
2349-
.thenCompose(__ -> preValidation(authoritative)).thenCompose(__ -> {
2350-
// Set a topic-level replicated clusters that do not contain local cluster is not meaningful, except
2351-
// the following scenario: User has two clusters, which enabled Geo-Replication through a global
2352-
// metadata store, the resources named partitioned topic metadata and the resource namespace-level
2353-
// "replicated clusters" are shared between multi clusters. Pulsar can hardly delete a specify
2354-
// partitioned topic. To support this use case, the following steps can implement it:
2355-
// 1. set a global topic-level replicated clusters that do not contain local cluster.
2356-
// 2. the local cluster will remove the subtopics automatically, and remove the schemas and local
2357-
// topic policies. Just leave the global topic policies there, which prevents the namespace level
2358-
// replicated clusters policy taking affect.
2359-
// TODO But the API "pulsar-admin topics set-replication-clusters" does not support global policy,
2360-
// to support this scenario, a PIP is needed.
2361-
boolean clustersDoesNotContainsLocal = CollectionUtils.isEmpty(clusterIds)
2362-
|| !clusterIds.contains(pulsar().getConfig().getClusterName());
2363-
if (clustersDoesNotContainsLocal) {
2364-
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED,
2365-
"Can not remove local cluster from the topic-level replication clusters policy"));
2366-
}
2367-
return CompletableFuture.completedFuture(null);
2368-
})
2369-
.thenCompose(__ -> internalSetReplicationClusters(clusterIds))
2378+
.thenCompose(__ -> preValidation(authoritative))
2379+
.thenCompose(__ -> internalSetReplicationClusters(clusterIds, isGlobal))
23702380
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
23712381
.exceptionally(ex -> {
23722382
handleTopicPolicyException("setReplicationClusters", ex, asyncResponse);
@@ -2387,12 +2397,13 @@ public void setReplicationClusters(
23872397
public void removeReplicationClusters(@Suspended final AsyncResponse asyncResponse,
23882398
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
23892399
@PathParam("topic") @Encoded String encodedTopic,
2400+
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
23902401
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
23912402
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
23922403
validateTopicName(tenant, namespace, encodedTopic);
23932404
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
23942405
.thenCompose(__ -> preValidation(authoritative))
2395-
.thenCompose(__ -> internalRemoveReplicationClusters())
2406+
.thenCompose(__ -> internalRemoveReplicationClusters(isGlobal))
23962407
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
23972408
.exceptionally(ex -> {
23982409
handleTopicPolicyException("removeReplicationClusters", ex, asyncResponse);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,17 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
139139

140140
@Override
141141
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
142+
return deleteTopicPoliciesAsync(topicName, false);
143+
}
144+
145+
/**
146+
* @param keepGlobalPolicies only be used when a topic was deleted because users removes current
147+
* cluster from the policy "replicatedClusters".
148+
* See also https://github.com/apache/pulsar/blob/master/pip/pip-422.md
149+
*/
150+
@Override
151+
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName,
152+
boolean keepGlobalPolicies) {
142153
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject()) || isSelf(topicName)) {
143154
return CompletableFuture.completedFuture(null);
144155
}
@@ -167,7 +178,8 @@ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
167178
log.info("Skip delete topic-level policies because {} has been removed before", changeEvents);
168179
return CompletableFuture.completedFuture(null);
169180
}
170-
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
181+
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null,
182+
keepGlobalPolicies);
171183
});
172184
}
173185

@@ -177,11 +189,11 @@ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, Top
177189
return CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException(
178190
"Not allowed to update topic policy for the heartbeat topic"));
179191
}
180-
return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
192+
return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies, false);
181193
}
182194

183195
private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
184-
@Nullable TopicPolicies policies) {
196+
@Nullable TopicPolicies policies, boolean keepGlobalPoliciesAfterDeleting) {
185197
return pulsarService.getPulsarResources().getNamespaceResources()
186198
.getPoliciesAsync(topicName.getNamespaceObject())
187199
.thenCompose(namespacePolicies -> {
@@ -203,7 +215,8 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
203215
result.completeExceptionally(cause);
204216
} else {
205217
CompletableFuture<MessageId> writeFuture =
206-
sendTopicPolicyEventInternal(topicName, actionType, writer, policies);
218+
sendTopicPolicyEventInternal(topicName, actionType, writer, policies,
219+
keepGlobalPoliciesAfterDeleting);
207220
writeFuture.whenComplete((messageId, e) -> {
208221
if (e != null) {
209222
result.completeExceptionally(e);
@@ -223,14 +236,20 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
223236
}
224237

225238
private CompletableFuture<MessageId> sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType,
226-
SystemTopicClient.Writer<PulsarEvent> writer,
227-
@Nullable TopicPolicies policies) {
239+
SystemTopicClient.Writer<PulsarEvent> writer, @Nullable TopicPolicies policies,
240+
boolean keepGlobalPoliciesAfterDeleting) {
228241
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
229242
if (!ActionType.DELETE.equals(actionType)) {
230243
return writer.writeAsync(getEventKey(event, policies != null && policies.isGlobalPolicies()), event);
231244
}
232245
// When a topic is deleting, delete both non-global and global topic-level policies.
233-
CompletableFuture<MessageId> deletePolicies = writer.deleteAsync(getEventKey(event, true), event)
246+
CompletableFuture<MessageId> dealWithGlobalPolicy;
247+
if (keepGlobalPoliciesAfterDeleting) {
248+
dealWithGlobalPolicy = CompletableFuture.completedFuture(null);
249+
} else {
250+
dealWithGlobalPolicy = writer.deleteAsync(getEventKey(event, true), event);
251+
}
252+
CompletableFuture<MessageId> deletePolicies = dealWithGlobalPolicy
234253
.thenCompose(__ -> {
235254
return writer.deleteAsync(getEventKey(event, false), event);
236255
});
@@ -620,7 +639,7 @@ private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
620639
SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
621640
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
622641
systemTopicClient.newWriterAsync().thenAccept(writer -> {
623-
sendTopicPolicyEventInternal(topicName, ActionType.DELETE, writer, event.getPolicies())
642+
sendTopicPolicyEventInternal(topicName, ActionType.DELETE, writer, event.getPolicies(), false)
624643
.whenComplete((result, e) -> writer.closeAsync()
625644
.whenComplete((res, ex) -> {
626645
if (ex != null) {

0 commit comments

Comments
 (0)