Skip to content

Commit f56810a

Browse files
committed
[improve][common] Improve the performance of TopicName constructor and expose it
1 parent a40ac3c commit f56810a

File tree

2 files changed

+157
-61
lines changed

2 files changed

+157
-61
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.qos;
20+
21+
import java.util.concurrent.TimeUnit;
22+
import org.apache.pulsar.common.naming.TopicName;
23+
import org.openjdk.jmh.annotations.Benchmark;
24+
import org.openjdk.jmh.annotations.BenchmarkMode;
25+
import org.openjdk.jmh.annotations.Fork;
26+
import org.openjdk.jmh.annotations.Measurement;
27+
import org.openjdk.jmh.annotations.Mode;
28+
import org.openjdk.jmh.annotations.OutputTimeUnit;
29+
import org.openjdk.jmh.annotations.Scope;
30+
import org.openjdk.jmh.annotations.State;
31+
import org.openjdk.jmh.annotations.Threads;
32+
import org.openjdk.jmh.annotations.Warmup;
33+
import org.openjdk.jmh.infra.Blackhole;
34+
35+
@Fork(1)
36+
@BenchmarkMode(Mode.Throughput)
37+
@OutputTimeUnit(TimeUnit.SECONDS)
38+
@State(Scope.Thread)
39+
public class TopicNameBenchmark {
40+
41+
private static final String[] topicBases = {"test",
42+
"tenant/ns/test",
43+
"persistent://another-tenant/another-ns/test"
44+
};
45+
46+
@Threads(1)
47+
@Benchmark
48+
@Warmup(time = 5, iterations = 1)
49+
@Measurement(time = 5, iterations = 1)
50+
public void testReadFromCache(Blackhole blackhole) {
51+
for (int i = 0; i < 10000; i++) {
52+
for (final var topicBase : topicBases) {
53+
blackhole.consume(TopicName.get(topicBase + i));
54+
}
55+
}
56+
}
57+
58+
@Threads(1)
59+
@Benchmark
60+
@Warmup(time = 5, iterations = 1)
61+
@Measurement(time = 5, iterations = 1)
62+
public void testConstruct(Blackhole blackhole) {
63+
for (int i = 0; i < 10000; i++) {
64+
for (final var topicBase : topicBases) {
65+
blackhole.consume(new TopicName(topicBase + i, false));
66+
}
67+
}
68+
}
69+
70+
@Threads(1)
71+
@Benchmark
72+
@Warmup(time = 5, iterations = 1)
73+
@Measurement(time = 5, iterations = 1)
74+
public void testConstructWithNamespaceInitialized(Blackhole blackhole) {
75+
for (int i = 0; i < 10000; i++) {
76+
for (final var topicBase : topicBases) {
77+
blackhole.consume(new TopicName(topicBase + i, true));
78+
}
79+
}
80+
}
81+
}

pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java

Lines changed: 76 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pulsar.common.naming;
2020

