diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 279ed9ed73ebf..96a48e5b282ab 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -919,6 +919,58 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int brokerMaxConnections = 0; + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"." + ) + private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer is smaller than this value, channel writable state will start to return" + + " \"true\"." + ) + private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer" + + " buffer is full, until it is changed to writable." + ) + private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited" + + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines" + + " how long the rate limiting should last, in millis. Once the bytes that are waiting to be sent out" + + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative" + + " value will disable the rate limiting." + ) + private int pulsarChannelPauseReceivingCooldownMs = 5000; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a" + + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines how" + + " many requests should be allowed in the rate limiting period." + + ) + private int pulsarChannelPauseReceivingCooldownRateLimitPermits = 5; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a" + + " period of time defined by pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the" + + " backlog of requests. This parameter defines the period of the rate limiter in milliseconds. If the rate" + + " limit period is set to 1000, then the unit is requests per 1000 milli seconds. When it's 10, the unit" + + " is requests per every 10ms." + + ) + private int pulsarChannelPauseReceivingCooldownRateLimitPeriodMs = 10; + @FieldContext( category = CATEGORY_POLICIES, doc = "The maximum number of connections per IP. If it exceeds, new connections are rejected." diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 68da1083d2205..dffd8260d463c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -81,6 +81,8 @@ protected void initChannel(SocketChannel ch) throws Exception { // disable auto read explicitly so that requests aren't served until auto read is enabled // ServerCnx must enable auto read in channelActive after PulsarService is ready to accept incoming requests ch.config().setAutoRead(false); + ch.config().setWriteBufferHighWaterMark(pulsar.getConfig().getPulsarChannelWriteBufferHighWaterMark()); + ch.config().setWriteBufferLowWaterMark(pulsar.getConfig().getPulsarChannelWriteBufferLowWaterMark()); ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true)); if (this.enableTls) { ch.pipeline().addLast(TLS_HANDLER, new SslHandler(this.sslFactory.createServerSslEngine(ch.alloc()))); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 44927c375b57b..dfd0b9cfd498a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -37,6 +37,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.FastThreadLocal; @@ -181,6 +182,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; +import org.apache.pulsar.utils.TimedSingleThreadRateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -191,6 +193,8 @@ * parameter instance lifecycle. */ public class ServerCnx extends PulsarHandler implements TransportCnx { + private static final Logger PAUSE_RECEIVING_LOG = LoggerFactory.getLogger(ServerCnx.class.getName() + + ".pauseReceiving"); private final BrokerService service; private final SchemaRegistryService schemaService; private final String listenerName; @@ -249,6 +253,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final long connectionLivenessCheckTimeoutMillis; private final TopicsPattern.RegexImplementation topicsPatternImplementation; + private final boolean pauseReceivingRequestsIfUnwritable; + private final TimedSingleThreadRateLimiter requestRateLimiter; + private final int pauseReceivingCooldownMilliSeconds; + private boolean pausedDueToRateLimitation = false; // Tracks and limits number of bytes pending to be published from a single specific IO thread. static final class PendingBytesPerThreadTracker { @@ -312,6 +320,14 @@ public ServerCnx(PulsarService pulsar, String listenerName) { // the null check is a workaround for #13620 super(pulsar.getBrokerService() != null ? pulsar.getBrokerService().getKeepAliveIntervalSeconds() : 0, TimeUnit.SECONDS); + this.pauseReceivingRequestsIfUnwritable = + pulsar.getConfig().isPulsarChannelPauseReceivingRequestsIfUnwritable(); + this.requestRateLimiter = new TimedSingleThreadRateLimiter( + pulsar.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPermits(), + pulsar.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(), + TimeUnit.MILLISECONDS); + this.pauseReceivingCooldownMilliSeconds = + pulsar.getConfig().getPulsarChannelPauseReceivingCooldownMs(); this.service = pulsar.getBrokerService(); this.schemaService = pulsar.getSchemaRegistryService(); this.listenerName = listenerName; @@ -438,11 +454,62 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } + private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) { + if (!pauseReceivingRequestsIfUnwritable + || pauseReceivingCooldownMilliSeconds <= 0 || cmd.getType() == BaseCommand.Type.PONG + || cmd.getType() == BaseCommand.Type.PING) { + return; + } + if (PAUSE_RECEIVING_LOG.isDebugEnabled()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + PAUSE_RECEIVING_LOG.debug("Start to handle request [{}], totalPendingWriteBytes: {}, channel" + + " isWritable: {}", cmd.getType(), outboundBuffer.totalPendingWriteBytes(), + ctx.channel().isWritable()); + } else { + PAUSE_RECEIVING_LOG.debug("Start to handle request [{}], channel isWritable: {}", + cmd.getType(), ctx.channel().isWritable()); + } + } + // "requestRateLimiter" will return the permits that you acquired if it is not opening(has been called + // "timingOpen(duration)"). + if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) { + log.warn("[{}] Reached rate limitation", this); + // Stop receiving requests. + pausedDueToRateLimitation = true; + ctx.channel().config().setAutoRead(false); + // Resume after 1 second. + ctx.channel().eventLoop().schedule(() -> { + if (pausedDueToRateLimitation) { + log.info("[{}] Resuming connection after rate limitation", this); + ctx.channel().config().setAutoRead(true); + pausedDueToRateLimitation = false; + } + }, 1, TimeUnit.SECONDS); + } + } + @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - if (log.isDebugEnabled()) { - log.debug("Channel writability has changed to: {}", ctx.channel().isWritable()); + if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) { + log.info("[{}] is writable, turn on channel auto-read", this); + ctx.channel().config().setAutoRead(true); + requestRateLimiter.timingOpen(pauseReceivingCooldownMilliSeconds, TimeUnit.MILLISECONDS); + } else if (pauseReceivingRequestsIfUnwritable && !ctx.channel().isWritable()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + if (PAUSE_RECEIVING_LOG.isDebugEnabled()) { + PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off channel auto-read," + + " totalPendingWriteBytes: {}", this, outboundBuffer.totalPendingWriteBytes()); + } + } else { + if (PAUSE_RECEIVING_LOG.isDebugEnabled()) { + PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off channel auto-read", this); + } + } + ctx.channel().config().setAutoRead(false); } + ctx.fireChannelWritabilityChanged(); } @Override @@ -3645,8 +3712,9 @@ public CompletableFuture> checkConnectionLiveness() { } @Override - protected void messageReceived() { - super.messageReceived(); + protected void messageReceived(BaseCommand cmd) { + checkPauseReceivingRequestsAfterResumeRateLimit(cmd); + super.messageReceived(cmd); if (connectionCheckInProgress != null && !connectionCheckInProgress.isDone()) { connectionCheckInProgress.complete(Optional.of(true)); connectionCheckInProgress = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiter.java new file mode 100644 index 0000000000000..80043fb67f8ca --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.utils; + +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TimedSingleThreadRateLimiter { + + @Getter + private final int rate; + @Getter + private final long periodAtMs; + private long lastTimeReset; + @Getter + private int remaining; + private long closeAfterAtMs; + + public TimedSingleThreadRateLimiter(final int rate, final long period, final TimeUnit unit) { + this.rate = rate; + this.periodAtMs = unit.toMillis(period); + this.lastTimeReset = System.currentTimeMillis(); + this.remaining = rate; + } + + public int acquire(int permits) { + final long now = System.currentTimeMillis(); + if (permits < 0) { + return 0; + } + if (now > closeAfterAtMs) { + return permits; + } + mayRenew(now); + if (remaining > permits) { + remaining -= permits; + if (log.isDebugEnabled()) { + log.debug("acquired: {}, remaining:{}", permits, remaining); + } + return permits; + } else { + int acquired = remaining; + remaining = 0; + if (log.isDebugEnabled()) { + log.debug("acquired: {}, remaining:{}", acquired, remaining); + } + return acquired; + } + } + + public void timingOpen(long closeAfter, final TimeUnit unit) { + if (closeAfter <= 0) { + this.closeAfterAtMs = 0; + } else { + this.closeAfterAtMs = System.currentTimeMillis() + unit.toMillis(closeAfter); + } + } + + private void mayRenew(long now) { + if (now > lastTimeReset + periodAtMs) { + remaining = rate; + lastTimeReset = now; + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java index 0ad8ca8f1c726..d6210767ed1f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java @@ -23,6 +23,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import java.util.Queue; +import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; @@ -71,7 +72,7 @@ public Object getCommand(Object obj) { private final PulsarDecoder decoder = new PulsarDecoder() { @Override - protected void messageReceived() { + protected void messageReceived(BaseCommand cmd) { } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java index e66880738cf26..b7e0ee429034a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java @@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSubscribeHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandTopicLookupHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandUnsubscribeHook; +import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.CommandCloseProducer; @@ -132,7 +133,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override - protected void messageReceived() { + protected void messageReceived(BaseCommand cmd) { } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureTest.java new file mode 100644 index 0000000000000..aa3e17c2ea03e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; +import org.apache.pulsar.common.naming.NamespaceName; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-impl") +public class PatternConsumerBackPressureTest extends MockedPulsarServiceBaseTest { + + @Override + @BeforeMethod + protected void setup() throws Exception { + isTcpLookup = true; + conf.setEnableBrokerSideSubscriptionPatternEvaluation(false); + super.internalSetup(); + setupDefaultTenantAndNamespace(); + } + + @Override + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + conf.setPulsarChannelPauseReceivingRequestsIfUnwritable(true); + // 5m. + conf.setPulsarChannelWriteBufferHighWaterMark(1 * 1024 * 1024); + // 32k. + conf.setPulsarChannelWriteBufferLowWaterMark(32 * 1024); + } + + @Test(timeOut = 60 * 1000) + public void testInfiniteGetThousandsTopics() throws PulsarAdminException, InterruptedException { + final int topicCount = 8192; + final int requests = 2048; + final String topicName = UUID.randomUUID().toString(); + admin.topics().createPartitionedTopic(topicName, topicCount); + final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime() + .availableProcessors()); + + final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; + final AtomicInteger success = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(requests); + for (int i = 0; i < requests; i++) { + executorService.execute(() -> { + pulsarClientImpl.getLookup() + .getTopicsUnderNamespace(NamespaceName.get("public", "default"), + CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*", "") + .whenComplete((result, ex) -> { + if (ex == null) { + success.incrementAndGet(); + } else { + log.error("Failed to get topic list.", ex); + } + log.info("latch-count: {}, succeed: {}", latch.getCount(), success.get()); + latch.countDown(); + }); + }); + } + latch.await(); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(success.get(), requests); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiterTest.java new file mode 100644 index 0000000000000..183d68e81ccb3 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiterTest.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.utils; + +import static org.testng.Assert.assertEquals; +import java.util.concurrent.TimeUnit; +import org.testng.annotations.Test; + +/** + * Comprehensive test suite for TimedSingleThreadRateLimiter class. + */ +public class TimedSingleThreadRateLimiterTest { + + @Test + public void testConstructorAndGetters() { + int rate = 100; + long period = 5; + TimeUnit unit = TimeUnit.SECONDS; + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(rate, period, unit); + assertEquals(limiter.getRate(), rate); + assertEquals(limiter.getPeriodAtMs(), unit.toMillis(period)); + assertEquals(limiter.getRemaining(), rate); // Initially should have all permits + } + + @Test + public void testConstructorWithDifferentTimeUnits() { + // Test with milliseconds + TimedSingleThreadRateLimiter limiterMs = new TimedSingleThreadRateLimiter(50, 1000, TimeUnit.MILLISECONDS); + assertEquals(limiterMs.getPeriodAtMs(), 1000); + // Test with seconds + TimedSingleThreadRateLimiter limiterSec = new TimedSingleThreadRateLimiter(50, 2, TimeUnit.SECONDS); + assertEquals(limiterSec.getPeriodAtMs(), 2000); + // Test with minutes + TimedSingleThreadRateLimiter limiterMin = new TimedSingleThreadRateLimiter(50, 1, TimeUnit.MINUTES); + assertEquals(limiterMin.getPeriodAtMs(), 60000); + } + + @Test + public void testBasicAcquire() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(100, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Test acquiring single permit + int acquired = limiter.acquire(1); + assertEquals(acquired, 1); + assertEquals(limiter.getRemaining(), 99); + // Test acquiring multiple permits + acquired = limiter.acquire(10); + assertEquals(acquired, 10); + assertEquals(limiter.getRemaining(), 89); + } + + @Test + public void testAcquireMoreThanRemaining() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Acquire most permits + int acquired = limiter.acquire(8); + assertEquals(acquired, 8); + assertEquals(limiter.getRemaining(), 2); + // Try to acquire more than remaining + acquired = limiter.acquire(5); + assertEquals(acquired, 2); // Should only get remaining permits + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testAcquireWhenNoPermitsRemaining() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(5, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Exhaust all permits + limiter.acquire(5); + assertEquals(limiter.getRemaining(), 0); + // Try to acquire when no permits left + int acquired = limiter.acquire(3); + assertEquals(acquired, 0); + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testAcquireZeroPermits() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + int acquired = limiter.acquire(0); + assertEquals(acquired, 0); + assertEquals(limiter.getRemaining(), 10); // Should remain unchanged + } + + @Test + public void testPermitRenewalAfterPeriod() throws InterruptedException { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 100, TimeUnit.MILLISECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Exhaust all permits + limiter.acquire(10); + assertEquals(limiter.getRemaining(), 0); + // Wait for period to pass + Thread.sleep(150); + // Acquire should trigger renewal + int acquired = limiter.acquire(5); + assertEquals(acquired, 5); + assertEquals(limiter.getRemaining(), 5); + } + + @Test + public void testNoRenewalBeforePeriodExpires() throws InterruptedException { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Exhaust all permits + limiter.acquire(10); + assertEquals(limiter.getRemaining(), 0); + // Should not renew yet + int acquired = limiter.acquire(5); + assertEquals(acquired, 0); + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testTimingOpen() throws Exception { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS); + // Set timing to open for 500ms + limiter.timingOpen(500, TimeUnit.MILLISECONDS); + // During open period. + int acquired = limiter.acquire(15); + assertEquals(acquired, 10); + assertEquals(limiter.getRemaining(), 0); + // Closed. + Thread.sleep(1000); + int acquired2 = limiter.acquire(1000); + assertEquals(acquired2, 1000); + } + + @Test + public void testTimingOpenWithZeroDuration() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS); + // Set timing to open for 0 duration. + limiter.timingOpen(0, TimeUnit.MILLISECONDS); + // Closed. + int acquired = limiter.acquire(7000); + assertEquals(acquired, 7000); + } + + @Test + public void testHighRateAcquisition() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(1000, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Acquire permits in chunks + int totalAcquired = 0; + for (int i = 0; i < 10; i++) { + totalAcquired += limiter.acquire(100); + } + assertEquals(totalAcquired, 1000); + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testLowRateAcquisition() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(3, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Acquire all permits one by one + assertEquals(limiter.acquire(1), 1); + assertEquals(limiter.getRemaining(), 2); + assertEquals(limiter.acquire(1), 1); + assertEquals(limiter.getRemaining(), 1); + assertEquals(limiter.acquire(1), 1); + assertEquals(limiter.getRemaining(), 0); + // No more permits available + assertEquals(limiter.acquire(1), 0); + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testRenewalWithPartialAcquisition() throws InterruptedException { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 100, TimeUnit.MILLISECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Acquire some permits + limiter.acquire(6); + assertEquals(limiter.getRemaining(), 4); + // Wait for renewal + Thread.sleep(150); + // After renewal, should have full rate again + int acquired = limiter.acquire(8); + assertEquals(acquired, 8); + assertEquals(limiter.getRemaining(), 2); + } + + @Test + public void testConcurrentBehaviorSimulation() throws InterruptedException { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(20, 100, TimeUnit.MILLISECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Simulate rapid acquisitions + int totalAcquired = 0; + for (int i = 0; i < 5; i++) { + totalAcquired += limiter.acquire(5); + } + assertEquals(totalAcquired, 20); + assertEquals(limiter.getRemaining(), 0); + // Wait for renewal + Thread.sleep(150); + // Should be able to acquire again + int newAcquired = limiter.acquire(10); + assertEquals(newAcquired, 10); + assertEquals(limiter.getRemaining(), 10); + } + + @Test + public void testVeryShortPeriod() throws InterruptedException { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(5, 10, TimeUnit.MILLISECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Exhaust permits + limiter.acquire(5); + assertEquals(limiter.getRemaining(), 0); + // Wait for very short period + Thread.sleep(20); + // Should renew quickly + int acquired = limiter.acquire(3); + assertEquals(acquired, 3); + assertEquals(limiter.getRemaining(), 2); + } + + @Test + public void testVeryLongPeriod() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 1, TimeUnit.HOURS); + limiter.timingOpen(10, TimeUnit.SECONDS); + assertEquals(limiter.getPeriodAtMs(), TimeUnit.HOURS.toMillis(1)); + // Acquire some permits + int acquired = limiter.acquire(7); + assertEquals(acquired, 7); + assertEquals(limiter.getRemaining(), 3); + // Even after a short wait, should not renew (period is 1 hour) + int acquired2 = limiter.acquire(5); + assertEquals(acquired2, 3); // Only remaining permits + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testSinglePermitRate() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(1, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + assertEquals(limiter.getRate(), 1); + assertEquals(limiter.getRemaining(), 1); + // Acquire the only permit + int acquired = limiter.acquire(1); + assertEquals(acquired, 1); + assertEquals(limiter.getRemaining(), 0); + // Try to acquire more + acquired = limiter.acquire(1); + assertEquals(acquired, 0); + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testLargePermitRequest() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Request much more than available + int acquired = limiter.acquire(1000); + assertEquals(acquired, 10); // Should get all available permits + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testNegativePermitRequest() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Request negative permits (edge case) + int acquired = limiter.acquire(-5); + // The implementation doesn't explicitly handle negative permits + // This test documents the current behavior + assertEquals(acquired, 0); // Should not return negative + assertEquals(limiter.getRemaining(), 10); // Remaining should not go negative + } + + @Test + public void testMultipleRenewalCycles() throws InterruptedException { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(5, 50, TimeUnit.MILLISECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // First cycle + limiter.acquire(5); + assertEquals(limiter.getRemaining(), 0); + // Wait for first renewal + Thread.sleep(60); + limiter.acquire(3); + assertEquals(limiter.getRemaining(), 2); + // Wait for second renewal + Thread.sleep(60); + int acquired = limiter.acquire(4); + assertEquals(acquired, 4); + assertEquals(limiter.getRemaining(), 1); + // Wait for third renewal + Thread.sleep(60); + acquired = limiter.acquire(5); + assertEquals(acquired, 5); + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testRapidAcquisitionPattern() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(100, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Simulate rapid small acquisitions + int totalAcquired = 0; + for (int i = 0; i < 50; i++) { + totalAcquired += limiter.acquire(2); + } + assertEquals(totalAcquired, 100); + assertEquals(limiter.getRemaining(), 0); + } + + @Test + public void testBurstAcquisitionPattern() { + TimedSingleThreadRateLimiter limiter = new TimedSingleThreadRateLimiter(50, 1, TimeUnit.SECONDS); + limiter.timingOpen(10, TimeUnit.SECONDS); + // Large burst acquisition + int acquired1 = limiter.acquire(30); + assertEquals(acquired1, 30); + assertEquals(limiter.getRemaining(), 20); + // Another burst + int acquired2 = limiter.acquire(25); + assertEquals(acquired2, 20); // Only remaining permits + assertEquals(limiter.getRemaining(), 0); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index c05b1d796dfdd..b61664d9571da 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -124,7 +124,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (log.isDebugEnabled()) { log.debug("[{}] Received cmd {}", ctx.channel(), cmd.getType()); } - messageReceived(); + messageReceived(cmd); switch (cmd.getType()) { case PARTITIONED_METADATA: @@ -486,7 +486,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } - protected abstract void messageReceived(); + protected abstract void messageReceived(BaseCommand cmd); private ServerError getServerError(int errorCode) { ServerError serverError = ServerError.valueOf(errorCode); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java index 020b753086f42..e8010ea1a514f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java @@ -26,6 +26,7 @@ import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import lombok.Setter; +import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandPing; import org.apache.pulsar.common.api.proto.CommandPong; import org.apache.pulsar.common.api.proto.ProtocolVersion; @@ -61,7 +62,7 @@ public PulsarHandler(int keepAliveInterval, TimeUnit unit) { } @Override - protected void messageReceived() { + protected void messageReceived(BaseCommand cmd) { waitingForPingResponse = false; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java index 6fd77c1662474..af68f6cd28b2b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; import org.testng.annotations.Test; @@ -43,7 +44,7 @@ protected void handleActiveConsumerChange(CommandActiveConsumerChange change) { } @Override - protected void messageReceived() { + protected void messageReceived(BaseCommand cmd) { } }); decoder.channelRead(mock(ChannelHandlerContext.class), cmdBuf); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 775108a75e6b7..5f4456d356e58 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.FeatureFlags; @@ -395,7 +396,7 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) { } @Override - protected void messageReceived() { + protected void messageReceived(BaseCommand cmd) { // no-op }