Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,15 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
throw new RestException(Status.CONFLICT, "This topic already exists");
}
})
.thenCompose(__ -> {
if (!createLocalTopicOnly && topicName.isGlobal()) {
return internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
}
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
.thenRun(() -> {
if (!createLocalTopicOnly && topicName.isGlobal()) {
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
}
log.info("[{}] Successfully created partitions for topic {} in cluster {}",
clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
asyncResponse.resume(Response.noContent().build());
Expand All @@ -622,11 +625,13 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
});
}

private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
getNamespaceReplicatedClustersAsync(namespaceName)
.thenAccept(clusters -> {
private CompletableFuture<Void> internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
return getNamespaceReplicatedClustersAsync(namespaceName)
.thenCompose(clusters -> {
// this call happens in the background without async composition. completion is logged.
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
Map<String, CompletableFuture<Void>> createPartitionedMetaOnRemoteCluster =
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
return FutureUtil.waitForAll(createPartitionedMetaOnRemoteCluster.values());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,4 +663,63 @@ public void testUnFenceTopicToReuse() throws Exception {
admin2.topics().delete(topicName);
});
}

@Test
public void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
// Verify: will get a not found error when calling "getPartitionedTopicMetadata" on a topic not exists.
try {
admin1.topics().getPartitionedTopicMetadata(topicName);
fail("Expected a not found error");
} catch (Exception ex) {
Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unWrapEx.getMessage().contains("not found"));
}
// Verify: will get a conflict error when there is a topic with different partitions on the remote side.
admin2.topics().createPartitionedTopic(topicName, 1);
try {
admin1.topics().createPartitionedTopic(topicName, 2);
fail("Expected error due to a conflict partitioned topic already exists.");
} catch (Exception ex) {
Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unWrapEx.getMessage().contains("with different partitions"));
}
// Verify: nothing has been changed after the failed calling.
Optional<PartitionedTopicMetadata> partitions1 = pulsar1.getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(TopicName.get(topicName), true).join();
assertFalse(partitions1.isPresent());
Optional<PartitionedTopicMetadata> partitions2 = pulsar2.getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(TopicName.get(topicName), true).join();
assertTrue(partitions2.isPresent());
assertEquals(partitions2.get().partitions, 1);
// cleanup.
admin2.topics().deletePartitionedTopic(topicName);
}

@Test
public void testNamespaceLevelPartitionedMetadataReplication() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
// Verify: will get a not found error when calling "getPartitionedTopicMetadata" on a topic not exists.
try {
admin1.topics().getPartitionedTopicMetadata(topicName);
fail("Expected a not found error");
} catch (Exception ex) {
Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unWrapEx.getMessage().contains("not found"));
}
// Verify: will get a conflict error when there is a topic with different partitions on the remote side.
admin1.topics().createPartitionedTopic(topicName, 2);
// Verify: nothing has been changed after the failed calling.
PartitionedTopicMetadata topicMetadata1 = admin1.topics().getPartitionedTopicMetadata(topicName);
assertEquals(topicMetadata1.partitions, 2);
PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
assertEquals(topicMetadata2.partitions, 2);
// cleanup.
cleanupTopics(() -> {
admin1.topics().deletePartitionedTopic(topicName);
admin2.topics().deletePartitionedTopic(topicName);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -163,9 +164,19 @@ protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAc
}

protected void waitChangeEventsInit(String namespace) {
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false)
.join().get();
if (!pulsar1.getConfig().isSystemTopicAndTopicLevelPoliciesEnabled() || pulsar1.getConfig().isSystemTopicEnabled()) {
return;
}
CompletableFuture<Optional<Topic>> future = pulsar1.getBrokerService()
.getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false);
if (future == null) {
return;
}
Optional<Topic> optional = future.join();
if (optional.isEmpty()) {
return;
}
PersistentTopic topic = (PersistentTopic) optional.get();
Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() -> {
TopicStatsImpl topicStats = topic.getStats(true, false, false);
topicStats.getSubscriptions().entrySet().forEach(entry -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public class FutureUtil {
* @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
*/
public static CompletableFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> futures) {
if (futures.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

Expand Down
Loading