Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f91475d
Revert "[improve] [broker] Improve CPU resources usege of TopicName C…
lhotari Jun 24, 2025
61f46d7
Replace Guava Cache with Caffeine cache to fix original performance i…
lhotari Jun 24, 2025
e9228c0
Deduplicate String instances to reduce heap usage
lhotari Jun 24, 2025
67b3c0d
Use interner to deduplicate TopicName and NamespaceName instances
lhotari Jun 24, 2025
6085a66
Use soft values in the cache to ensure that caches will be dropped on…
lhotari Jun 24, 2025
1ad049b
Switch to use expireAfterWrite and configure scheduler to remove expi…
lhotari Jun 24, 2025
99c683f
Add comments
lhotari Jun 24, 2025
3ff6954
Add simple microbenchmark
lhotari Jun 24, 2025
aea3e1d
Add invalidateCache methods to TopicName and NamespaceName
lhotari Jun 24, 2025
db7c0f9
Improve benchmark
lhotari Jun 24, 2025
a83960d
Improve benchmark
lhotari Jun 24, 2025
59725fe
Test warm lookups to cache
lhotari Jun 24, 2025
7f372a9
Refactor
lhotari Jun 24, 2025
88fb5b6
Remove keysInAddedOrder
lhotari Jun 24, 2025
cf2a0d1
Add comment
lhotari Jun 24, 2025
07451c7
Improve logic
lhotari Jun 24, 2025
f1801ae
Improve logic
lhotari Jun 24, 2025
d75d213
cache maxsize check
lhotari Jun 24, 2025
82c8167
Improve logic
lhotari Jun 24, 2025
895e176
Refactor both TopicName and NamespaceName to use the same generic cac…
lhotari Jun 24, 2025
6ff60d3
Remove redundant removal
lhotari Jun 24, 2025
7c37999
Rename oldestKey -> key since keys are in hash order
lhotari Jun 24, 2025
41b06dd
Revisit test that used reflection to access the previous cache implem…
lhotari Jun 24, 2025
98a58b6
Intern "public" and "default" for consistency
lhotari Jun 24, 2025
097bf11
Add size method to NameCache
lhotari Jun 24, 2025
05bc206
Test shrinking the cache
lhotari Jun 24, 2025
74aa80b
Add test for soft reference handling
lhotari Jun 24, 2025
e9894b9
Add comment about the cache configuration
lhotari Jun 24, 2025
e4493d2
Improve comments
lhotari Jun 24, 2025
1aaf0a4
Refactor: replace functions with abstract methods in NameCache
lhotari Jun 24, 2025
86bdc30
Reset cache settings to defaults after the test
lhotari Jun 24, 2025
d3a002e
Remove duplicate check calling NamedEntity.checkName
lhotari Jun 24, 2025
73abc0b
Improve benchmark, add test with strong references to avoid soft refe…
lhotari Jun 25, 2025
c4da223
Fix benchmark
lhotari Jun 25, 2025
eb8510c
Improve async-profiler example
lhotari Jun 25, 2025
12cb96b
Reduce max heap for benchmark
lhotari Jun 25, 2025
d63212d
Invalidate caches when after adding strong references
lhotari Jun 25, 2025
3f24353
Replace invalidateCache and strongReferences with TestMode enum
lhotari Jun 25, 2025
12a6b7c
Address review comment
lhotari Jun 25, 2025
10e1d60
Optimize performance of running maintenance tasks by using System.cur…
lhotari Jun 25, 2025
75ce527
Reduce default maintenance task interval to 5 seconds
lhotari Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,6 @@ skipBrokerShutdownOnOOM=false
# Factory class-name to create topic with custom workflow
topicFactoryClassName=

# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache
# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName.
topicNameCacheMaxCapacity=100000

