Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.common.naming;

import com.google.common.base.Splitter;
import com.google.re2j.Pattern;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
Expand Down Expand Up @@ -116,59 +115,60 @@ private TopicName(String completeTopicName) {
try {
// The topic name can be in two different forms, one is fully qualified topic name,
// the other one is short topic name
if (!completeTopicName.contains("://")) {
int index = completeTopicName.indexOf("://");
if (index < 0) {
// The short topic name can be:
// - <topic>
// - <property>/<namespace>/<topic>
String[] parts = StringUtils.split(completeTopicName, '/');
if (parts.length == 3) {
completeTopicName = TopicDomain.persistent.name() + "://" + completeTopicName;
} else if (parts.length == 1) {
completeTopicName = TopicDomain.persistent.name() + "://"
+ PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
List<String> parts = splitBySlash(completeTopicName, 0);
this.domain = TopicDomain.persistent;
this.cluster = null;
if (parts.size() == 3) {
this.tenant = parts.get(0);
this.namespacePortion = parts.get(1);
this.localName = parts.get(2);
} else if (parts.size() == 1) {
this.tenant = PUBLIC_TENANT;
this.namespacePortion = DEFAULT_NAMESPACE;
this.localName = parts.get(0);
} else {
throw new IllegalArgumentException(
"Invalid short topic name '" + completeTopicName + "', it should be in the format of "
+ "<tenant>/<namespace>/<topic> or <topic>");
}
}

// The fully qualified topic name can be in two different forms:
// new: persistent://tenant/namespace/topic
// legacy: persistent://tenant/cluster/namespace/topic

List<String> parts = Splitter.on("://").limit(2).splitToList(completeTopicName);
this.domain = TopicDomain.getEnum(parts.get(0));

String rest = parts.get(1);

// The rest of the name can be in different forms:
// new: tenant/namespace/<localName>
// legacy: tenant/cluster/namespace/<localName>
// Examples of localName:
// 1. some, name, xyz
// 2. xyz-123, feeder-2


parts = Splitter.on("/").limit(4).splitToList(rest);
if (parts.size() == 3) {
// New topic name without cluster name
this.tenant = parts.get(0);
this.cluster = null;
this.namespacePortion = parts.get(1);
this.localName = parts.get(2);
this.partitionIndex = getPartitionIndex(completeTopicName);
this.partitionIndex = getPartitionIndex(localName);
this.completeTopicName = domain.name() + "://" + tenant + "/" + namespacePortion + "/" + localName;
this.namespaceName = NamespaceName.get(tenant, namespacePortion);
} else if (parts.size() == 4) {
// Legacy topic name that includes cluster name
this.tenant = parts.get(0);
this.cluster = parts.get(1);
this.namespacePortion = parts.get(2);
this.localName = parts.get(3);
this.partitionIndex = getPartitionIndex(completeTopicName);
this.namespaceName = NamespaceName.get(tenant, cluster, namespacePortion);
} else {
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName);
// The fully qualified topic name can be in two different forms:
// new: persistent://tenant/namespace/topic
// legacy: persistent://tenant/cluster/namespace/topic
//
// Examples of localName:
// 1. some, name, xyz
// 2. xyz-123, feeder-2
List<String> parts = splitBySlash(completeTopicName.substring(index + "://".length()), 4);
if (parts.size() == 3) {
// New topic name without cluster name
this.cluster = null;
this.tenant = parts.get(0);
this.namespacePortion = parts.get(1);
this.localName = parts.get(2);
this.partitionIndex = getPartitionIndex(localName);
this.namespaceName = NamespaceName.get(tenant, namespacePortion);
} else if (parts.size() == 4) {
// Legacy topic name that includes cluster name
this.tenant = parts.get(0);
this.cluster = parts.get(1);
this.namespacePortion = parts.get(2);
this.localName = parts.get(3);
this.partitionIndex = getPartitionIndex(localName);
namespaceName = NamespaceName.get(tenant, cluster, namespacePortion);
} else {
throw new IllegalArgumentException("Invalid topic name " + completeTopicName);
}
this.completeTopicName = completeTopicName;
this.domain = TopicDomain.getEnum(completeTopicName.substring(0, index));
}

if (StringUtils.isBlank(localName)) {
Expand All @@ -179,14 +179,6 @@ private TopicName(String completeTopicName) {
} catch (NullPointerException e) {
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName, e);
}
if (isV2()) {
this.completeTopicName = String.format("%s://%s/%s/%s",
domain, tenant, namespacePortion, localName);
} else {
this.completeTopicName = String.format("%s://%s/%s/%s/%s",
domain, tenant, cluster,
namespacePortion, localName);
}
}

public boolean isPersistent() {
Expand All @@ -201,7 +193,7 @@ public boolean isPersistent() {
* @return the namespace
*/
public String getNamespace() {
return namespaceName.toString();
return getNamespaceObject().toString();
}

/**
Expand Down Expand Up @@ -282,9 +274,10 @@ public String getPartitionedTopicName() {
*/
public static int getPartitionIndex(String topic) {
int partitionIndex = -1;
if (topic.contains(PARTITIONED_TOPIC_SUFFIX)) {
int index = topic.lastIndexOf(PARTITIONED_TOPIC_SUFFIX);
if (index >= 0) {
try {
String idx = StringUtils.substringAfterLast(topic, PARTITIONED_TOPIC_SUFFIX);
String idx = topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length());
partitionIndex = Integer.parseInt(idx);
if (partitionIndex < 0) {
// for the "topic-partition--1"
Expand Down Expand Up @@ -356,10 +349,10 @@ public String getPersistenceNamingEncoding() {
public static String fromPersistenceNamingEncoding(String mlName) {
// The managedLedgerName convention is: tenant/namespace/domain/topic
// We want to transform to topic full name in the order: domain://tenant/namespace/topic
if (mlName == null || mlName.length() == 0) {
if (mlName == null || mlName.isEmpty()) {
return mlName;
}
List<String> parts = Splitter.on("/").splitToList(mlName);
List<String> parts = splitBySlash(mlName, 0);
String tenant;
String cluster;
String namespacePortion;
Expand All @@ -370,14 +363,14 @@ public static String fromPersistenceNamingEncoding(String mlName) {
namespacePortion = parts.get(1);
domain = parts.get(2);
localName = Codec.decode(parts.get(3));
return String.format("%s://%s/%s/%s", domain, tenant, namespacePortion, localName);
return domain + "://" + tenant + "/" + namespacePortion + "/" + localName;
} else if (parts.size() == 5) {
tenant = parts.get(0);
cluster = parts.get(1);
namespacePortion = parts.get(2);
domain = parts.get(3);
localName = Codec.decode(parts.get(4));
return String.format("%s://%s/%s/%s/%s", domain, tenant, cluster, namespacePortion, localName);
return domain + "://" + tenant + "/" + cluster + "/" + namespacePortion + "/" + localName;
} else {
throw new IllegalArgumentException("Invalid managedLedger name: " + mlName);
}
Expand Down
Loading