Skip to content

Commit a60407e

Browse files
committed
[fix] [broker] [namespace Geo-Replication]Reject a topic creation if there is a confilct topic on the remote side
1 parent d475655 commit a60407e

File tree

4 files changed

+88
-10
lines changed

4 files changed

+88
-10
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -605,12 +605,15 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
605605
throw new RestException(Status.CONFLICT, "This topic already exists");
606606
}
607607
})
608+
.thenCompose(__ -> {
609+
if (!createLocalTopicOnly && topicName.isGlobal()) {
610+
return internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
611+
}
612+
return CompletableFuture.completedFuture(null);
613+
})
608614
.thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
609615
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
610616
.thenRun(() -> {
611-
if (!createLocalTopicOnly && topicName.isGlobal()) {
612-
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
613-
}
614617
log.info("[{}] Successfully created partitions for topic {} in cluster {}",
615618
clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
616619
asyncResponse.resume(Response.noContent().build());
@@ -622,11 +625,13 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
622625
});
623626
}
624627

625-
private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
626-
getNamespaceReplicatedClustersAsync(namespaceName)
627-
.thenAccept(clusters -> {
628+
private CompletableFuture<Void> internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
629+
return getNamespaceReplicatedClustersAsync(namespaceName)
630+
.thenCompose(clusters -> {
628631
// this call happens in the background without async composition. completion is logged.
629-
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
632+
Map<String, CompletableFuture<Void>> createPartitionedMetaOnRemoteCluster =
633+
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
634+
return FutureUtil.waitForAll(createPartitionedMetaOnRemoteCluster.values());
630635
});
631636
}
632637

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,4 +486,63 @@ public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throw
486486
admin1.topics().deletePartitionedTopic(topicName);
487487
admin2.topics().deletePartitionedTopic(topicName);
488488
}
489+
490+
@Test
491+
public void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception {
492+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
493+
// Verify: will get a not found error when calling "getPartitionedTopicMetadata" on a topic not exists.
494+
try {
495+
admin1.topics().getPartitionedTopicMetadata(topicName);
496+
fail("Expected a not found error");
497+
} catch (Exception ex) {
498+
Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
499+
assertTrue(unWrapEx.getMessage().contains("not found"));
500+
}
501+
// Verify: will get a conflict error when there is a topic with different partitions on the remote side.
502+
admin2.topics().createPartitionedTopic(topicName, 1);
503+
try {
504+
admin1.topics().createPartitionedTopic(topicName, 2);
505+
fail("Expected error due to a conflict partitioned topic already exists.");
506+
} catch (Exception ex) {
507+
Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
508+
assertTrue(unWrapEx.getMessage().contains("with different partitions"));
509+
}
510+
// Verify: nothing has been changed after the failed calling.
511+
Optional<PartitionedTopicMetadata> partitions1 = pulsar1.getPulsarResources().getNamespaceResources()
512+
.getPartitionedTopicResources()
513+
.getPartitionedTopicMetadataAsync(TopicName.get(topicName), true).join();
514+
assertFalse(partitions1.isPresent());
515+
Optional<PartitionedTopicMetadata> partitions2 = pulsar2.getPulsarResources().getNamespaceResources()
516+
.getPartitionedTopicResources()
517+
.getPartitionedTopicMetadataAsync(TopicName.get(topicName), true).join();
518+
assertTrue(partitions2.isPresent());
519+
assertEquals(partitions2.get().partitions, 1);
520+
// cleanup.
521+
admin2.topics().deletePartitionedTopic(topicName);
522+
}
523+
524+
@Test
525+
public void testNamespaceLevelPartitionedMetadataReplication() throws Exception {
526+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
527+
// Verify: will get a not found error when calling "getPartitionedTopicMetadata" on a topic not exists.
528+
try {
529+
admin1.topics().getPartitionedTopicMetadata(topicName);
530+
fail("Expected a not found error");
531+
} catch (Exception ex) {
532+
Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
533+
assertTrue(unWrapEx.getMessage().contains("not found"));
534+
}
535+
// Verify: will get a conflict error when there is a topic with different partitions on the remote side.
536+
admin1.topics().createPartitionedTopic(topicName, 2);
537+
// Verify: nothing has been changed after the failed calling.
538+
PartitionedTopicMetadata topicMetadata1 = admin1.topics().getPartitionedTopicMetadata(topicName);
539+
assertEquals(topicMetadata1.partitions, 2);
540+
PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
541+
assertEquals(topicMetadata2.partitions, 2);
542+
// cleanup.
543+
cleanupTopics(() -> {
544+
admin1.topics().deletePartitionedTopic(topicName);
545+
admin2.topics().deletePartitionedTopic(topicName);
546+
});
547+
}
489548
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.time.Duration;
2525
import java.util.Collections;
2626
import java.util.Optional;
27+
import java.util.concurrent.CompletableFuture;
2728
import lombok.extern.slf4j.Slf4j;
2829
import org.apache.pulsar.broker.PulsarService;
2930
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -159,9 +160,19 @@ protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Excep
159160
}
160161

161162
protected void waitChangeEventsInit(String namespace) {
162-
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
163-
.getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false)
164-
.join().get();
163+
if (!pulsar1.getConfig().isSystemTopicAndTopicLevelPoliciesEnabled() || pulsar1.getConfig().isSystemTopicEnabled()) {
164+
return;
165+
}
166+
CompletableFuture<Optional<Topic>> future = pulsar1.getBrokerService()
167+
.getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false);
168+
if (future == null) {
169+
return;
170+
}
171+
Optional<Topic> optional = future.join();
172+
if (optional.isEmpty()) {
173+
return;
174+
}
175+
PersistentTopic topic = (PersistentTopic) optional.get();
165176
Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() -> {
166177
TopicStatsImpl topicStats = topic.getStats(true, false, false);
167178
topicStats.getSubscriptions().entrySet().forEach(entry -> {

pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public class FutureUtil {
5353
* @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
5454
*/
5555
public static CompletableFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> futures) {
56+
if (futures.isEmpty()) {
57+
return CompletableFuture.completedFuture(null);
58+
}
5659
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
5760
}
5861

0 commit comments

Comments
 (0)