Skip to content

Commit acad78c

Browse files
authored
[improve][broker] Part-2 of PIP-434: Use ServerCnxThrottleTracker, instead of modifying channel.readable directly (#24799)
1 parent a091ea7 commit acad78c

File tree

11 files changed

+565
-196
lines changed

11 files changed

+565
-196
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.pulsar.broker.qos.MonotonicClock;
2222
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
2323
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
24+
import org.apache.pulsar.broker.service.ServerCnxThrottleTracker;
2425
import org.apache.pulsar.common.policies.data.Policies;
2526
import org.apache.pulsar.common.policies.data.PublishRate;
2627
import org.apache.pulsar.common.policies.data.ResourceGroup;
@@ -30,7 +31,13 @@ public class ResourceGroupPublishLimiter extends PublishRateLimiterImpl {
3031
private volatile long publishMaxByteRate;
3132

3233
public ResourceGroupPublishLimiter(ResourceGroup resourceGroup, MonotonicClock monotonicClock) {
33-
super(monotonicClock);
34+
super(monotonicClock, producer -> {
35+
producer.getCnx().getThrottleTracker().markThrottled(
36+
ServerCnxThrottleTracker.ThrottleType.ResourceGroupPublishRate);
37+
}, producer -> {
38+
producer.getCnx().getThrottleTracker().unmarkThrottled(
39+
ServerCnxThrottleTracker.ThrottleType.ResourceGroupPublishRate);
40+
});
3441
update(resourceGroup);
3542
}
3643

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import static java.util.Objects.requireNonNull;
2323
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
24+
import static org.apache.pulsar.broker.service.ServerCnxThrottleTracker.ThrottleType;
2425
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
2526
import com.google.common.annotations.VisibleForTesting;
2627
import com.google.common.base.MoreObjects;
@@ -193,7 +194,12 @@ public AbstractTopic(String topic, BrokerService brokerService) {
193194
updateTopicPolicyByBrokerConfig();
194195

195196
this.lastActive = System.nanoTime();
196-
topicPublishRateLimiter = new PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicClock());
197+
topicPublishRateLimiter = new PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicClock(),
198+
producer -> {
199+
producer.getCnx().getThrottleTracker().markThrottled(ThrottleType.TopicPublishRate);
200+
}, producer -> {
201+
producer.getCnx().getThrottleTracker().unmarkThrottled(ThrottleType.TopicPublishRate);
202+
});
197203
updateActiveRateLimiters();
198204

199205
additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,13 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
339339
this.pulsar = pulsar;
340340
this.clock = pulsar.getClock();
341341
this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
342-
this.brokerPublishRateLimiter = new PublishRateLimiterImpl(pulsar.getMonotonicClock());
342+
this.brokerPublishRateLimiter = new PublishRateLimiterImpl(pulsar.getMonotonicClock(), producer -> {
343+
producer.getCnx().getThrottleTracker().markThrottled(
344+
ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
345+
}, producer -> {
346+
producer.getCnx().getThrottleTracker().unmarkThrottled(
347+
ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
348+
});
343349
this.dispatchRateLimiterFactory = createDispatchRateLimiterFactory(pulsar.getConfig());
344350
this.managedLedgerStorage = pulsar.getManagedLedgerStorage();
345351
this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -899,30 +899,6 @@ public boolean isDisconnecting() {
899899

900900
private static final Logger log = LoggerFactory.getLogger(Producer.class);
901901

902-
/**
903-
* This method increments a counter that is used to control the throttling of a connection.
904-
* The connection's read operations are paused when the counter's value is greater than 0, indicating that
905-
* throttling is in effect.
906-
* It's important to note that after calling this method, it is the caller's responsibility to ensure that the
907-
* counter is decremented by calling the {@link #decrementThrottleCount()} method when throttling is no longer
908-
* needed on the connection.
909-
*/
910-
public void incrementThrottleCount() {
911-
cnx.incrementThrottleCount();
912-
}
913-
914-
/**
915-
* This method decrements a counter that is used to control the throttling of a connection.
916-
* The connection's read operations are resumed when the counter's value is 0, indicating that
917-
* throttling is no longer in effect.
918-
* It's important to note that before calling this method, the caller should have previously
919-
* incremented the counter by calling the {@link #incrementThrottleCount()} method when throttling
920-
* was needed on the connection.
921-
*/
922-
public void decrementThrottleCount() {
923-
cnx.decrementThrottleCount();
924-
}
925-
926902
public Attributes getOpenTelemetryAttributes() {
927903
if (attributes != null) {
928904
return attributes;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.function.Consumer;
2728
import lombok.extern.slf4j.Slf4j;
2829
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
2930
import org.apache.pulsar.broker.qos.MonotonicClock;
@@ -42,9 +43,14 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
4243

4344
private final AtomicInteger throttledProducersCount = new AtomicInteger(0);
4445
private final AtomicBoolean processingQueuedProducers = new AtomicBoolean(false);
46+
private final Consumer<Producer> throttleAction;
47+
private final Consumer<Producer> unthrottleAction;
4548

46-
public PublishRateLimiterImpl(MonotonicClock monotonicClock) {
49+
public PublishRateLimiterImpl(MonotonicClock monotonicClock, Consumer<Producer> throttleAction,
50+
Consumer<Producer> unthrottleAction) {
4751
this.monotonicClock = monotonicClock;
52+
this.throttleAction = throttleAction;
53+
this.unthrottleAction = unthrottleAction;
4854
}
4955

5056
/**
@@ -68,7 +74,7 @@ public void handlePublishThrottling(Producer producer, int numOfMessages,
6874
}
6975
if (shouldThrottle) {
7076
// throttle the producer by incrementing the throttle count
71-
producer.incrementThrottleCount();
77+
throttleAction.accept(producer);
7278
// schedule decrementing the throttle count to possibly unthrottle the producer after the
7379
// throttling period
7480
scheduleDecrementThrottleCount(producer);
@@ -136,7 +142,8 @@ private void unthrottleQueuedProducers(ScheduledExecutorService executor) {
136142
while ((throttlingDuration = calculateThrottlingDurationNanos()) == 0L
137143
&& (producer = unthrottlingQueue.poll()) != null) {
138144
try {
139-
producer.decrementThrottleCount();
145+
final Producer producerFinal = producer;
146+
producer.getCnx().execute(() -> unthrottleAction.accept(producerFinal));
140147
} catch (Exception e) {
141148
log.error("Failed to unthrottle producer {}", producer, e);
142149
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.apache.commons.lang3.StringUtils.isNotBlank;
2626
import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
2727
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
28+
import static org.apache.pulsar.broker.service.ServerCnxThrottleTracker.ThrottleType;
2829
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl;
2930
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException;
3031
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
@@ -283,7 +284,8 @@ public void incrementPublishBytes(long bytes, long maxPendingBytesPerThread) {
283284
if (maxPendingBytesPerThread > 0 && pendingBytes > maxPendingBytesPerThread
284285
&& !limitExceeded) {
285286
limitExceeded = true;
286-
cnxsPerThread.get().forEach(cnx -> cnx.throttleTracker.setPublishBufferLimiting(true));
287+
cnxsPerThread.get().forEach(cnx -> cnx.throttleTracker.markThrottled(
288+
ThrottleType.IOThreadMaxPendingPublishBytesExceeded));
287289
}
288290
}
289291

@@ -293,7 +295,8 @@ public void decrementPublishBytes(long bytes, long resumeThresholdPendingBytesPe
293295
// we resume all connections sharing the same thread
294296
if (limitExceeded && pendingBytes <= resumeThresholdPendingBytesPerThread) {
295297
limitExceeded = false;
296-
cnxsPerThread.get().forEach(cnx -> cnx.throttleTracker.setPublishBufferLimiting(false));
298+
cnxsPerThread.get().forEach(cnx -> cnx.throttleTracker.unmarkThrottled(
299+
ThrottleType.IOThreadMaxPendingPublishBytesExceeded));
297300
}
298301
}
299302
}
@@ -311,6 +314,7 @@ enum State {
311314
Start, Connected, Failed, Connecting
312315
}
313316

317+
@Getter
314318
private final ServerCnxThrottleTracker throttleTracker;
315319

316320
public ServerCnx(PulsarService pulsar) {
@@ -481,12 +485,12 @@ private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) {
481485
log.warn("[{}] Reached rate limitation", this);
482486
// Stop receiving requests.
483487
pausedDueToRateLimitation = true;
484-
ctx.channel().config().setAutoRead(false);
488+
getThrottleTracker().markThrottled(ThrottleType.ConnectionPauseReceivingCooldownRateLimit);
485489
// Resume after 1 second.
486490
ctx.channel().eventLoop().schedule(() -> {
487491
if (pausedDueToRateLimitation) {
488492
log.info("[{}] Resuming connection after rate limitation", this);
489-
ctx.channel().config().setAutoRead(true);
493+
getThrottleTracker().unmarkThrottled(ThrottleType.ConnectionPauseReceivingCooldownRateLimit);
490494
pausedDueToRateLimitation = false;
491495
}
492496
}, requestRateLimiter.getPeriodAtMs(), TimeUnit.MILLISECONDS);
@@ -497,7 +501,7 @@ private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) {
497501
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
498502
if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
499503
log.info("[{}] is writable, turn on channel auto-read", this);
500-
ctx.channel().config().setAutoRead(true);
504+
getThrottleTracker().unmarkThrottled(ThrottleType.ConnectionOutboundBufferFull);
501505
requestRateLimiter.timingOpen(pauseReceivingCooldownMilliSeconds, TimeUnit.MILLISECONDS);
502506
} else if (pauseReceivingRequestsIfUnwritable && !ctx.channel().isWritable()) {
503507
final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer();
@@ -511,7 +515,7 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
511515
PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off channel auto-read", this);
512516
}
513517
}
514-
ctx.channel().config().setAutoRead(false);
518+
getThrottleTracker().markThrottled(ThrottleType.ConnectionOutboundBufferFull);
515519
}
516520
ctx.fireChannelWritabilityChanged();
517521
}
@@ -3399,7 +3403,7 @@ public boolean isWritable() {
33993403
// or the pending publish bytes
34003404
private void increasePendingSendRequestsAndPublishBytes(int msgSize) {
34013405
if (++pendingSendRequest == maxPendingSendRequests) {
3402-
throttleTracker.setPendingSendRequestsExceeded(true);
3406+
throttleTracker.markThrottled(ThrottleType.ConnectionMaxPendingPublishRequestsExceeded);
34033407
}
34043408
PendingBytesPerThreadTracker.getInstance().incrementPublishBytes(msgSize, maxPendingBytesPerThread);
34053409
}
@@ -3424,7 +3428,7 @@ public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
34243428
PendingBytesPerThreadTracker.getInstance().decrementPublishBytes(msgSize, resumeThresholdPendingBytesPerThread);
34253429

34263430
if (--pendingSendRequest == resumeReadsThreshold) {
3427-
throttleTracker.setPendingSendRequestsExceeded(false);
3431+
throttleTracker.unmarkThrottled(ThrottleType.ConnectionMaxPendingPublishRequestsExceeded);
34283432
}
34293433

34303434
if (isNonPersistentTopic) {
@@ -3803,22 +3807,6 @@ protected void setAuthRole(String authRole) {
38033807
this.authRole = authRole;
38043808
}
38053809

3806-
/**
3807-
* {@inheritDoc}
3808-
*/
3809-
@Override
3810-
public void incrementThrottleCount() {
3811-
throttleTracker.incrementThrottleCount();
3812-
}
3813-
3814-
/**
3815-
* {@inheritDoc}
3816-
*/
3817-
@Override
3818-
public void decrementThrottleCount() {
3819-
throttleTracker.decrementThrottleCount();
3820-
}
3821-
38223810
@VisibleForTesting
38233811
void setAuthState(AuthenticationState authState) {
38243812
this.authState = authState;

0 commit comments

Comments
 (0)