Skip to content

Commit 2d78cbd

Browse files
authored
[fix][broker] Orphan schema after disabled a cluster for a namespace (#24223)
1 parent 0c8e88c commit 2d78cbd

File tree

2 files changed

+90
-9
lines changed

2 files changed

+90
-9
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.concurrent.TimeUnit;
5353
import java.util.concurrent.TimeoutException;
5454
import java.util.concurrent.atomic.AtomicBoolean;
55+
import java.util.concurrent.atomic.AtomicInteger;
5556
import java.util.concurrent.atomic.AtomicLong;
5657
import java.util.concurrent.atomic.AtomicReference;
5758
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -1931,7 +1932,9 @@ public CompletableFuture<Void> checkReplication() {
19311932
if (!success) {
19321933
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
19331934
// because pulsar doesn't serve global topic without local repl-cluster configured.
1934-
return deleteForcefully();
1935+
return deleteForcefully().thenCompose(ignore -> {
1936+
return deleteSchemaAndPoliciesIfClusterRemoved();
1937+
});
19351938
}
19361939

19371940
int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
@@ -1974,6 +1977,55 @@ public CompletableFuture<Void> checkReplication() {
19741977
});
19751978
}
19761979

1980+
CompletableFuture<Void> deleteSchemaAndPoliciesIfClusterRemoved() {
1981+
TopicName tName = TopicName.get(topic);
1982+
if (!tName.isPartitioned()) {
1983+
return CompletableFuture.completedFuture(null);
1984+
}
1985+
TopicName partitionedName = TopicName.get(tName.getPartitionedTopicName());
1986+
return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
1987+
.getPartitionedTopicResources()
1988+
.getPartitionedTopicMetadataAsync(partitionedName)
1989+
.thenApply(metadataOp -> {
1990+
if (metadataOp.isEmpty()) {
1991+
return null;
1992+
}
1993+
AtomicInteger checkedCounter = new AtomicInteger(metadataOp.get().partitions);
1994+
for (int i = 0; i < metadataOp.get().partitions; i++) {
1995+
brokerService.getPulsar().getPulsarResources().getTopicResources()
1996+
.persistentTopicExists(partitionedName.getPartition(i)).thenAccept(b -> {
1997+
if (!b) {
1998+
int leftPartitions = checkedCounter.decrementAndGet();
1999+
log.info("[{}] partitions: {}, left: {}", tName, metadataOp.get().partitions,
2000+
leftPartitions);
2001+
if (leftPartitions == 0) {
2002+
brokerService.getPulsar().getSchemaStorage()
2003+
.delete(partitionedName.getSchemaName())
2004+
.whenComplete((schemaVersion, ex) -> {
2005+
if (ex == null) {
2006+
log.info("Deleted schema[{}] after all partitions[{}] were removed"
2007+
+ " because the current cluster has bee removed from"
2008+
+ " topic/namespace policies",
2009+
partitionedName, metadataOp.get().partitions);
2010+
} else {
2011+
log.error("Failed to delete schema[{}] after all partitions[{}] were"
2012+
+ " removed, when the current cluster has bee removed from"
2013+
+ " topic/namespace policies",
2014+
partitionedName, metadataOp.get().partitions, ex);
2015+
}
2016+
2017+
});
2018+
// TODO regarding the topic level policies, it will be deleted at a seperate PR.
2019+
// Because there is an issue related to Global policies has not been solved so
2020+
// far.
2021+
}
2022+
}
2023+
});
2024+
}
2025+
return null;
2026+
});
2027+
}
2028+
19772029
private CompletableFuture<Boolean> checkAllowedCluster(String localCluster) {
19782030
List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
19792031
return brokerService.pulsar().getPulsarResources().getNamespaceResources()

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.testng.Assert.assertEquals;
2122
import static org.testng.Assert.assertFalse;
2223
import static org.testng.Assert.assertTrue;
2324
import java.time.Duration;
2425
import java.util.Arrays;
2526
import java.util.HashSet;
27+
import java.util.List;
2628
import java.util.Map;
2729
import java.util.Optional;
2830
import java.util.concurrent.CompletableFuture;
@@ -33,6 +35,7 @@
3335
import org.apache.pulsar.client.api.Schema;
3436
import org.apache.pulsar.common.naming.TopicName;
3537
import org.apache.pulsar.common.policies.data.TopicType;
38+
import org.apache.pulsar.common.protocol.schema.StoredSchema;
3639
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
3740
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
3841
import org.awaitility.Awaitility;
@@ -172,36 +175,62 @@ public void testRemoveCluster() throws Exception {
172175
// Initialize.
173176
final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8";
174177
final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
178+
final String topicP0 = TopicName.get(topic).getPartition(0).toString();
179+
final String topicP1 = TopicName.get(topic).getPartition(1).toString();
175180
final String topicChangeEvents = "persistent://" + ns1 + "/__change_events-partition-0";
176181
admin1.namespaces().createNamespace(ns1);
177182
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
178-
admin1.topics().createNonPartitionedTopic(topic);
183+
admin1.topics().createPartitionedTopic(topic, 2);
184+
Awaitility.await().untilAsserted(() -> {
185+
assertTrue(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
186+
.partitionedTopicExists(TopicName.get(topic)));
187+
List<CompletableFuture<StoredSchema>> schemaList11
188+
= pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
189+
assertEquals(schemaList11.size(), 0);
190+
List<CompletableFuture<StoredSchema>> schemaList21
191+
= pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
192+
assertEquals(schemaList21.size(), 0);
193+
});
179194

180-
// Wait for loading topic up.
195+
// Wait for copying messages.
181196
Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create();
197+
p.send("msg-1");
198+
p.close();
182199
Awaitility.await().untilAsserted(() -> {
183200
Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics();
184-
assertTrue(tps.containsKey(topic));
201+
assertTrue(tps.containsKey(topicP0));
202+
assertTrue(tps.containsKey(topicP1));
185203
assertTrue(tps.containsKey(topicChangeEvents));
204+
List<CompletableFuture<StoredSchema>> schemaList12
205+
= pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
206+
assertEquals(schemaList12.size(), 1);
207+
List<CompletableFuture<StoredSchema>> schemaList22
208+
= pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
209+
assertEquals(schemaList12.size(), 1);
186210
});
187211

188212
// The topics under the namespace of the cluster-1 will be deleted.
189213
// Verify the result.
190214
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2)));
191215
Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(() -> {
192216
Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics();
193-
assertFalse(tps.containsKey(topic));
217+
assertFalse(tps.containsKey(topicP0));
218+
assertFalse(tps.containsKey(topicP1));
194219
assertFalse(tps.containsKey(topicChangeEvents));
195-
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic))
196-
.get(5, TimeUnit.SECONDS).isExists());
197220
assertFalse(pulsar1.getNamespaceService()
198221
.checkTopicExists(TopicName.get(topicChangeEvents))
199222
.get(5, TimeUnit.SECONDS).isExists());
223+
// Verify: schema will be removed in local cluster, and remote cluster will not.
224+
List<CompletableFuture<StoredSchema>> schemaList13
225+
= pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
226+
assertEquals(schemaList13.size(), 0);
227+
List<CompletableFuture<StoredSchema>> schemaList23
228+
= pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
229+
assertEquals(schemaList23.size(), 1);
200230
});
201231

202232
// cleanup.
203-
p.close();
204-
admin2.topics().delete(topic);
233+
admin2.topics().deletePartitionedTopic(topic);
205234
admin2.namespaces().deleteNamespace(ns1);
206235
}
207236
}

0 commit comments

Comments
 (0)