diff --git a/conf/broker.conf b/conf/broker.conf index 13cbb9528f462..423e61243e2dc 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -159,14 +159,6 @@ skipBrokerShutdownOnOOM=false # Factory class-name to create topic with custom workflow topicFactoryClassName= -# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache -# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName. -topicNameCacheMaxCapacity=100000 - -# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when -# there are too many topics are in use. -maxSecondsToClearTopicNameCache=7200 - # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/microbench/README.md b/microbench/README.md index 5c69c3bba819a..d316162bef6fd 100644 --- a/microbench/README.md +++ b/microbench/README.md @@ -70,9 +70,12 @@ java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" -lp Profiling benchmarks with [async-profiler](https://github.com/async-profiler/async-profiler): ```shell -# example of profiling with async-profiler -# download async-profiler from https://github.com/async-profiler/async-profiler/releases +# example of profiling with async-profiler 4.0 +# download async-profiler 4.0 from https://github.com/async-profiler/async-profiler/releases +# macos LIBASYNCPROFILER_PATH=$HOME/async-profiler/lib/libasyncProfiler.dylib -java -jar microbench/target/microbenchmarks.jar -prof async:libPath=$LIBASYNCPROFILER_PATH\;output=flamegraph\;dir=profile-results ".*BenchmarkName.*" +# linux +LIBASYNCPROFILER_PATH=$HOME/async-profiler/lib/libasyncProfiler.so +java -jar microbench/target/microbenchmarks.jar -prof async:libPath=$LIBASYNCPROFILER_PATH\;output=flamegraph\;dir=profile-results\;rawCommand=cstack=vmx ".*BenchmarkName.*" ``` diff --git 
a/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java
new file mode 100644
index 0000000000000..535fc39e5f931
--- /dev/null
+++ b/microbench/src/main/java/org/apache/pulsar/common/naming/TopicNameBenchmark.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.naming;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.IterationParams;
+import org.openjdk.jmh.runner.IterationType;
+
+/**
+ * Benchmark TopicName.get performance.
+ *
+ * <p>The benchmark mode, output time unit, warmup and measurement settings are declared once on the class and
+ * apply to every benchmark method; the methods differ only in their thread count.
+ */
+@Fork(value = 3, jvmArgs = {"-Xms500m", "-Xmx500m", "-XX:+UseG1GC"})
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+@State(Scope.Thread)
+public class TopicNameBenchmark {
+    public static final int MAX_TOPICS = 100000;
+
+    /** Benchmark scenarios, selected through the testMode JMH parameter. */
+    public enum TestMode {
+        basic,
+        invalidateCache,
+        strongReferences
+    }
+
+    /** State shared by all benchmark threads: the generated topic name strings and the selected test mode. */
+    @State(Scope.Benchmark)
+    public static class BenchmarkState {
+        public static final int PAUSE_MILLIS_BEFORE_MEASUREMENT = 5000;
+        private static final AtomicBoolean paused = new AtomicBoolean(false);
+        @Param
+        private TestMode testMode;
+        private String[] topicNames;
+        // Used to hold strong references to TopicName objects when strongReferences is true.
+        // This is to prevent them from being garbage collected during the benchmark since the cache holds soft refs.
+        private TopicName[] strongTopicNameReferences;
+
+        /** Generates the topic name strings and, in strongReferences mode, resolves and retains every TopicName. */
+        @Setup(Level.Trial)
+        public void setup() {
+            topicNames = new String[MAX_TOPICS];
+            for (int i = 0; i < topicNames.length; i++) {
+                topicNames[i] = String.format("persistent://tenant-%d/ns-%d/topic-%d", i % 100, i % 1000, i);
+            }
+            if (testMode == TestMode.strongReferences) {
+                strongTopicNameReferences = new TopicName[MAX_TOPICS];
+                for (int i = 0; i < topicNames.length; i++) {
+                    strongTopicNameReferences[i] = TopicName.get(topicNames[i]);
+                }
+            }
+        }
+
+        /** Pauses once, before the first measurement iteration; later iterations are not delayed. */
+        @Setup(Level.Iteration)
+        public void pauseBetweenWarmupAndMeasurement(IterationParams params) throws InterruptedException {
+            if (params.getType() == IterationType.MEASUREMENT && paused.compareAndSet(false, true)) {
+                System.out.println("Pausing before starting measurement iterations...");
+                // pause to allow JIT compilation to happen before measurement starts
+                Thread.sleep(PAUSE_MILLIS_BEFORE_MEASUREMENT);
+                System.out.println("Starting measurement iterations...");
+            }
+        }
+
+        /** In invalidateCache mode, clears the TopicName and NamespaceName caches after every iteration. */
+        @TearDown(Level.Iteration)
+        public void tearDown() {
+            if (testMode == TestMode.invalidateCache) {
+                TopicName.invalidateCache();
+                NamespaceName.invalidateCache();
+            }
+        }
+
+        /** Returns the topic name at index {@code counter} modulo the number of generated topic names. */
+        public String getNextTopicName(long counter) {
+            return topicNames[(int) (counter % topicNames.length)];
+        }
+    }
+
+    /** Per-thread state holding the position in the topic name sequence; reset after every iteration. */
+    @State(Scope.Thread)
+    public static class TestState {
+        private long counter = 0;
+
+        @TearDown(Level.Iteration)
+        public void tearDown() {
+            counter = 0;
+        }
+
+        /** Looks up the TopicName for the next topic name string and advances the counter. */
+        public TopicName runTest(BenchmarkState benchmarkState) {
+            return TopicName.get(benchmarkState.getNextTopicName(counter++));
+        }
+    }
+
+    @Benchmark
+    @Threads(1)
+    public TopicName topicLookup001(BenchmarkState benchmarkState, TestState state) {
+        return state.runTest(benchmarkState);
+    }
+
+    @Benchmark
+    @Threads(10)
+    public TopicName topicLookup010(BenchmarkState benchmarkState, TestState state) {
+        return state.runTest(benchmarkState);
+    }
+
+    @Benchmark
+    @Threads(100)
+    public TopicName topicLookup100(BenchmarkState benchmarkState, TestState state) {
+        return state.runTest(benchmarkState);
+    }
+}
diff --git a/microbench/src/main/java/org/apache/pulsar/common/naming/package-info.java b/microbench/src/main/java/org/apache/pulsar/common/naming/package-info.java new file mode 100644 index 0000000000000..f9843c1f09934 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/common/naming/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.common.naming; \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 4ba326f31f6e9..829f094beb52f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -603,21 +603,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean backlogQuotaCheckEnabled = true; - @FieldContext( - dynamic = true, - category = CATEGORY_POLICIES, - doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache" - + " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName." - ) - private int topicNameCacheMaxCapacity = 100_000; - - @FieldContext( - category = CATEGORY_POLICIES, - doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache" - + " frequently when there are too many topics are in use." - ) - private int maxSecondsToClearTopicNameCache = 3600 * 2; - @FieldContext( category = CATEGORY_POLICIES, doc = "Whether to enable precise time based backlog quota check. 
" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 359c0daf5b8ea..f5dc5344cf5b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -641,16 +641,6 @@ public void start() throws Exception { this.updateBrokerDispatchThrottlingMaxRate(); this.startCheckReplicationPolicies(); this.startDeduplicationSnapshotMonitor(); - this.startClearInvalidateTopicNameCacheTask(); - } - - protected void startClearInvalidateTopicNameCacheTask() { - final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache(); - inactivityMonitor.scheduleAtFixedRate( - () -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()), - maxSecondsToClearTopicNameCache, - maxSecondsToClearTopicNameCache, - TimeUnit.SECONDS); } protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index dd2f9288071a5..5e44fe408112a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -24,14 +24,11 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertSame; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.common.naming.TopicName; import 
org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.testng.annotations.AfterMethod; @@ -59,8 +56,6 @@ protected void doInitConf() throws Exception { super.doInitConf(); conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); - conf.setTopicNameCacheMaxCapacity(5000); - conf.setMaxSecondsToClearTopicNameCache(5); if (useStaticPorts) { conf.setBrokerServicePortTls(Optional.of(6651)); conf.setBrokerServicePort(Optional.of(6660)); @@ -195,34 +190,6 @@ public void testDynamicBrokerPort() throws Exception { assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get()); } - @Test - public void testTopicCacheConfiguration() throws Exception { - cleanup(); - setup(); - assertEquals(conf.getTopicNameCacheMaxCapacity(), 5000); - assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5); - - List topicNameCached = new ArrayList<>(); - for (int i = 0; i < 20; i++) { - topicNameCached.add(TopicName.get("public/default/tp_" + i)); - } - - // Verify: the cache does not clear since it is not larger than max capacity. - Thread.sleep(10 * 1000); - for (int i = 0; i < 20; i++) { - assertTrue(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); - } - - // Update max capacity. - admin.brokers().updateDynamicConfiguration("topicNameCacheMaxCapacity", "10"); - - // Verify: the cache were cleared. 
- Thread.sleep(10 * 1000); - for (int i = 0; i < 20; i++) { - assertFalse(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); - } - } - @Test public void testBacklogAndRetentionCheck() throws PulsarServerException { ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 1244300378a8c..fb838e7efeabe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -50,7 +50,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,6 +67,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.reflect.MethodUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; @@ -89,8 +90,8 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PublishRate; import 
org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -1296,10 +1297,10 @@ public void testReplicationCountMetrics() throws Exception { */ @Test public void testCloseTopicAfterStartReplicationFailed() throws Exception { - Field fieldTopicNameCache = TopicName.class.getDeclaredField("cache"); + Field fieldTopicNameCache = + ClassUtils.getClass("org.apache.pulsar.common.naming.TopicNameCache").getDeclaredField("INSTANCE"); fieldTopicNameCache.setAccessible(true); - ConcurrentHashMap topicNameCache = - (ConcurrentHashMap) fieldTopicNameCache.get(null); + Object topicNameCache = fieldTopicNameCache.get(null); final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); // 1.Create topic, does not enable replication now. admin1.topics().createNonPartitionedTopic(topicName); @@ -1324,9 +1325,9 @@ public void testCloseTopicAfterStartReplicationFailed() throws Exception { // - Since the topic should not be touched anymore, we use "TopicName" to confirm whether it be used by // Replication again. Thread.sleep(10 * 1000); - topicNameCache.remove(topicName); + MethodUtils.invokeMethod(topicNameCache, "invalidateCache"); Thread.sleep(60 * 1000); - assertTrue(!topicNameCache.containsKey(topicName)); + assertNull(MethodUtils.invokeMethod(topicNameCache, "getIfPresent", topicName)); // cleanup. 
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index 541408b781be2..7089aa12d2cad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -63,8 +63,6 @@ public void testInitialize() throws Exception { assertEquals(standalone.getConfig().getAdvertisedListeners(), "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651"); assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true); - assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1); - assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200); assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(), true); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 5972c6f724d8c..5f513938858ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import java.beans.Introspector; import java.beans.PropertyDescriptor; import java.io.ByteArrayInputStream; @@ -75,8 +74,6 @@ public void testInit() throws Exception { assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05); assertEquals(config.getHttpMaxRequestHeaderSize(), 1234); assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true); - 
assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1); - assertEquals(config.getTopicNameCacheMaxCapacity(), 200); assertEquals(config.isCreateTopicToRemoteClusterForReplication(), false); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); @@ -384,17 +381,6 @@ public void testAllowAutoTopicCreationType() throws Exception { assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED); } - @Test - public void testTopicNameCacheConfiguration() throws Exception { - ServiceConfiguration conf; - final Properties properties = new Properties(); - properties.setProperty("maxSecondsToClearTopicNameCache", "2"); - properties.setProperty("topicNameCacheMaxCapacity", "100"); - conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); - assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2); - assertEquals(conf.getTopicNameCacheMaxCapacity(), 100); - } - @Test public void testLookupProperties() throws Exception { var confFile = "lookup.key1=value1\nkey=value\nlookup.key2=value2"; diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 0fdb29e06866f..6bd949e4c25c5 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -104,6 +104,4 @@ transactionPendingAckBatchedWriteEnabled=true transactionPendingAckBatchedWriteMaxRecords=44 transactionPendingAckBatchedWriteMaxSize=55 transactionPendingAckBatchedWriteMaxDelayInMillis=66 -topicNameCacheMaxCapacity=200 -maxSecondsToClearTopicNameCache=1 createTopicToRemoteClusterForReplication=false diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index d3f9430f29b48..085457067163b 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -95,6 +95,4 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true -topicNameCacheMaxCapacity=200 -maxSecondsToClearTopicNameCache=1 createTopicToRemoteClusterForReplication=true
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java
new file mode 100644
index 0000000000000..b801fe9f4564a
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NameCache.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.SoftReference;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.common.util.StringInterner;
+
+/**
+ * A cache for TopicName and NamespaceName instances that allows deduplication and efficient memory usage.
+ * It uses soft references to allow garbage collection of unused instances under heavy memory pressure.
+ * This cache uses ConcurrentHashMap for lookups for performance over Guava Cache and Caffeine Cache
+ * since there was a concern in https://github.com/apache/pulsar/pull/23052 about high CPU usage for TopicName lookups.
+ *
+ * @param <V> type of the cached name instances
+ */
+abstract class NameCache<V> {
+    // Cache instances using ConcurrentHashMap and SoftReference to allow garbage collection to clear unreferenced
+    // entries when heap memory is running low.
+    private final ConcurrentMap<String, SoftReferenceValue> cache = new ConcurrentHashMap<>();
+    // Reference queue to hold cleared soft references, which will be used to remove entries from the cache.
+    private final ReferenceQueue<V> referenceQueue = new ReferenceQueue<>();
+    // Flag to indicate if the cache size needs to be reduced. This is set when the cache exceeds the maximum size.
+    private final AtomicBoolean cacheShrinkNeeded = new AtomicBoolean(false);
+    // Next timestamp to run cache maintenance. Handled when cache is accessed.
+    private final AtomicLong nextCacheMaintenance = new AtomicLong();
+    // Deduplicates instances when the cached entry isn't in the actual cache.
+    // Holds weak references to the value so it won't prevent garbage collection.
+    private final Interner<V> valueInterner = Interners.newWeakInterner();
+
+    // Values are held as soft references to allow garbage collection when memory is low.
+    private final class SoftReferenceValue extends SoftReference<V> {
+        private final String key;
+
+        public SoftReferenceValue(String key, V referent, ReferenceQueue<V> q) {
+            super(referent, q);
+            this.key = key;
+        }
+
+        public String getKey() {
+            return key;
+        }
+    }
+
+    /** Creates a new value instance for the given key. */
+    protected abstract V createValue(String key);
+
+    /** Returns the entry count above which the cache is flagged for shrinking. */
+    protected abstract int getCacheMaxSize();
+
+    /** Returns the shrink percentage: a shrink targets max size * (100 - percentage) / 100 entries. */
+    protected abstract int getReduceSizeByPercentage();
+
+    /** Returns the minimum interval in milliseconds between cache maintenance runs. */
+    protected abstract long getMaintenanceTaskIntervalMillis();
+
+    /** Removes all entries from the cache. */
+    public void invalidateCache() {
+        cache.clear();
+    }
+
+    /** Returns the cached value for the key, or null when there is no entry or its referent has been cleared. */
+    public V getIfPresent(String keyParam) {
+        SoftReferenceValue softReferenceValue = cache.get(keyParam);
+        return softReferenceValue != null ? softReferenceValue.get() : null;
+    }
+
+    /**
+     * Returns the cached value for the key, creating and caching it when it is absent or stale.
+     * Flags the cache for shrinking when it exceeds the maximum size and triggers cache maintenance.
+     */
+    public V get(String keyParam) {
+        // first do a quick lookup in the cache
+        V valueInstance = getIfPresent(keyParam);
+        if (valueInstance == null) {
+            // intern the topic name to deduplicate topic names used as keys, since this will reduce heap memory usage
+            String internedKey = StringInterner.intern(keyParam);
+            // The referent is only softly reachable once compute() returns, so it could in theory be cleared
+            // before it is dereferenced here. Retry so that callers never receive null.
+            do {
+                // add new entry or replace the possible stale entry
+                valueInstance = cache.compute(internedKey, (key, existingRef) -> {
+                    if (existingRef == null || existingRef.get() == null) {
+                        return createSoftReferenceValue(key);
+                    }
+                    return existingRef;
+                }).get();
+            } while (valueInstance == null);
+            if (cache.size() > getCacheMaxSize()) {
+                cacheShrinkNeeded.set(true);
+            }
+        }
+        doCacheMaintenance();
+        return valueInstance;
+    }
+
+    // Called on every get(); once the maintenance deadline has passed, shrinks the cache if it was flagged and
+    // purges entries whose references were cleared by the garbage collector.
+    private void doCacheMaintenance() {
+        long localNextCacheMaintenance = nextCacheMaintenance.get();
+        long now = System.currentTimeMillis();
+        if (now > localNextCacheMaintenance) {
+            if (cacheShrinkNeeded.compareAndSet(true, false)) {
+                shrinkCacheSize();
+            }
+            if (nextCacheMaintenance.compareAndSet(localNextCacheMaintenance,
+                    now + getMaintenanceTaskIntervalMillis())) {
+                purgeReferenceQueue();
+            }
+        }
+    }
+
+    private SoftReferenceValue createSoftReferenceValue(String key) {
+        V valueInstance = valueInterner.intern(createValue(key));
+        return new SoftReferenceValue(key, valueInstance, referenceQueue);
+    }
+
+    private void shrinkCacheSize() {
+        int cacheMaxSizeAsInt = getCacheMaxSize();
+        if (cache.size() > cacheMaxSizeAsInt) {
+            // Reduce the cache size after reaching the maximum size
+            int reduceSizeBy =
+                    cache.size() - (int) (cacheMaxSizeAsInt * ((100 - getReduceSizeByPercentage()) / 100.0));
+            // this doesn't remove the oldest entries, but rather reduces the size by a percentage
+            // keeping the order of added entries would add more overhead and Caffeine Cache would be a better fit
+            // in that case.
+            for (String key : cache.keySet()) {
+                if (reduceSizeBy <= 0) {
+                    break;
+                }
+                SoftReferenceValue ref = cache.remove(key);
+                if (ref != null) {
+                    ref.clear();
+                }
+                reduceSizeBy--;
+            }
+        }
+    }
+
+    private void purgeReferenceQueue() {
+        // Clean up the reference queue to remove any references cleared by the garbage collector.
+        while (true) {
+            SoftReferenceValue ref = (SoftReferenceValue) referenceQueue.poll();
+            if (ref == null) {
+                break;
+            }
+            // Remove the mapping only if it still points to this cleared reference. The key may already be mapped
+            // to a fresh entry (created after invalidateCache() or when get() replaced a stale entry).
+            cache.remove(ref.getKey(), ref);
+        }
+    }
+
+    /** Returns the number of entries in the map, including entries whose referent has been cleared. */
+    public int size() {
+        return cache.size();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index a804e7b6506ad..2380162dcba10 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -18,15 +18,10 @@ */ package org.apache.pulsar.common.naming; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.UncheckedExecutionException; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.util.StringInterner;
/** * Parser of a value from the namespace field provided in configuration. @@ -39,23 +34,26 @@ public class NamespaceName implements ServiceUnitId { private final String cluster; private final String localName; - private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { - @Override - public NamespaceName load(String name) throws Exception { - return new NamespaceName(name); - } - }); + public static void invalidateCache() { + NamespaceNameCache.INSTANCE.invalidateCache(); + } public static final NamespaceName SYSTEM_NAMESPACE = NamespaceName.get("pulsar/system"); public static NamespaceName get(String tenant, String namespace) { - validateNamespaceName(tenant, namespace); + if ((tenant == null || tenant.isEmpty()) || (namespace == null || namespace.isEmpty())) { + throw new IllegalArgumentException( + String.format("Invalid namespace format. namespace: %s/%s", tenant, namespace)); + } return get(tenant + '/' + namespace); } public static NamespaceName get(String tenant, String cluster, String namespace) { - validateNamespaceName(tenant, cluster, namespace); + if ((tenant == null || tenant.isEmpty()) || (cluster == null || cluster.isEmpty()) + || (namespace == null || namespace.isEmpty())) { + throw new IllegalArgumentException( + String.format("Invalid namespace format. 
namespace: %s/%s/%s", tenant, cluster, namespace)); + } return get(tenant + '/' + cluster + '/' + namespace); } @@ -63,17 +61,11 @@ public static NamespaceName get(String namespace) { if (namespace == null || namespace.isEmpty()) { throw new IllegalArgumentException("Invalid null namespace: " + namespace); } - try { - return cache.get(namespace); - } catch (ExecutionException e) { - throw (RuntimeException) e.getCause(); - } catch (UncheckedExecutionException e) { - throw (RuntimeException) e.getCause(); - } + return NamespaceNameCache.INSTANCE.get(namespace); } public static Optional getIfValid(String namespace) { - NamespaceName ns = cache.getIfPresent(namespace); + NamespaceName ns = NamespaceNameCache.INSTANCE.getIfPresent(namespace); if (ns != null) { return Optional.of(ns); } @@ -91,7 +83,7 @@ public static Optional getIfValid(String namespace) { } @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") - private NamespaceName(String namespace) { + NamespaceName(String namespace) { // Verify it's a proper namespace // The namespace name is composed of / // or in the legacy format with the cluster name: @@ -103,16 +95,16 @@ private NamespaceName(String namespace) { // New style namespace : / validateNamespaceName(parts[0], parts[1]); - tenant = parts[0]; + tenant = StringInterner.intern(parts[0]); cluster = null; - localName = parts[1]; + localName = StringInterner.intern(parts[1]); } else if (parts.length == 3) { // Old style namespace: // validateNamespaceName(parts[0], parts[1], parts[2]); - tenant = parts[0]; - cluster = parts[1]; - localName = parts[2]; + tenant = StringInterner.intern(parts[0]); + cluster = StringInterner.intern(parts[1]); + localName = StringInterner.intern(parts[2]); } else { throw new IllegalArgumentException("Invalid namespace format. 
namespace: " + namespace); } @@ -121,7 +113,7 @@ private NamespaceName(String namespace) { + " expected / or // " + "but got: " + namespace, e); } - this.namespace = namespace; + this.namespace = StringInterner.intern(namespace); } public String getTenant() { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java new file mode 100644 index 0000000000000..06552344a1622 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceNameCache.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import java.util.concurrent.TimeUnit; + +/** + * An efficient cache for NamespaceName instances that allows deduplication and efficient memory usage. + * It uses soft references to allow garbage collection of unused NamespaceName instances under heavy memory pressure. + * This cache uses ConcurrentHashMap for lookups for performance over Guava Cache and Caffeine Cache + * since there was a concern in https://github.com/apache/pulsar/pull/23052 about high CPU usage for cache lookups. 
+ */
+class NamespaceNameCache extends NameCache<NamespaceName> {
+    static final NamespaceNameCache INSTANCE = new NamespaceNameCache();
+    // Configuration for the cache. These settings aren't currently exposed to end users.
+    static int cacheMaxSize = 100000; // upper bound on entries, handed to NameCache via getCacheMaxSize()
+    static int reduceSizeByPercentage = 25; // shrink amount in percent; semantics are defined in NameCache
+    static long cacheMaintenanceTaskIntervalMillis = TimeUnit.SECONDS.toMillis(5); // 5 seconds
+
+    @Override
+    protected NamespaceName createValue(String key) {
+        return new NamespaceName(key);
+    }
+
+    @Override
+    protected int getCacheMaxSize() {
+        return cacheMaxSize;
+    }
+
+    @Override
+    protected int getReduceSizeByPercentage() {
+        return reduceSizeByPercentage;
+    }
+
+    @Override
+    protected long getMaintenanceTaskIntervalMillis() {
+        return cacheMaintenanceTaskIntervalMillis;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index b2f96bfe6e259..bb1923760a71e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -23,17 +23,16 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.StringInterner;
 
 /**
  * Encapsulate the parsing of the completeTopicName name.
*/ public class TopicName implements ServiceUnitId { - - public static final String PUBLIC_TENANT = "public"; - public static final String DEFAULT_NAMESPACE = "default"; + public static final String PUBLIC_TENANT = StringInterner.intern("public"); + public static final String DEFAULT_NAMESPACE = StringInterner.intern("default"); public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-"; @@ -49,16 +48,8 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; - private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); - - public static void clearIfReachedMaxCapacity(int maxCapacity) { - if (maxCapacity < 0) { - // Unlimited cache. - return; - } - if (cache.size() > maxCapacity) { - cache.clear(); - } + public static void invalidateCache() { + TopicNameCache.INSTANCE.invalidateCache(); } public static TopicName get(String domain, NamespaceName namespaceName, String topic) { @@ -78,11 +69,7 @@ public static TopicName get(String domain, String tenant, String cluster, String } public static TopicName get(String topic) { - TopicName tp = cache.get(topic); - if (tp != null) { - return tp; - } - return cache.computeIfAbsent(topic, k -> new TopicName(k)); + return TopicNameCache.INSTANCE.get(topic); } public static TopicName getPartitionedTopicName(String topic) { @@ -111,7 +98,7 @@ public static String getPattern(String topic) { } @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") - private TopicName(String completeTopicName) { + TopicName(String completeTopicName) { try { // The topic name can be in two different forms, one is fully qualified topic name, // the other one is short topic name @@ -152,18 +139,18 @@ private TopicName(String completeTopicName) { parts = Splitter.on("/").limit(4).splitToList(rest); if (parts.size() == 3) { // New topic name without cluster name - this.tenant = parts.get(0); + this.tenant = StringInterner.intern(parts.get(0)); this.cluster = null; - this.namespacePortion = parts.get(1); - 
this.localName = parts.get(2); + this.namespacePortion = StringInterner.intern(parts.get(1)); + this.localName = StringInterner.intern(parts.get(2)); this.partitionIndex = getPartitionIndex(completeTopicName); this.namespaceName = NamespaceName.get(tenant, namespacePortion); } else if (parts.size() == 4) { // Legacy topic name that includes cluster name - this.tenant = parts.get(0); - this.cluster = parts.get(1); - this.namespacePortion = parts.get(2); - this.localName = parts.get(3); + this.tenant = StringInterner.intern(parts.get(0)); + this.cluster = StringInterner.intern(parts.get(1)); + this.namespacePortion = StringInterner.intern(parts.get(2)); + this.localName = StringInterner.intern(parts.get(3)); this.partitionIndex = getPartitionIndex(completeTopicName); this.namespaceName = NamespaceName.get(tenant, cluster, namespacePortion); } else { @@ -179,12 +166,12 @@ private TopicName(String completeTopicName) { throw new IllegalArgumentException("Invalid topic name: " + completeTopicName, e); } if (isV2()) { - this.completeTopicName = String.format("%s://%s/%s/%s", - domain, tenant, namespacePortion, localName); + this.completeTopicName = StringInterner.intern(String.format("%s://%s/%s/%s", + domain, tenant, namespacePortion, localName)); } else { - this.completeTopicName = String.format("%s://%s/%s/%s/%s", + this.completeTopicName = StringInterner.intern(String.format("%s://%s/%s/%s/%s", domain, tenant, cluster, - namespacePortion, localName); + namespacePortion, localName)); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java new file mode 100644 index 0000000000000..cd076336721ae --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicNameCache.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A cache for TopicName instances that allows deduplication and efficient memory usage.
+ * It uses soft references to allow garbage collection of unused TopicName instances under heavy memory pressure.
+ * This cache uses ConcurrentHashMap for lookups for performance over Guava Cache and Caffeine Cache
+ * since there was a concern in https://github.com/apache/pulsar/pull/23052 about high CPU usage for cache lookups.
+ */
+class TopicNameCache extends NameCache<TopicName> {
+    static final TopicNameCache INSTANCE = new TopicNameCache();
+    // Configuration for the cache. These settings aren't currently exposed to end users.
+    static int cacheMaxSize = 100000; // upper bound on entries, handed to NameCache via getCacheMaxSize()
+    static int reduceSizeByPercentage = 25; // shrink amount in percent; semantics are defined in NameCache
+    static long cacheMaintenanceTaskIntervalMillis = TimeUnit.SECONDS.toMillis(5); // 5 seconds
+
+    @Override
+    protected TopicName createValue(String key) {
+        return new TopicName(key);
+    }
+
+    @Override
+    protected int getCacheMaxSize() {
+        return cacheMaxSize;
+    }
+
+    @Override
+    protected int getReduceSizeByPercentage() {
+        return reduceSizeByPercentage;
+    }
+
+    @Override
+    protected long getMaintenanceTaskIntervalMillis() {
+        return cacheMaintenanceTaskIntervalMillis;
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java
new file mode 100644
index 0000000000000..65c6d7003234f
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameCacheTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.testng.annotations.Test;
+
+public class TopicNameCacheTest {
+    /** Filling the cache past cacheMaxSize must shrink it by reduceSizeByPercentage. */
+    @Test
+    public void shrinkCache() {
+        // INSTANCE is shared JVM-wide; invalidate it first so entries left by other tests cannot skew the sizes
+        TopicName.invalidateCache();
+        TopicNameCache cache = TopicNameCache.INSTANCE;
+        for (int i = 0; i < TopicNameCache.cacheMaxSize; i++) {
+            cache.get("persistent://tenant/namespace/topic" + i);
+        }
+        // check that the cache size is at maximum
+        assertEquals(cache.size(), TopicNameCache.cacheMaxSize);
+
+        // Add one more topic to trigger the cache shrink
+        cache.get("persistent://tenant/namespace/topic100101");
+        // as() must precede the assertion: AssertJ ignores a description that is set after the check has run
+        assertThat(cache.size())
+                .as("Cache size should be reduced after adding an extra topic beyond the max size")
+                .isEqualTo((int) (TopicNameCache.cacheMaxSize
+                        * ((100 - TopicNameCache.reduceSizeByPercentage) / 100.0)));
+    }
+
+    /** Smoke test (no assertions): adds 2M random topic names with the size limit raised to Integer.MAX_VALUE. */
+    @Test
+    public void softReferenceHandling() {
+        int defaultCacheMaxSize = TopicNameCache.cacheMaxSize;
+        long defaultCacheMaintenanceTaskIntervalMillis = TopicNameCache.cacheMaintenanceTaskIntervalMillis;
+        try {
+            TopicNameCache.cacheMaxSize = Integer.MAX_VALUE;
+            TopicNameCache.cacheMaintenanceTaskIntervalMillis = 10L;
+            TopicNameCache cache = TopicNameCache.INSTANCE;
+            for (int i = 0; i < 2_000_000; i++) {
+                cache.get("persistent://tenant/namespace/topic" + RandomStringUtils.randomAlphabetic(100));
+                if (i % 100_000 == 0) {
+                    System.out.println(i + " topics added to cache. Current size: " + cache.size());
+                }
+            }
+        } finally {
+            // Reset the cache settings to default after the test
+            TopicNameCache.cacheMaxSize = defaultCacheMaxSize;
+            TopicNameCache.cacheMaintenanceTaskIntervalMillis = defaultCacheMaintenanceTaskIntervalMillis;
+        }
+    }
+}