Skip to content

Commit 1bfcaaa

Browse files
committed
[improve][common] Improve the performance of TopicName constructor and expose it
1 parent 7cd9410 commit 1bfcaaa

File tree

2 files changed

+181
-61
lines changed

2 files changed

+181
-61
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.qos;
20+
21+
import java.util.concurrent.TimeUnit;
22+
import org.apache.pulsar.common.naming.TopicName;
23+
import org.openjdk.jmh.annotations.Benchmark;
24+
import org.openjdk.jmh.annotations.BenchmarkMode;
25+
import org.openjdk.jmh.annotations.Fork;
26+
import org.openjdk.jmh.annotations.Measurement;
27+
import org.openjdk.jmh.annotations.Mode;
28+
import org.openjdk.jmh.annotations.OutputTimeUnit;
29+
import org.openjdk.jmh.annotations.Scope;
30+
import org.openjdk.jmh.annotations.State;
31+
import org.openjdk.jmh.annotations.Threads;
32+
import org.openjdk.jmh.annotations.Warmup;
33+
import org.openjdk.jmh.infra.Blackhole;
34+
35+
@Fork(1)
36+
@BenchmarkMode(Mode.Throughput)
37+
@OutputTimeUnit(TimeUnit.SECONDS)
38+
@State(Scope.Thread)
39+
public class TopicNameBenchmark {
40+
41+
private static final String[] topicBases = {"test",
42+
"tenant/ns/test",
43+
"persistent://another-tenant/another-ns/test"
44+
};
45+
46+
@Threads(1)
47+
@Benchmark
48+
@Warmup(time = 5, iterations = 1)
49+
@Measurement(time = 5, iterations = 1)
50+
public void testReadFromCache(Blackhole blackhole) {
51+
for (int i = 0; i < 10000; i++) {
52+
for (final var topicBase : topicBases) {
53+
blackhole.consume(TopicName.get(topicBase + i));
54+
}
55+
}
56+
}
57+
58+
@Threads(1)
59+
@Benchmark
60+
@Warmup(time = 5, iterations = 1)
61+
@Measurement(time = 5, iterations = 1)
62+
public void testConstruct(Blackhole blackhole) {
63+
for (int i = 0; i < 10000; i++) {
64+
for (final var topicBase : topicBases) {
65+
blackhole.consume(new TopicName(topicBase + i, false));
66+
}
67+
}
68+
}
69+
70+
@Threads(1)
71+
@Benchmark
72+
@Warmup(time = 5, iterations = 1)
73+
@Measurement(time = 5, iterations = 1)
74+
public void testConstructWithNamespaceInitialized(Blackhole blackhole) {
75+
for (int i = 0; i < 10000; i++) {
76+
for (final var topicBase : topicBases) {
77+
blackhole.consume(new TopicName(topicBase + i, true));
78+
}
79+
}
80+
}
81+
}

pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java

Lines changed: 100 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
*/
1919
package org.apache.pulsar.common.naming;
2020

