From f56810a1ab2a43337ddcd42c5b35d4ad94fdf8c6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 25 Jun 2025 15:15:19 +0800 Subject: [PATCH 1/4] [improve][common] Improve the performance of TopicName constructor and expose it --- .../pulsar/broker/qos/TopicNameBenchmark.java | 81 +++++++++++ .../pulsar/common/naming/TopicName.java | 137 ++++++++++-------- 2 files changed, 157 insertions(+), 61 deletions(-) create mode 100644 microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java new file mode 100644 index 0000000000000..035c96f9910c1 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.qos; + +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.naming.TopicName; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class TopicNameBenchmark { + + private static final String[] topicBases = {"test", + "tenant/ns/test", + "persistent://another-tenant/another-ns/test" + }; + + @Threads(1) + @Benchmark + @Warmup(time = 5, iterations = 1) + @Measurement(time = 5, iterations = 1) + public void testReadFromCache(Blackhole blackhole) { + for (int i = 0; i < 10000; i++) { + for (final var topicBase : topicBases) { + blackhole.consume(TopicName.get(topicBase + i)); + } + } + } + + @Threads(1) + @Benchmark + @Warmup(time = 5, iterations = 1) + @Measurement(time = 5, iterations = 1) + public void testConstruct(Blackhole blackhole) { + for (int i = 0; i < 10000; i++) { + for (final var topicBase : topicBases) { + blackhole.consume(new TopicName(topicBase + i, false)); + } + } + } + + @Threads(1) + @Benchmark + @Warmup(time = 5, iterations = 1) + @Measurement(time = 5, iterations = 1) + public void testConstructWithNamespaceInitialized(Blackhole blackhole) { + for (int i = 0; i < 10000; i++) { + for (final var topicBase : topicBases) { + blackhole.consume(new TopicName(topicBase + i, true)); + } + } + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 4d9b28df91be1..3c32bcfd5abb3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.common.naming; -import com.google.common.base.Splitter; import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; @@ -46,7 +45,7 @@ public class TopicName implements ServiceUnitId { private final String namespacePortion; private final String localName; - private final NamespaceName namespaceName; + private volatile NamespaceName namespaceName; private final int partitionIndex; @@ -111,64 +110,77 @@ public static String getPattern(String topic) { return "^" + Pattern.quote(get(topic).getPartitionedTopicName().toString()) + "$"; } - @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") private TopicName(String completeTopicName) { + this(completeTopicName, true); + } + + /** + * The constructor from a topic name string. You can leverage {@link TopicName#get(String)} to get benefits from + * the built-in cache mechanism. + * + * @param completeTopicName the topic name + * @param initializeNamespaceName whether to initializing the internal {@link NamespaceName} field + */ + @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") + public TopicName(String completeTopicName, boolean initializeNamespaceName) { try { // The topic name can be in two different forms, one is fully qualified topic name, // the other one is short topic name - if (!completeTopicName.contains("://")) { + int index = completeTopicName.indexOf("://"); + if (index < 0) { // The short topic name can be: // - // - // - String[] parts = StringUtils.split(completeTopicName, '/'); - if (parts.length == 3) { - completeTopicName = TopicDomain.persistent.name() + "://" + completeTopicName; - } else if (parts.length == 1) { - completeTopicName = TopicDomain.persistent.name() + "://" - + PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0]; + List parts = splitBySlash(completeTopicName, 0); + this.domain = TopicDomain.persistent; + this.cluster = null; + if (parts.size() == 3) { + this.tenant = parts.get(0); + this.namespacePortion = parts.get(1); + this.localName = parts.get(2); + } else if (parts.size() == 1) { + this.tenant = PUBLIC_TENANT; + this.namespacePortion = DEFAULT_NAMESPACE; + this.localName = parts.get(0); } else { throw new IllegalArgumentException( "Invalid short topic name '" + completeTopicName + "', it should be in the format of " + "// or "); } + this.partitionIndex = getPartitionIndex(localName); + this.completeTopicName = domain.name() + "://" + tenant + "/" + namespacePortion + "/" + localName; + } else { + // The fully qualified topic name can be in two different forms: + // new: persistent://tenant/namespace/topic + // legacy: persistent://tenant/cluster/namespace/topic + // + // Examples of localName: + // 1. some, name, xyz + // 2. xyz-123, feeder-2 + List parts = splitBySlash(completeTopicName.substring(index + "://".length()), 4); + if (parts.size() == 3) { + // New topic name without cluster name + this.cluster = null; + this.tenant = parts.get(0); + this.namespacePortion = parts.get(1); + this.localName = parts.get(2); + this.partitionIndex = getPartitionIndex(localName); + } else if (parts.size() == 4) { + // Legacy topic name that includes cluster name + this.tenant = parts.get(0); + this.cluster = parts.get(1); + this.namespacePortion = parts.get(2); + this.localName = parts.get(3); + this.partitionIndex = getPartitionIndex(localName); + } else { + throw new IllegalArgumentException("Invalid topic name " + completeTopicName); + } + this.completeTopicName = completeTopicName; + this.domain = TopicDomain.getEnum(completeTopicName.substring(0, index)); } - // The fully qualified topic name can be in two different forms: - // new: persistent://tenant/namespace/topic - // legacy: persistent://tenant/cluster/namespace/topic - - List parts = Splitter.on("://").limit(2).splitToList(completeTopicName); - this.domain = TopicDomain.getEnum(parts.get(0)); - - String rest = parts.get(1); - - // The rest of the name can be in different forms: - // new: tenant/namespace/ - // legacy: tenant/cluster/namespace/ - // Examples of localName: - // 1. some, name, xyz - // 2. xyz-123, feeder-2 - - - parts = Splitter.on("/").limit(4).splitToList(rest); - if (parts.size() == 3) { - // New topic name without cluster name - this.tenant = parts.get(0); - this.cluster = null; - this.namespacePortion = parts.get(1); - this.localName = parts.get(2); - this.partitionIndex = getPartitionIndex(completeTopicName); - this.namespaceName = NamespaceName.get(tenant, namespacePortion); - } else if (parts.size() == 4) { - // Legacy topic name that includes cluster name - this.tenant = parts.get(0); - this.cluster = parts.get(1); - this.namespacePortion = parts.get(2); - this.localName = parts.get(3); - this.partitionIndex = getPartitionIndex(completeTopicName); - this.namespaceName = NamespaceName.get(tenant, cluster, namespacePortion); - } else { - throw new IllegalArgumentException("Invalid topic name: " + completeTopicName); + if (initializeNamespaceName) { + getNamespaceObject(); } if (StringUtils.isBlank(localName)) { @@ -179,14 +191,6 @@ private TopicName(String completeTopicName) { } catch (NullPointerException e) { throw new IllegalArgumentException("Invalid topic name: " + completeTopicName, e); } - if (isV2()) { - this.completeTopicName = String.format("%s://%s/%s/%s", - domain, tenant, namespacePortion, localName); - } else { - this.completeTopicName = String.format("%s://%s/%s/%s/%s", - domain, tenant, cluster, - namespacePortion, localName); - } } public boolean isPersistent() { @@ -201,7 +205,7 @@ public boolean isPersistent() { * @return the namespace */ public String getNamespace() { - return namespaceName.toString(); + return getNamespaceObject().toString(); } /** @@ -211,6 +215,16 @@ public String getNamespace() { */ @Override public NamespaceName getNamespaceObject() { + if (namespaceName != null) { + return namespaceName; + } + synchronized (this) { + if (cluster == null) { + namespaceName = NamespaceName.get(tenant, namespacePortion); + } else { + namespaceName = NamespaceName.get(tenant, cluster, namespacePortion); + } + } return namespaceName; } @@ -282,9 +296,10 @@ public String getPartitionedTopicName() { */ public static int getPartitionIndex(String topic) { int partitionIndex = -1; - if (topic.contains(PARTITIONED_TOPIC_SUFFIX)) { + int index = topic.lastIndexOf(PARTITIONED_TOPIC_SUFFIX); + if (index >= 0) { try { - String idx = StringUtils.substringAfterLast(topic, PARTITIONED_TOPIC_SUFFIX); + String idx = topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length()); partitionIndex = Integer.parseInt(idx); if (partitionIndex < 0) { // for the "topic-partition--1" @@ -356,10 +371,10 @@ public String getPersistenceNamingEncoding() { public static String fromPersistenceNamingEncoding(String mlName) { // The managedLedgerName convention is: tenant/namespace/domain/topic // We want to transform to topic full name in the order: domain://tenant/namespace/topic - if (mlName == null || mlName.length() == 0) { + if (mlName == null || mlName.isEmpty()) { return mlName; } - List parts = Splitter.on("/").splitToList(mlName); + List parts = splitBySlash(mlName, 0); String tenant; String cluster; String namespacePortion; @@ -370,14 +385,14 @@ public static String fromPersistenceNamingEncoding(String mlName) { namespacePortion = parts.get(1); domain = parts.get(2); localName = Codec.decode(parts.get(3)); - return String.format("%s://%s/%s/%s", domain, tenant, namespacePortion, localName); + return domain + "://" + tenant + "/" + namespacePortion + "/" + localName; } else if (parts.size() == 5) { tenant = parts.get(0); cluster = parts.get(1); namespacePortion = parts.get(2); domain = parts.get(3); localName = Codec.decode(parts.get(4)); - return String.format("%s://%s/%s/%s/%s", domain, tenant, cluster, namespacePortion, localName); + return domain + "://" + tenant + "/" + cluster + "/" + namespacePortion + "/" + localName; } else { throw new IllegalArgumentException("Invalid managedLedger name: " + mlName); } From 355140ad091a93c9db14f6132660a19ae00b7bef Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 25 Jun 2025 16:21:15 +0800 Subject: [PATCH 2/4] Don't initialize namespace by default --- .../pulsar/broker/qos/TopicNameBenchmark.java | 14 +------------- .../apache/pulsar/common/naming/TopicName.java | 17 +++++------------ 2 files changed, 6 insertions(+), 25 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java index 035c96f9910c1..91bf4c9dd5065 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java @@ -62,19 +62,7 @@ public void testReadFromCache(Blackhole blackhole) { public void testConstruct(Blackhole blackhole) { for (int i = 0; i < 10000; i++) { for (final var topicBase : topicBases) { - blackhole.consume(new TopicName(topicBase + i, false)); - } - } - } - - @Threads(1) - @Benchmark - @Warmup(time = 5, iterations = 1) - @Measurement(time = 5, iterations = 1) - public void testConstructWithNamespaceInitialized(Blackhole blackhole) { - for (int i = 0; i < 10000; i++) { - for (final var topicBase : topicBases) { - blackhole.consume(new TopicName(topicBase + i, true)); + blackhole.consume(new TopicName(topicBase + i)); } } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 3c32bcfd5abb3..2f49835c5f38e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -110,19 +110,16 @@ public static String getPattern(String topic) { return "^" + Pattern.quote(get(topic).getPartitionedTopicName().toString()) + "$"; } - private TopicName(String completeTopicName) { - this(completeTopicName, true); - } - /** - * The constructor from a topic name string. You can leverage {@link TopicName#get(String)} to get benefits from - * the built-in cache mechanism. + * The constructor from a topic name string. The difference from {@link TopicName#get(String)} is that the `get` + * method can leverage the built-in cache mechanism, which can be slightly faster, but at the cost of higher JVM + * memory usage that could lead to unnecessary GC, as well as potentially OOM in extreme cases. + * You can benchmark `TopicName.get(topic)` and `new TopicName(topic)` to see the actual performance gap. * * @param completeTopicName the topic name - * @param initializeNamespaceName whether to initializing the internal {@link NamespaceName} field */ @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") - public TopicName(String completeTopicName, boolean initializeNamespaceName) { + public TopicName(String completeTopicName) { try { // The topic name can be in two different forms, one is fully qualified topic name, // the other one is short topic name @@ -179,10 +176,6 @@ public TopicName(String completeTopicName, boolean initializeNamespaceName) { this.domain = TopicDomain.getEnum(completeTopicName.substring(0, index)); } - if (initializeNamespaceName) { - getNamespaceObject(); - } - if (StringUtils.isBlank(localName)) { throw new IllegalArgumentException(String.format("Invalid topic name: %s. Topic local name must not" + " be blank.", completeTopicName)); From e88dd75a30a1b21fae7de94c9b7a48db8c0eb98b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 25 Jul 2025 20:47:07 +0800 Subject: [PATCH 3/4] Revert unnecessary changes --- .../pulsar/broker/qos/TopicNameBenchmark.java | 69 ------------------- .../pulsar/common/naming/TopicName.java | 10 +-- 2 files changed, 1 insertion(+), 78 deletions(-) delete mode 100644 microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java diff --git a/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java deleted file mode 100644 index 91bf4c9dd5065..0000000000000 --- a/microbench/src/main/java/org/apache/pulsar/broker/qos/TopicNameBenchmark.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.qos; - -import java.util.concurrent.TimeUnit; -import org.apache.pulsar.common.naming.TopicName; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.Threads; -import org.openjdk.jmh.annotations.Warmup; -import org.openjdk.jmh.infra.Blackhole; - -@Fork(1) -@BenchmarkMode(Mode.Throughput) -@OutputTimeUnit(TimeUnit.SECONDS) -@State(Scope.Thread) -public class TopicNameBenchmark { - - private static final String[] topicBases = {"test", - "tenant/ns/test", - "persistent://another-tenant/another-ns/test" - }; - - @Threads(1) - @Benchmark - @Warmup(time = 5, iterations = 1) - @Measurement(time = 5, iterations = 1) - public void testReadFromCache(Blackhole blackhole) { - for (int i = 0; i < 10000; i++) { - for (final var topicBase : topicBases) { - blackhole.consume(TopicName.get(topicBase + i)); - } - } - } - - @Threads(1) - @Benchmark - @Warmup(time = 5, iterations = 1) - @Measurement(time = 5, iterations = 1) - public void testConstruct(Blackhole blackhole) { - for (int i = 0; i < 10000; i++) { - for (final var topicBase : topicBases) { - blackhole.consume(new TopicName(topicBase + i)); - } - } - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 2f49835c5f38e..c234bf51b32a7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -110,16 +110,8 @@ public static String getPattern(String topic) { return "^" + Pattern.quote(get(topic).getPartitionedTopicName().toString()) + "$"; } - /** - * The constructor from a topic name string. The difference from {@link TopicName#get(String)} is that the `get` - * method can leverage the built-in cache mechanism, which can be slightly faster, but at the cost of higher JVM - * memory usage that could lead to unnecessary GC, as well as potentially OOM in extreme cases. - * You can benchmark `TopicName.get(topic)` and `new TopicName(topic)` to see the actual performance gap. - * - * @param completeTopicName the topic name - */ @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") - public TopicName(String completeTopicName) { + private TopicName(String completeTopicName) { try { // The topic name can be in two different forms, one is fully qualified topic name, // the other one is short topic name From e1474467c93d3cd7127bf6feceba83751004ef50 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 25 Jul 2025 23:06:11 +0800 Subject: [PATCH 4/4] Revert NamespaceName lazy initialization --- .../apache/pulsar/common/naming/TopicName.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index c234bf51b32a7..d60b11e5d783a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -45,7 +45,7 @@ public class TopicName implements ServiceUnitId { private final String namespacePortion; private final String localName; - private volatile NamespaceName namespaceName; + private final NamespaceName namespaceName; private final int partitionIndex; @@ -138,6 +138,7 @@ private TopicName(String completeTopicName) { } this.partitionIndex = getPartitionIndex(localName); this.completeTopicName = domain.name() + "://" + tenant + "/" + namespacePortion + "/" + localName; + this.namespaceName = NamespaceName.get(tenant, namespacePortion); } else { // The fully qualified topic name can be in two different forms: // new: persistent://tenant/namespace/topic @@ -154,6 +155,7 @@ private TopicName(String completeTopicName) { this.namespacePortion = parts.get(1); this.localName = parts.get(2); this.partitionIndex = getPartitionIndex(localName); + this.namespaceName = NamespaceName.get(tenant, namespacePortion); } else if (parts.size() == 4) { // Legacy topic name that includes cluster name this.tenant = parts.get(0); @@ -161,6 +163,7 @@ private TopicName(String completeTopicName) { this.namespacePortion = parts.get(2); this.localName = parts.get(3); this.partitionIndex = getPartitionIndex(localName); + namespaceName = NamespaceName.get(tenant, cluster, namespacePortion); } else { throw new IllegalArgumentException("Invalid topic name " + completeTopicName); } @@ -200,16 +203,6 @@ public String getNamespace() { */ @Override public NamespaceName getNamespaceObject() { - if (namespaceName != null) { - return namespaceName; - } - synchronized (this) { - if (cluster == null) { - namespaceName = NamespaceName.get(tenant, namespacePortion); - } else { - namespaceName = NamespaceName.get(tenant, cluster, namespacePortion); - } - } return namespaceName; }