
Conversation

poorbarcode
Contributor

@poorbarcode poorbarcode commented Jun 17, 2025

Motivation

See the PIP-434

How it works

After setting pulsarChannelPauseReceivingRequestsIfUnwritable to true, the channel state changes as follows.

  • Netty sets channel.writable to false when too much data is waiting to be sent out (i.e. the size of the data cached in the ChannelOutboundBuffer exceeds {pulsarChannelWriteBufferHighWaterMark}).
    • Netty then triggers a channelWritabilityChanged event.
  • Pulsar stops receiving requests coming into the channel, which relies on the autoRead attribute of the Netty channel.
  • Netty sets channel.writable to true once the size of the data waiting to be sent out drops below {pulsarChannelWriteBufferLowWaterMark}.
  • Pulsar starts receiving requests again (sets channel.autoRead to true).
    • To avoid avalanches, Pulsar starts a timed rate-limiter that limits the rate at which the request backlog is handled ({pulsarChannelRateLimitingRateAfterResumeFromUnreadable} requests per second).
    • After {pulsarChannelRateLimitingSecondsAfterResumeFromUnreadable} seconds, the rate-limiter is removed automatically. If the bytes waiting to be sent out reach {pulsarChannelWriteBufferHighWaterMark} again, the timer is reset.

With pulsarChannelPauseReceivingRequestsIfUnwritable=false (the default), the behaviour is exactly the same as before.
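
For illustration only, here is a minimal sketch of the writability-driven flow described above, assuming a standalone Netty handler; the class name PauseReadOnUnwritableHandler and the concrete water-mark values are hypothetical and not the actual Pulsar implementation (the timed rate-limiter applied after resuming is omitted here).

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.WriteBufferWaterMark;

public class PauseReadOnUnwritableHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // The low/high values play the role of pulsarChannelWriteBufferLowWaterMark
        // and pulsarChannelWriteBufferHighWaterMark in the description above.
        ctx.channel().config().setWriteBufferWaterMark(
                new WriteBufferWaterMark(64 * 1024, 128 * 1024));
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        // Netty flips isWritable() to false once the ChannelOutboundBuffer exceeds
        // the high water mark, and back to true when it drops below the low water
        // mark; mirror that state into autoRead to pause/resume receiving requests.
        ctx.channel().config().setAutoRead(ctx.channel().isWritable());
        ctx.fireChannelWritabilityChanged();
    }
}
```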

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode added this to the 4.1.0 milestone Jun 17, 2025
@poorbarcode poorbarcode self-assigned this Jun 17, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 17, 2025
@poorbarcode poorbarcode changed the title [fix][broker]Broker OOM because ServerCnx handling too many pending write, when the client can not handle response quick enough [fix][broker]Broker OOM: ServerCnx reply too much response but can not send out because the client can not handle response quick enough Jun 17, 2025
@poorbarcode poorbarcode changed the title [fix][broker]Broker OOM: ServerCnx reply too much response but can not send out because the client can not handle response quick enough [fix][broker]OOM issue: ServerCnx replies too many responses, but can not send out because the client can not handle them quick enough, leading ServerCnx to maintain too much data that should be sent out. Jun 17, 2025
@poorbarcode poorbarcode changed the title [fix][broker]OOM issue: ServerCnx replies too many responses, but can not send out because the client can not handle them quick enough, leading ServerCnx to maintain too much data that should be sent out. [fix][broker]Broker OOM: ServerCnx replies too many responses, but can not send out because the client can not handle them quick enough, leading ServerCnx to maintain too much data that should be sent out. Jun 17, 2025
Member

@lhotari lhotari left a comment

Pulsar Broker already contains multiple solutions for backpressure.

The channel writability is controlled with https://netty.io/4.1/api/io/netty/channel/WriteBufferWaterMark.html settings which we don't currently expose, since Pulsar hasn't been using writability for backpressure (excluding the Proxy).

For consumers, there's already a backpressure solution for outbound entries with managedLedgerMaxReadsInFlightSizeInMB.
In this case, based on the test case, this is specifically about returning topic pattern listing responses.

The problem with this PR is that the limit is per connection to the broker. That's not very effective in limiting the broker's memory usage. In Netty, the standard way to handle this is to set autoRead to false when writability changes to false. However, that could cause performance issues for the Pulsar broker.

Instead, a more effective solution would be to have a rate limiter or semaphore in place that specifically targets the topic pattern listing responses and watchers. When this is handled asynchronously, the solution could add delay to responding to requests when limits are reached.

Besides GET_TOPICS_OF_NAMESPACE/GET_TOPICS_OF_NAMESPACE_RESPONSE, there's also WATCH_TOPIC_LIST with WATCH_TOPIC_LIST_SUCCESS and WATCH_TOPIC_UPDATE responses. That's why it makes more sense to design proper backpressure for topic listings.
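
To make the idea concrete, here is a hedged sketch of a broker-wide limiter that bounds in-flight topic-list responses and delays them asynchronously when the limit is reached; the TopicListResponseLimiter class, its permit count, and retry delay are hypothetical, not existing Pulsar code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class TopicListResponseLimiter {

    // Broker-wide cap on concurrent topic-list responses (value is illustrative),
    // unlike a per-connection limit which scales with the number of connections.
    private final Semaphore permits = new Semaphore(64);
    private final ScheduledExecutorService retryExecutor =
            Executors.newSingleThreadScheduledExecutor();

    /**
     * Sends a topic-list response when a permit is available; otherwise retries
     * after a short delay, effectively delaying responses under load.
     */
    public void submit(Supplier<CompletableFuture<Void>> sendResponse) {
        if (permits.tryAcquire()) {
            // Release the permit only once the response has actually been sent out.
            sendResponse.get().whenComplete((ignore, error) -> permits.release());
        } else {
            retryExecutor.schedule(() -> submit(sendResponse), 50, TimeUnit.MILLISECONDS);
        }
    }
}
```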

@codecov-commenter

codecov-commenter commented Jun 17, 2025

Codecov Report

❌ Patch coverage is 73.80952% with 22 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.17%. Comparing base (7504538) to head (3b36906).
⚠️ Report is 1 commit behind head on master.

Files with missing lines Patch % Lines
...va/org/apache/pulsar/broker/service/ServerCnx.java 58.13% 10 Missing and 8 partials ⚠️
...che/pulsar/utils/TimedSingleThreadRateLimiter.java 87.09% 2 Missing and 2 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24423      +/-   ##
============================================
- Coverage     74.27%   74.17%   -0.10%     
+ Complexity    33264    33254      -10     
============================================
  Files          1902     1905       +3     
  Lines        148463   148591     +128     
  Branches      17213    17228      +15     
============================================
- Hits         110267   110221      -46     
- Misses        29404    29577     +173     
- Partials       8792     8793       +1     
Flag Coverage Δ
inttests 26.27% <29.76%> (-0.25%) ⬇️
systests 22.76% <29.76%> (-0.02%) ⬇️
unittests 73.71% <73.80%> (-0.08%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...org/apache/pulsar/broker/ServiceConfiguration.java 98.20% <100.00%> (+0.02%) ⬆️
...ulsar/broker/service/PulsarChannelInitializer.java 94.73% <100.00%> (+0.19%) ⬆️
...g/apache/pulsar/common/protocol/PulsarDecoder.java 64.61% <100.00%> (ø)
...g/apache/pulsar/common/protocol/PulsarHandler.java 66.66% <ø> (ø)
...apache/pulsar/proxy/server/DirectProxyHandler.java 72.76% <ø> (ø)
...che/pulsar/utils/TimedSingleThreadRateLimiter.java 87.09% <87.09%> (ø)
...va/org/apache/pulsar/broker/service/ServerCnx.java 72.34% <58.13%> (-0.54%) ⬇️

... and 95 files with indirect coverage changes


@poorbarcode
Contributor Author

@lhotari

The problem with this PR is that the limit is per connection to the broker. That's not very effective in limiting the broker memory usage. In Netty, the standard way to handle this is to change autoread to false when writability changes to false. However, that could cause performance issues for Pulsar broker.

Changed the implementation to set the channel's auto-read to false, but the drawback is that it also affects the connection health check.

@poorbarcode poorbarcode requested a review from lhotari June 18, 2025 05:22
@poorbarcode
Contributor Author

@lhotari

The channel writability is controlled with https://netty.io/4.1/api/io/netty/channel/WriteBufferWaterMark.html settings which we don't currently expose, since Pulsar hasn't been using writability for backpressure (excluding the Proxy).
For consumers, there's already a backpressure solution for outbound entries with managedLedgerMaxReadsInFlightSizeInMB.
In this case, based on the test case, this is specifically about returning topic pattern listing responses.

Added the following scenario to the Motivation.

This fix is very meaningful for the following scenario:

  • thousands of topics per namespace
  • thousands of regexp consumers per namespace, each with a different regexp expression
  • most consumers start at the same time, which causes a huge number of requests for the topics of those namespaces

@poorbarcode
Contributor Author

Instead, a more effective solution would be to have a rate limiter or semaphore in place that specifically targets the topic pattern listing responses and watchers. When this is handled asynchronously, the solution could add delay to responding to requests when limits are reached.
Besides GET_TOPICS_OF_NAMESPACE/GET_TOPICS_OF_NAMESPACE_RESPONSE, there's also WATCH_TOPIC_LIST with WATCH_TOPIC_LIST_SUCCESS and WATCH_TOPIC_UPDATE responses. That's why it makes more sense to design proper backpressure for topic listings.

For the scenario I described above, different namespaces have very different numbers of topics, so we can hardly set a single appropriate value for such a rate-limiter. So the current implementation is better.

@dao-jun
Member

dao-jun commented Jun 18, 2025

The root cause is similar to apache/bookkeeper#4556. I think maybe we can consider closing the channel if it has too much pending data to write.

@dao-jun
Member

dao-jun commented Jun 18, 2025

Agree with Lari, I think the current impl cannot solve the root cause.
If there are many connections connected to the broker, the broker can still OOM even if we set connectionMaxPendingWriteBytes to a small value.

@lhotari
Member

lhotari commented Jun 18, 2025

The root cause is similar to apache/bookkeeper#4556. I think maybe we can consider closing the channel if it has too much pending data to write.

I don't think that there's a need to close the channel. For Pulsar brokers, the memory is already more or less bounded with maxMessagePublishBufferSizeInMB, managedLedgerMaxReadsInFlightSizeInMB and managedLedgerCacheSizeMB settings. The only major gap seems to be the GET_TOPICS_OF_NAMESPACE/GET_TOPICS_OF_NAMESPACE_RESPONSE and WATCH_TOPIC_LIST/WATCH_TOPIC_LIST_SUCCESS & WATCH_TOPIC_UPDATE handling.

@lhotari
Member

lhotari commented Jun 18, 2025

Regarding the TopicListWatcher, it will consume a significant amount of heap memory if the topic names aren't deduplicated String instances.

Guava's Interner class is a good way to deduplicate String instances. Example from Gradle source code: https://github.com/gradle/gradle/blob/master/platforms/core-runtime/base-services/src/main/java/org/gradle/api/internal/cache/StringInterner.java . The usage of java.lang.String.intern could be problematic since the GC of the instances isn't well defined.
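
As a hedged illustration of this suggestion, here is a minimal wrapper around Guava's Interner; the TopicNameDeduplicator class is hypothetical, while Interners.newWeakInterner() and intern() are the real Guava API.

```java
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

public final class TopicNameDeduplicator {

    // A weak interner lets topic-name Strings be garbage collected once nothing
    // else references them, unlike java.lang.String.intern whose GC behaviour
    // is less predictable, as noted above.
    private static final Interner<String> TOPIC_NAMES = Interners.newWeakInterner();

    private TopicNameDeduplicator() {
    }

    /** Returns a canonical instance so all watchers share one String per topic name. */
    public static String dedup(String topicName) {
        return TOPIC_NAMES.intern(topicName);
    }
}
```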

@dao-jun
Member

dao-jun commented Jun 18, 2025

The root cause is similar to apache/bookkeeper#4556. I think maybe we can consider closing the channel if it has too much pending data to write.

I don't think that there's a need to close the channel. For Pulsar brokers, the memory is already more or less bounded with maxMessagePublishBufferSizeInMB, managedLedgerMaxReadsInFlightSizeInMB and managedLedgerCacheSizeMB settings. The only major gap seems to be the GET_TOPICS_OF_NAMESPACE/GET_TOPICS_OF_NAMESPACE_RESPONSE and WATCH_TOPIC_LIST/WATCH_TOPIC_LIST_SUCCESS & WATCH_TOPIC_UPDATE handling.

Yes, but maybe we should call cnx.completedSendOperation after the response send has finished,
like here: (screenshot in the original comment)

Member

@lhotari lhotari left a comment

I'd be fine with this solution if it were based on the ChannelOption.WRITE_BUFFER_WATER_MARK settings and handled the autoRead toggling with ServerCnxThrottleTracker's incrementThrottleCount/decrementThrottleCount in the channelWritabilityChanged callback method. There should be a separate setting to enable it, since configuring the WriteBufferWaterMark high and low values is separate from the throttling based on channel writability. This solution would be useful with suitable settings, but it wouldn't be the correct way to fix the actual problem of broker OOM.

More details in the email response, https://lists.apache.org/thread/2nknhw2rbspjbb4y2ocsgqh15yj7lzyh (previous one: https://lists.apache.org/thread/2qf12h5nh1m0n6y4t7szxz061dljw90f).
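
A hedged sketch of the shape this could take, assuming a stand-in ThrottleTracker interface: only the incrementThrottleCount/decrementThrottleCount names come from the review comment above; the handler and its wiring are hypothetical, not the merged implementation.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class WritabilityThrottleHandler extends ChannelInboundHandlerAdapter {

    /** Hypothetical stand-in for ServerCnxThrottleTracker's relevant methods. */
    interface ThrottleTracker {
        void incrementThrottleCount();
        void decrementThrottleCount();
    }

    private final ThrottleTracker throttleTracker;
    private boolean throttledForWritability = false;

    public WritabilityThrottleHandler(ThrottleTracker throttleTracker) {
        this.throttleTracker = throttleTracker;
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        boolean writable = ctx.channel().isWritable();
        if (!writable && !throttledForWritability) {
            // Outbound buffer crossed the high water mark: register one throttle
            // reason, which the tracker can translate into pausing autoRead.
            throttledForWritability = true;
            throttleTracker.incrementThrottleCount();
        } else if (writable && throttledForWritability) {
            // Buffer drained below the low water mark: release the throttle reason.
            throttledForWritability = false;
            throttleTracker.decrementThrottleCount();
        }
        super.channelWritabilityChanged(ctx);
    }
}
```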

@poorbarcode poorbarcode changed the title [fix][broker]Broker OOM: ServerCnx replies too many responses, but can not send out because the client can not handle them quick enough, leading ServerCnx to maintain too much data that should be sent out. [fix][broker] Part-1 of PIP-434: xpose Netty channel configuration WRITE_BUFFER_WATER_MARK to pulsar conf and pause receive requests when channel is unwritable Aug 8, 2025
@poorbarcode poorbarcode changed the title [fix][broker] Part-1 of PIP-434: xpose Netty channel configuration WRITE_BUFFER_WATER_MARK to pulsar conf and pause receive requests when channel is unwritable [improve][broker] Part-1 of PIP-434: xpose Netty channel configuration WRITE_BUFFER_WATER_MARK to pulsar conf and pause receive requests when channel is unwritable Aug 8, 2025
@poorbarcode poorbarcode requested a review from lhotari September 23, 2025 17:29
Member

@lhotari lhotari left a comment

More comments, I hope these are useful.

@poorbarcode poorbarcode requested a review from lhotari September 24, 2025 03:37
@poorbarcode poorbarcode requested a review from lhotari September 24, 2025 11:32
@lhotari
Member

lhotari commented Sep 24, 2025

@poorbarcode checkstyle:

Error:  src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:[506] (sizes) LineLength: Line is longer than 120 characters (found 125).

A quick way to run checkstyle before pushing is mvn -T 1C initialize checkstyle:check

Member

@lhotari lhotari left a comment

LGTM, good work and thanks for the patience @poorbarcode

@lhotari lhotari merged commit 1b74fe0 into apache:master Sep 24, 2025
96 of 98 checks passed
@lhotari
Member

lhotari commented Oct 8, 2025

There's a flaky test, #24827. @poorbarcode Please check.

walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
…on WRITE_BUFFER_WATER_MARK to pulsar conf and pause receive requests when channel is unwritable (apache#24423)