21-
import com.google.common.base.Splitter;
2221
import com.google.re2j.Pattern;
2322
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23+
import java.util.ArrayList;
2424
import java.util.List;
2525
import java.util.Objects;
2626
import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +45,7 @@ public class TopicName implements ServiceUnitId {
4545
private final String namespacePortion;
4646
private final String localName;
4747

48-
private final NamespaceName namespaceName;
48+
private volatile NamespaceName namespaceName;
4949

5050
private final int partitionIndex;
5151

@@ -110,64 +110,77 @@ public static String getPattern(String topic) {
110110
return "^" + Pattern.quote(get(topic).getPartitionedTopicName().toString()) + "$";
111111
}
112112

113-
@SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION")
114113
private TopicName(String completeTopicName) {
114+
this(completeTopicName, true);
115+
}
116+
117+
/**
118+
* The constructor from a topic name string. You can leverage {@link TopicName#get(String)} to get benefits from
119+
* the built-in cache mechanism.
120+
*
121+
* @param completeTopicName the topic name
122+
* @param initializeNamespaceName whether to initializing the internal {@link NamespaceName} field
123+
*/
124+
@SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION")
125+
public TopicName(String completeTopicName, boolean initializeNamespaceName) {
115126
try {
116127
// The topic name can be in two different forms, one is fully qualified topic name,
117128
// the other one is short topic name
118-
if (!completeTopicName.contains("://")) {
129+
int index = completeTopicName.indexOf("://");
130+
if (index < 0) {
119131
// The short topic name can be:
120132
// - <topic>
121133
// - <property>/<namespace>/<topic>
122-
String[] parts = StringUtils.split(completeTopicName, '/');
123-
if (parts.length == 3) {
124-
completeTopicName = TopicDomain.persistent.name() + "://" + completeTopicName;
125-
} else if (parts.length == 1) {
126-
completeTopicName = TopicDomain.persistent.name() + "://"
127-
+ PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
134+
List<String> parts = splitBySlash(completeTopicName, 0);
135+
this.domain = TopicDomain.persistent;
136+
this.cluster = null;
137+
if (parts.size() == 3) {
138+
this.tenant = parts.get(0);
139+
this.namespacePortion = parts.get(1);
140+
this.localName = parts.get(2);
141+
} else if (parts.size() == 1) {
142+
this.tenant = PUBLIC_TENANT;
143+
this.namespacePortion = DEFAULT_NAMESPACE;
144+
this.localName = parts.get(0);
128145
} else {
129146
throw new IllegalArgumentException(
130147
"Invalid short topic name '" + completeTopicName + "', it should be in the format of "
131148
+ "<tenant>/<namespace>/<topic> or <topic>");
132149
}
150+
this.partitionIndex = getPartitionIndex(localName);
151+
this.completeTopicName = domain.name() + "://" + tenant + "/" + namespacePortion + "/" + localName;
152+
} else {
153+
// The fully qualified topic name can be in two different forms:
154+
// new: persistent://tenant/namespace/topic
155+
// legacy: persistent://tenant/cluster/namespace/topic
156+
//
157+
// Examples of localName:
158+
// 1. some, name, xyz
159+
// 2. xyz-123, feeder-2
160+
List<String> parts = splitBySlash(completeTopicName.substring(index + "://".length()), 4);
161+
if (parts.size() == 3) {
162+
// New topic name without cluster name
163+
this.cluster = null;
164+
this.tenant = parts.get(0);
165+
this.namespacePortion = parts.get(1);
166+
this.localName = parts.get(2);
167+
this.partitionIndex = getPartitionIndex(localName);
168+
} else if (parts.size() == 4) {
169+
// Legacy topic name that includes cluster name
170+
this.tenant = parts.get(0);
171+
this.cluster = parts.get(1);
172+
this.namespacePortion = parts.get(2);
173+
this.localName = parts.get(3);
174+
this.partitionIndex = getPartitionIndex(localName);
175+
} else {
176+
throw new IllegalArgumentException("Invalid topic name " + completeTopicName);
177+
}
178+
this.completeTopicName = completeTopicName;
179+
this.domain = TopicDomain.getEnum(completeTopicName.substring(0, index));
133180
}
134181

135-
// The fully qualified topic name can be in two different forms:
136-
// new: persistent://tenant/namespace/topic
137-
// legacy: persistent://tenant/cluster/namespace/topic
138-
139-
List<String> parts = Splitter.on("://").limit(2).splitToList(completeTopicName);
140-
this.domain = TopicDomain.getEnum(parts.get(0));
141-
142-
String rest = parts.get(1);
143-
144-
// The rest of the name can be in different forms:
145-
// new: tenant/namespace/<localName>
146-
// legacy: tenant/cluster/namespace/<localName>
147-
// Examples of localName:
148-
// 1. some, name, xyz
149-
// 2. xyz-123, feeder-2
150-
151-
152-
parts = Splitter.on("/").limit(4).splitToList(rest);
153-
if (parts.size() == 3) {
154-
// New topic name without cluster name
155-
this.tenant = parts.get(0);
156-
this.cluster = null;
157-
this.namespacePortion = parts.get(1);
158-
this.localName = parts.get(2);
159-
this.partitionIndex = getPartitionIndex(completeTopicName);
160-
this.namespaceName = NamespaceName.get(tenant, namespacePortion);
161-
} else if (parts.size() == 4) {
162-
// Legacy topic name that includes cluster name
163-
this.tenant = parts.get(0);
164-
this.cluster = parts.get(1);
165-
this.namespacePortion = parts.get(2);
166-
this.localName = parts.get(3);
167-
this.partitionIndex = getPartitionIndex(completeTopicName);
168-
this.namespaceName = NamespaceName.get(tenant, cluster, namespacePortion);
169-
} else {
170-
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName);
182+
if (initializeNamespaceName) {
183+
getNamespaceObject();
171184
}
172185

173186
if (StringUtils.isBlank(localName)) {
@@ -178,14 +191,6 @@ private TopicName(String completeTopicName) {
178191
} catch (NullPointerException e) {
179192
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName, e);
180193
}
181-
if (isV2()) {
182-
this.completeTopicName = String.format("%s://%s/%s/%s",
183-
domain, tenant, namespacePortion, localName);
184-
} else {
185-
this.completeTopicName = String.format("%s://%s/%s/%s/%s",
186-
domain, tenant, cluster,
187-
namespacePortion, localName);
188-
}
189194
}
190195

191196
public boolean isPersistent() {
@@ -200,7 +205,7 @@ public boolean isPersistent() {
200205
* @return the namespace
201206
*/
202207
public String getNamespace() {
203-
return namespaceName.toString();
208+
return getNamespaceObject().toString();
204209
}
205210

206211
/**
@@ -210,6 +215,16 @@ public String getNamespace() {
210215
*/
211216
@Override
212217
public NamespaceName getNamespaceObject() {
218+
if (namespaceName != null) {
219+
return namespaceName;
220+
}
221+
synchronized (this) {
222+
if (cluster == null) {
223+
namespaceName = NamespaceName.get(tenant, namespacePortion);
224+
} else {
225+
namespaceName = NamespaceName.get(tenant, cluster, namespacePortion);
226+
}
227+
}
213228
return namespaceName;
214229
}
215230

@@ -281,9 +296,10 @@ public String getPartitionedTopicName() {
281296
*/
282297
public static int getPartitionIndex(String topic) {
283298
int partitionIndex = -1;
284-
if (topic.contains(PARTITIONED_TOPIC_SUFFIX)) {
299+
int index = topic.lastIndexOf(PARTITIONED_TOPIC_SUFFIX);
300+
if (index >= 0) {
285301
try {
286-
String idx = StringUtils.substringAfterLast(topic, PARTITIONED_TOPIC_SUFFIX);
302+
String idx = topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length());
287303
partitionIndex = Integer.parseInt(idx);
288304
if (partitionIndex < 0) {
289305
// for the "topic-partition--1"
@@ -355,10 +371,10 @@ public String getPersistenceNamingEncoding() {
355371
public static String fromPersistenceNamingEncoding(String mlName) {
356372
// The managedLedgerName convention is: tenant/namespace/domain/topic
357373
// We want to transform to topic full name in the order: domain://tenant/namespace/topic
358-
if (mlName == null || mlName.length() == 0) {
374+
if (mlName == null || mlName.isEmpty()) {
359375
return mlName;
360376
}
361-
List<String> parts = Splitter.on("/").splitToList(mlName);
377+
List<String> parts = splitBySlash(mlName, 0);
362378
String tenant;
363379
String cluster;
364380
String namespacePortion;
@@ -369,14 +385,14 @@ public static String fromPersistenceNamingEncoding(String mlName) {
369385
namespacePortion = parts.get(1);
370386
domain = parts.get(2);
371387
localName = Codec.decode(parts.get(3));
372-
return String.format("%s://%s/%s/%s", domain, tenant, namespacePortion, localName);
388+
return domain + "://" + tenant + "/" + namespacePortion + "/" + localName;
373389
} else if (parts.size() == 5) {
374390
tenant = parts.get(0);
375391
cluster = parts.get(1);
376392
namespacePortion = parts.get(2);
377393
domain = parts.get(3);
378394
localName = Codec.decode(parts.get(4));
379-
return String.format("%s://%s/%s/%s/%s", domain, tenant, cluster, namespacePortion, localName);
395+
return domain + "://" + tenant + "/" + cluster + "/" + namespacePortion + "/" + localName;
380396
} else {
381397
throw new IllegalArgumentException("Invalid managedLedger name: " + mlName);
382398
}
@@ -442,4 +458,27 @@ public boolean includes(TopicName otherTopicName) {
442458
public boolean isV2() {
443459
return cluster == null;
444460
}
461+
462+
private static List<String> splitBySlash(String topic, int limit) {
463+
final List<String> tokens = new ArrayList<>(3);
464+
final int loopCount = (limit <= 0) ? Integer.MAX_VALUE : limit - 1;
465+
int beginIndex = 0;
466+
for (int i = 0; i < loopCount; i++) {
467+
final int endIndex = topic.indexOf('/', beginIndex);
468+
if (endIndex < 0) {
469+
tokens.add(topic.substring(beginIndex));
470+
return tokens;
471+
} else if (endIndex > beginIndex) {
472+
tokens.add(topic.substring(beginIndex, endIndex));
473+
} else {
474+
throw new IllegalArgumentException("Invalid topic name " + topic);
475+
}
476+
beginIndex = endIndex + 1;
477+
}
478+
if (beginIndex >= topic.length()) {
479+
throw new IllegalArgumentException("Invalid topic name " + topic);
480+
}
481+
tokens.add(topic.substring(beginIndex));
482+
return tokens;
483+
}
445484
}

0 commit comments

Comments
 (0)