Skip to content

Commit 4a2f75e

Browse files
committed
[Broker] Disable memory limit controller for broker client and replication clients (#15723)
- disable memory limit by default for broker client and replication clients - restore maxPendingMessages and maxPendingMessagesAcrossPartitions when memory limit is disabled so that pre-PIP-120 default configuration is restored when limit is disabled
1 parent abec5d8 commit 4a2f75e

File tree

4 files changed

+25
-1
lines changed

4 files changed

+25
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,10 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
13741374
if (this.client == null) {
13751375
try {
13761376
ClientConfigurationData conf = new ClientConfigurationData();
1377+
1378+
// Disable memory limit for broker client
1379+
conf.setMemoryLimitBytes(0);
1380+
13771381
conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
13781382
? this.brokerServiceUrlTls : this.brokerServiceUrl);
13791383
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
128128
import org.apache.pulsar.client.api.ClientBuilder;
129129
import org.apache.pulsar.client.api.PulsarClient;
130+
import org.apache.pulsar.client.api.SizeUnit;
130131
import org.apache.pulsar.client.impl.ClientBuilderImpl;
131132
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
132133
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -1199,6 +1200,10 @@ public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> c
11991200
.enableTcpNoDelay(false)
12001201
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
12011202
.statsInterval(0, TimeUnit.SECONDS);
1203+
1204+
// Disable memory limit for replication client
1205+
clientBuilder.memoryLimit(0, SizeUnit.BYTES);
1206+
12021207
if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) {
12031208
clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters());
12041209
} else if (pulsar.getConfiguration().isAuthenticationEnabled()) {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,8 @@ public void releaseMemory(long size) {
8080
public long currentUsage() {
8181
return currentUsage.get();
8282
}
83+
84+
public boolean isMemoryLimited() {
85+
return memoryLimit > 0;
86+
}
8387
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ public class PulsarClientImpl implements PulsarClient {
9292

9393
private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
9494

95+
// default limits for producers when memory limit controller is disabled
96+
private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES = 1000;
97+
private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;
98+
9599
protected final ClientConfigurationData conf;
96100
private final boolean createdExecutorProviders;
97101
private LookupService lookup;
@@ -251,7 +255,14 @@ public ProducerBuilder<byte[]> newProducer() {
251255

252256
@Override
253257
public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
254-
return new ProducerBuilderImpl<>(this, schema);
258+
ProducerBuilderImpl<T> producerBuilder = new ProducerBuilderImpl<>(this, schema);
259+
if (!memoryLimitController.isMemoryLimited()) {
260+
// set default limits for producers when memory limit controller is disabled
261+
producerBuilder.maxPendingMessages(NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES);
262+
producerBuilder.maxPendingMessagesAcrossPartitions(
263+
NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
264+
}
265+
return producerBuilder;
255266
}
256267

257268
@Override

0 commit comments

Comments
 (0)