21-
import com.google.common.base.Splitter;
2221
import com.google.re2j.Pattern;
2322
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2423
import java.util.ArrayList;
@@ -46,7 +45,7 @@ public class TopicName implements ServiceUnitId {
4645
private final String namespacePortion;
4746
private final String localName;
4847

49-
private final NamespaceName namespaceName;
48+
private volatile NamespaceName namespaceName;
5049

5150
private final int partitionIndex;
5251

@@ -111,64 +110,77 @@ public static String getPattern(String topic) {
111110
return "^" + Pattern.quote(get(topic).getPartitionedTopicName().toString()) + "$";
112111
}
113112

114-
@SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION")
115113
private TopicName(String completeTopicName) {
114+
this(completeTopicName, true);
115+
}
116+
117+
/**
118+
* The constructor from a topic name string. You can leverage {@link TopicName#get(String)} to get benefits from
119+
* the built-in cache mechanism.
120+
*
121+
* @param completeTopicName the topic name
122+
* @param initializeNamespaceName whether to initializing the internal {@link NamespaceName} field
123+
*/
124+
@SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION")
125+
public TopicName(String completeTopicName, boolean initializeNamespaceName) {
116126
try {
117127
// The topic name can be in two different forms, one is fully qualified topic name,
118128
// the other one is short topic name
119-
if (!completeTopicName.contains("://")) {
129+
int index = completeTopicName.indexOf("://");
130+
if (index < 0) {
120131
// The short topic name can be:
121132
// - <topic>
122133
// - <property>/<namespace>/<topic>
123-
String[] parts = StringUtils.split(completeTopicName, '/');
124-
if (parts.length == 3) {
125-
completeTopicName = TopicDomain.persistent.name() + "://" + completeTopicName;
126-
} else if (parts.length == 1) {
127-
completeTopicName = TopicDomain.persistent.name() + "://"
128-
+ PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
134+
List<String> parts = splitBySlash(completeTopicName, 0);
135+
this.domain = TopicDomain.persistent;
136+
this.cluster = null;
137+
if (parts.size() == 3) {
138+
this.tenant = parts.get(0);
139+
this.namespacePortion = parts.get(1);
140+
this.localName = parts.get(2);
141+
} else if (parts.size() == 1) {
142+
this.tenant = PUBLIC_TENANT;
143+
this.namespacePortion = DEFAULT_NAMESPACE;
144+
this.localName = parts.get(0);
129145
} else {
130146
throw new IllegalArgumentException(
131147
"Invalid short topic name '" + completeTopicName + "', it should be in the format of "
132148
+ "<tenant>/<namespace>/<topic> or <topic>");
133149
}
150+
this.partitionIndex = getPartitionIndex(localName);
151+
this.completeTopicName = domain.name() + "://" + tenant + "/" + namespacePortion + "/" + localName;
152+
} else {
153+
// The fully qualified topic name can be in two different forms:
154+
// new: persistent://tenant/namespace/topic
155+
// legacy: persistent://tenant/cluster/namespace/topic
156+
//
157+
// Examples of localName:
158+
// 1. some, name, xyz
159+
// 2. xyz-123, feeder-2
160+
List<String> parts = splitBySlash(completeTopicName.substring(index + "://".length()), 4);
161+
if (parts.size() == 3) {
162+
// New topic name without cluster name
163+
this.cluster = null;
164+
this.tenant = parts.get(0);
165+
this.namespacePortion = parts.get(1);
166+
this.localName = parts.get(2);
167+
this.partitionIndex = getPartitionIndex(localName);
168+
} else if (parts.size() == 4) {
169+
// Legacy topic name that includes cluster name
170+
this.tenant = parts.get(0);
171+
this.cluster = parts.get(1);
172+
this.namespacePortion = parts.get(2);
173+
this.localName = parts.get(3);
174+
this.partitionIndex = getPartitionIndex(localName);
175+
} else {
176+
throw new IllegalArgumentException("Invalid topic name " + completeTopicName);
177+
}
178+
this.completeTopicName = completeTopicName;
179+
this.domain = TopicDomain.getEnum(completeTopicName.substring(0, index));
134180
}
135181

136-
// The fully qualified topic name can be in two different forms:
137-
// new: persistent://tenant/namespace/topic
138-
// legacy: persistent://tenant/cluster/namespace/topic
139-
140-
List<String> parts = Splitter.on("://").limit(2).splitToList(completeTopicName);
141-
this.domain = TopicDomain.getEnum(parts.get(0));
142-
143-
String rest = parts.get(1);
144-
145-
// The rest of the name can be in different forms:
146-
// new: tenant/namespace/<localName>
147-
// legacy: tenant/cluster/namespace/<localName>
148-
// Examples of localName:
149-
// 1. some, name, xyz
150-
// 2. xyz-123, feeder-2
151-
152-
153-
parts = Splitter.on("/").limit(4).splitToList(rest);
154-
if (parts.size() == 3) {
155-
// New topic name without cluster name
156-
this.tenant = parts.get(0);
157-
this.cluster = null;
158-
this.namespacePortion = parts.get(1);
159-
this.localName = parts.get(2);
160-
this.partitionIndex = getPartitionIndex(completeTopicName);
161-
this.namespaceName = NamespaceName.get(tenant, namespacePortion);
162-
} else if (parts.size() == 4) {
163-
// Legacy topic name that includes cluster name
164-
this.tenant = parts.get(0);
165-
this.cluster = parts.get(1);
166-
this.namespacePortion = parts.get(2);
167-
this.localName = parts.get(3);
168-
this.partitionIndex = getPartitionIndex(completeTopicName);
169-
this.namespaceName = NamespaceName.get(tenant, cluster, namespacePortion);
170-
} else {
171-
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName);
182+
if (initializeNamespaceName) {
183+
getNamespaceObject();
172184
}
173185

174186
if (StringUtils.isBlank(localName)) {
@@ -179,14 +191,6 @@ private TopicName(String completeTopicName) {
179191
} catch (NullPointerException e) {
180192
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName, e);
181193
}
182-
if (isV2()) {
183-
this.completeTopicName = String.format("%s://%s/%s/%s",
184-
domain, tenant, namespacePortion, localName);
185-
} else {
186-
this.completeTopicName = String.format("%s://%s/%s/%s/%s",
187-
domain, tenant, cluster,
188-
namespacePortion, localName);
189-
}
190194
}
191195

192196
public boolean isPersistent() {
@@ -201,7 +205,7 @@ public boolean isPersistent() {
201205
* @return the namespace
202206
*/
203207
public String getNamespace() {
204-
return namespaceName.toString();
208+
return getNamespaceObject().toString();
205209
}
206210

207211
/**
@@ -211,6 +215,16 @@ public String getNamespace() {
211215
*/
212216
@Override
213217
public NamespaceName getNamespaceObject() {
218+
if (namespaceName != null) {
219+
return namespaceName;
220+
}
221+
synchronized (this) {
222+
if (cluster == null) {
223+
namespaceName = NamespaceName.get(tenant, namespacePortion);
224+
} else {
225+
namespaceName = NamespaceName.get(tenant, cluster, namespacePortion);
226+
}
227+
}
214228
return namespaceName;
215229
}
216230

@@ -282,9 +296,10 @@ public String getPartitionedTopicName() {
282296
*/
283297
public static int getPartitionIndex(String topic) {
284298
int partitionIndex = -1;
285-
if (topic.contains(PARTITIONED_TOPIC_SUFFIX)) {
299+
int index = topic.lastIndexOf(PARTITIONED_TOPIC_SUFFIX);
300+
if (index >= 0) {
286301
try {
287-
String idx = StringUtils.substringAfterLast(topic, PARTITIONED_TOPIC_SUFFIX);
302+
String idx = topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length());
288303
partitionIndex = Integer.parseInt(idx);
289304
if (partitionIndex < 0) {
290305
// for the "topic-partition--1"
@@ -356,10 +371,10 @@ public String getPersistenceNamingEncoding() {
356371
public static String fromPersistenceNamingEncoding(String mlName) {
357372
// The managedLedgerName convention is: tenant/namespace/domain/topic
358373
// We want to transform to topic full name in the order: domain://tenant/namespace/topic
359-
if (mlName == null || mlName.length() == 0) {
374+
if (mlName == null || mlName.isEmpty()) {
360375
return mlName;
361376
}
362-
List<String> parts = Splitter.on("/").splitToList(mlName);
377+
List<String> parts = splitBySlash(mlName, 0);
363378
String tenant;
364379
String cluster;
365380
String namespacePortion;
@@ -370,14 +385,14 @@ public static String fromPersistenceNamingEncoding(String mlName) {
370385
namespacePortion = parts.get(1);
371386
domain = parts.get(2);
372387
localName = Codec.decode(parts.get(3));
373-
return String.format("%s://%s/%s/%s", domain, tenant, namespacePortion, localName);
388+
return domain + "://" + tenant + "/" + namespacePortion + "/" + localName;
374389
} else if (parts.size() == 5) {
375390
tenant = parts.get(0);
376391
cluster = parts.get(1);
377392
namespacePortion = parts.get(2);
378393
domain = parts.get(3);
379394
localName = Codec.decode(parts.get(4));
380-
return String.format("%s://%s/%s/%s/%s", domain, tenant, cluster, namespacePortion, localName);
395+
return domain + "://" + tenant + "/" + cluster + "/" + namespacePortion + "/" + localName;
381396
} else {
382397
throw new IllegalArgumentException("Invalid managedLedger name: " + mlName);
383398
}

0 commit comments

Comments
 (0)