# Specifies the minimum number of seconds that a topic name stays in memory, to avoid clearing the cache too
# frequently when too many topics are in use.
maxSecondsToClearTopicNameCache=7200

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
9 changes: 6 additions & 3 deletions microbench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" -lp
Profiling benchmarks with [async-profiler](https://github.com/async-profiler/async-profiler):

```shell
# example of profiling with async-profiler
# download async-profiler from https://github.com/async-profiler/async-profiler/releases
# example of profiling with async-profiler 4.0
# download async-profiler 4.0 from https://github.com/async-profiler/async-profiler/releases
# macos
LIBASYNCPROFILER_PATH=$HOME/async-profiler/lib/libasyncProfiler.dylib
java -jar microbench/target/microbenchmarks.jar -prof async:libPath=$LIBASYNCPROFILER_PATH\;output=flamegraph\;dir=profile-results ".*BenchmarkName.*"
# linux
LIBASYNCPROFILER_PATH=$HOME/async-profiler/lib/libasyncProfiler.so
java -jar microbench/target/microbenchmarks.jar -prof async:libPath=$LIBASYNCPROFILER_PATH\;output=flamegraph\;dir=profile-results\;rawCommand=cstack=vmx ".*BenchmarkName.*"
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.common.naming;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.IterationParams;
import org.openjdk.jmh.runner.IterationType;

/**
 * JMH benchmark that measures the throughput of {@code TopicName.get(String)} lookups
 * with 1, 10 and 100 concurrent threads.
 *
 * <p>The measurement and warmup settings are declared once on the class and apply to
 * every benchmark method; only the thread count differs per method.
 */
@Fork(value = 3, jvmArgs = {"-Xms500m", "-Xmx500m", "-XX:+UseG1GC"})
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
@State(Scope.Thread)
public class TopicNameBenchmark {
    /** Number of distinct topic names generated for the lookups. */
    public static final int MAX_TOPICS = 100000;

    /** Benchmark variants, selected through the {@code testMode} JMH parameter. */
    public enum TestMode {
        /** Plain lookups without any extra setup or teardown work. */
        basic,
        /** Invalidates the TopicName and NamespaceName caches after every iteration. */
        invalidateCache,
        /** Resolves every topic name once during setup and keeps strong references to the results. */
        strongReferences
    }

    /**
     * State shared by all benchmark threads: the generated topic name strings and the
     * per-mode setup/teardown logic.
     */
    @State(Scope.Benchmark)
    public static class BenchmarkState {
        public static final int PAUSE_MILLIS_BEFORE_MEASUREMENT = 5000;
        // Kept per state instance instead of as a JVM-wide static, so that the pause happens once for
        // every benchmark trial even when several trials run in the same JVM (e.g. with forks disabled).
        private final AtomicBoolean paused = new AtomicBoolean(false);
        @Param
        private TestMode testMode;
        private String[] topicNames;
        // Used to hold strong references to TopicName objects when testMode is TestMode.strongReferences.
        // This is to prevent them from being garbage collected during the benchmark since the cache holds soft refs.
        private TopicName[] strongTopicNameReferences;

        /**
         * Generates {@link #MAX_TOPICS} topic name strings and, in {@link TestMode#strongReferences} mode,
         * resolves each of them once and retains the resulting TopicName instances.
         */
        @Setup(Level.Trial)
        public void setup() {
            topicNames = new String[MAX_TOPICS];
            for (int i = 0; i < topicNames.length; i++) {
                topicNames[i] = String.format("persistent://tenant-%d/ns-%d/topic-%d", i % 100, i % 1000, i);
            }
            if (testMode == TestMode.strongReferences) {
                strongTopicNameReferences = new TopicName[MAX_TOPICS];
                for (int i = 0; i < topicNames.length; i++) {
                    strongTopicNameReferences[i] = TopicName.get(topicNames[i]);
                }
            }
        }

        /**
         * Sleeps once, before the first measurement iteration, for
         * {@link #PAUSE_MILLIS_BEFORE_MEASUREMENT} milliseconds.
         *
         * @param params JMH parameters of the iteration that is about to start
         * @throws InterruptedException if the pausing thread is interrupted while sleeping
         */
        @Setup(Level.Iteration)
        public void pauseBetweenWarmupAndMeasurement(IterationParams params) throws InterruptedException {
            if (params.getType() == IterationType.MEASUREMENT && paused.compareAndSet(false, true)) {
                System.out.println("Pausing before starting measurement iterations...");
                // pause to allow JIT compilation to happen before measurement starts
                Thread.sleep(PAUSE_MILLIS_BEFORE_MEASUREMENT);
                System.out.println("Starting measurement iterations...");
            }
        }

        /** Clears both name caches after each iteration when running in {@link TestMode#invalidateCache} mode. */
        @TearDown(Level.Iteration)
        public void tearDown() {
            if (testMode == TestMode.invalidateCache) {
                TopicName.invalidateCache();
                NamespaceName.invalidateCache();
            }
        }

        /**
         * Returns the topic name for the given invocation counter, cycling through the generated names.
         *
         * @param counter invocation counter of the calling thread
         * @return the topic name string at index {@code counter % MAX_TOPICS}
         */
        public String getNextTopicName(long counter) {
            return topicNames[(int) (counter % topicNames.length)];
        }
    }

    /** Per-thread state: the invocation counter used to pick the next topic name. */
    @State(Scope.Thread)
    public static class TestState {
        private long counter = 0;

        /** Restarts the counter so that each iteration begins at the first topic name. */
        @TearDown(Level.Iteration)
        public void tearDown() {
            counter = 0;
        }

        /**
         * Looks up the next topic name of this thread's sequence.
         *
         * @param benchmarkState shared state providing the topic name strings
         * @return the result of {@code TopicName.get} for the selected name
         */
        public TopicName runTest(BenchmarkState benchmarkState) {
            return TopicName.get(benchmarkState.getNextTopicName(counter++));
        }
    }

    /** Lookup throughput with a single thread. */
    @Benchmark
    @Threads(1)
    public TopicName topicLookup001(BenchmarkState benchmarkState, TestState state) {
        return state.runTest(benchmarkState);
    }

    /** Lookup throughput with 10 concurrent threads. */
    @Benchmark
    @Threads(10)
    public TopicName topicLookup010(BenchmarkState benchmarkState, TestState state) {
        return state.runTest(benchmarkState);
    }

    /** Lookup throughput with 100 concurrent threads. */
    @Benchmark
    @Threads(100)
    public TopicName topicLookup100(BenchmarkState benchmarkState, TestState state) {
        return state.runTest(benchmarkState);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.naming;
Original file line number Diff line number Diff line change
Expand Up @@ -603,21 +603,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean backlogQuotaCheckEnabled = true;

// Maximum capacity of the topic name cache; declared as a dynamic setting (dynamic = true).
// Per the doc string: -1 means unlimited and 0 means the whole cache is cleared on every interval.
@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache"
+ " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName."
)
private int topicNameCacheMaxCapacity = 100_000;

// Interval, in seconds, associated with clearing the topic name cache; defaults to two hours (3600 * 2).
@FieldContext(
category = CATEGORY_POLICIES,
doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache"
+ " frequently when there are too many topics are in use."
)
private int maxSecondsToClearTopicNameCache = 3600 * 2;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Whether to enable precise time based backlog quota check. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,16 +641,6 @@ public void start() throws Exception {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
this.startClearInvalidateTopicNameCacheTask();
}

/**
 * Schedules a fixed-rate task on the inactivity monitor that asks {@code TopicName} to clear its cache
 * if it has reached the configured maximum capacity. The configured number of seconds is used both as
 * the initial delay and as the period; the capacity is re-read from the configuration on every run.
 */
protected void startClearInvalidateTopicNameCacheTask() {
    final int intervalSeconds = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
    // The capacity is looked up inside the task so that each run sees the current configured value.
    final Runnable clearTask = () ->
            TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity());
    inactivityMonitor.scheduleAtFixedRate(clearTask, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
}

protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertSame;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -59,8 +56,6 @@ protected void doInitConf() throws Exception {
super.doInitConf();
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTopicNameCacheMaxCapacity(5000);
conf.setMaxSecondsToClearTopicNameCache(5);
if (useStaticPorts) {
conf.setBrokerServicePortTls(Optional.of(6651));
conf.setBrokerServicePort(Optional.of(6660));
Expand Down Expand Up @@ -195,34 +190,6 @@ public void testDynamicBrokerPort() throws Exception {
assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get());
}

/**
 * Verifies that cached TopicName instances survive while the cache is below its configured max capacity,
 * and that they are replaced by new instances after the max capacity is dynamically lowered below the
 * number of cached entries.
 */
@Test
public void testTopicCacheConfiguration() throws Exception {
    cleanup();
    setup();
    assertEquals(conf.getTopicNameCacheMaxCapacity(), 5000);
    assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5);

    final int topicCount = 20;
    final List<TopicName> cachedNames = new ArrayList<>();
    for (int idx = 0; idx < topicCount; idx++) {
        cachedNames.add(TopicName.get("public/default/tp_" + idx));
    }

    // Verify: the cache does not clear since it is not larger than max capacity.
    Thread.sleep(10 * 1000);
    int position = 0;
    for (TopicName cached : cachedNames) {
        assertTrue(cached == TopicName.get("public/default/tp_" + position));
        position++;
    }

    // Update max capacity.
    admin.brokers().updateDynamicConfiguration("topicNameCacheMaxCapacity", "10");

    // Verify: the cache was cleared, so lookups now return different instances.
    Thread.sleep(10 * 1000);
    position = 0;
    for (TopicName cached : cachedNames) {
        assertFalse(cached == TopicName.get("public/default/tp_" + position));
        position++;
    }
}

@Test
public void testBacklogAndRetentionCheck() throws PulsarServerException {
ServiceConfiguration config = new ServiceConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -68,6 +67,8 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
Expand All @@ -89,8 +90,8 @@
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
Expand Down Expand Up @@ -1296,10 +1297,10 @@ public void testReplicationCountMetrics() throws Exception {
*/
@Test
public void testCloseTopicAfterStartReplicationFailed() throws Exception {
Field fieldTopicNameCache = TopicName.class.getDeclaredField("cache");
Field fieldTopicNameCache =
ClassUtils.getClass("org.apache.pulsar.common.naming.TopicNameCache").getDeclaredField("INSTANCE");
fieldTopicNameCache.setAccessible(true);
ConcurrentHashMap<String, TopicName> topicNameCache =
(ConcurrentHashMap<String, TopicName>) fieldTopicNameCache.get(null);
Object topicNameCache = fieldTopicNameCache.get(null);
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
// 1.Create topic, does not enable replication now.
admin1.topics().createNonPartitionedTopic(topicName);
Expand All @@ -1324,9 +1325,9 @@ public void testCloseTopicAfterStartReplicationFailed() throws Exception {
// - Since the topic should not be touched anymore, we use "TopicName" to confirm whether it be used by
// Replication again.
Thread.sleep(10 * 1000);
topicNameCache.remove(topicName);
MethodUtils.invokeMethod(topicNameCache, "invalidateCache");
Thread.sleep(60 * 1000);
assertTrue(!topicNameCache.containsKey(topicName));
assertNull(MethodUtils.invokeMethod(topicNameCache, "getIfPresent", topicName));

// cleanup.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public void testInitialize() throws Exception {
assertEquals(standalone.getConfig().getAdvertisedListeners(),
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200);
assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -75,8 +74,6 @@ public void testInit() throws Exception {
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
assertEquals(config.isCreateTopicToRemoteClusterForReplication(), false);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
Expand Down Expand Up @@ -384,17 +381,6 @@ public void testAllowAutoTopicCreationType() throws Exception {
assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED);
}

/**
 * Verifies that the two topic name cache settings are populated from properties when a
 * ServiceConfiguration is created through PulsarConfigurationLoader.
 */
@Test
public void testTopicNameCacheConfiguration() throws Exception {
    final Properties props = new Properties();
    props.setProperty("maxSecondsToClearTopicNameCache", "2");
    props.setProperty("topicNameCacheMaxCapacity", "100");

    final ServiceConfiguration loaded = PulsarConfigurationLoader.create(props, ServiceConfiguration.class);

    assertEquals(loaded.getMaxSecondsToClearTopicNameCache(), 2);
    assertEquals(loaded.getTopicNameCacheMaxCapacity(), 100);
}

@Test
public void testLookupProperties() throws Exception {
var confFile = "lookup.key1=value1\nkey=value\nlookup.key2=value2";
Expand Down
Loading
Loading