-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat][broker] Implement topic-level delayed delivery tracking with in-memory manager #24927
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…fixed-delay detection and memory optimization
…h enhanced memory management and concurrency controls
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a topic-level delayed delivery tracking mechanism to optimize memory usage in multi-subscription scenarios. Instead of each subscription maintaining its own delayed message index, a single shared index is maintained at the topic level with per-subscription views.
- Refactors delayed delivery tracking from subscription-level to topic-level with shared indexes
- Introduces
TopicDelayedDeliveryTrackerManagerinterface for managing topic-level delayed delivery - Implements
InMemoryTopicDelayedDeliveryTrackerManagerthat maintains a global delayed message index shared by all subscriptions - Creates
InMemoryTopicDelayedDeliveryTrackerViewas per-subscription facade that forwards operations to the topic-level manager - Updates
InMemoryDelayedDeliveryTrackerFactoryto use topic-level managers with caching and automatic cleanup
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| TopicDelayedDeliveryTrackerManager.java | New interface defining the contract for topic-level delayed delivery managers |
| InMemoryTopicDelayedDeliveryTrackerManager.java | New implementation of topic-level manager with shared delayed message index and fine-grained locking |
| InMemoryTopicDelayedDeliveryTrackerView.java | New per-subscription view that delegates to the topic-level manager |
| InMemoryDelayedDeliveryTrackerFactory.java | Updated to instantiate topic-level managers and cache them per topic |
| InMemoryTopicDeliveryTrackerTest.java | New comprehensive test suite for the topic-level tracker |
| DelayedDeliveryTrackerFactoryTest.java | Updated assertions to use the new view class instead of the legacy tracker |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Show resolved
Hide resolved
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Outdated
Show resolved
Hide resolved
...er/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
Outdated
Show resolved
Hide resolved
...oker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTracker.java
Show resolved
Hide resolved
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Outdated
Show resolved
Hide resolved
…InMemoryTopicDelayedDeliveryTrackerManager
… for improved performance and memory efficiency
…eliveryTrackerManager for event-driven updates and improved performance
…emoving deprecated constructor and methods for improved clarity
…ryDelayedDeliveryTrackerFactory for improved testability
…ryTopicDelayedDeliveryTrackerManager for improved accuracy and performance
…s in InMemoryTopicDelayedDeliveryTrackerManager for improved clarity and maintainability
… thread-safe updates and improve concurrency handling in InMemoryTopicDelayedDeliveryTrackerManager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java:1
- The test uses polling with Thread.sleep() which can make tests flaky and slow. Consider using more reliable synchronization mechanisms like CountDownLatch or CompletableFuture for test coordination.
/*
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...er/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Show resolved
Hide resolved
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Outdated
Show resolved
Hide resolved
...-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java
Outdated
Show resolved
Hide resolved
…or topic-level delayed message tracking and update related configurations
…layed delivery tracker and rename view class for clarity
…tests in InMemoryTopicDelayedDeliveryTrackerTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 11 out of 12 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactory.java
Outdated
Show resolved
Hide resolved
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
Show resolved
Hide resolved
…rackerFactory and related configurations to clarify memory usage and implementation details
…d tracker across dispatcher implementations
| public void onTickTimeUpdated(long newTickTimeMillis) { | ||
| if (this.tickTimeMillis == newTickTimeMillis) { | ||
| return; | ||
| } | ||
| this.tickTimeMillis = newTickTimeMillis; | ||
| // Propagate to all subscriptions | ||
| for (SubContext sc : subscriptionContexts.values()) { | ||
| sc.tickTimeMillis = newTickTimeMillis; | ||
| } | ||
| // Re-evaluate timer scheduling with new tick time | ||
| timerLock.lock(); | ||
| try { | ||
| updateTimerLocked(); | ||
| } finally { | ||
| timerLock.unlock(); | ||
| } | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Updated tickTimeMillis for topic-level delayed delivery manager to {} ms", newTickTimeMillis); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lhotari @codelipenghui @coderzc @Apurva007 @thetumbled @dao-jun @BewareMyPower
Currently, getScheduledMessagesForSub does not mark the returned position as "Scheduled for this subscription" nor does it delete it in the topic-level index (can only wait for unified pruning after all subscriptions mark-delete). As a result, before the ack, the next getScheduledMessagesForSub will still return the same position, causing duplicate scheduling and duplicate reading, which in severe cases can lead to duplicate delivery.
The original per-sub InMemoryDelayedDeliveryTracker usually "takes out and removes" from the subscription structure when getScheduledMessages, avoiding duplicate scheduling; after the shared index, a per-sub "pending/scheduled but unconfirmed" collection or cursor is also needed, used to filter out the positions that have been scheduled but not confirmed when getScheduledMessagesForSub, and will not return these positions again before mark-delete.
This needs optimization, but it feels like there's no low-cost way to handle it.
1.Index bucketing uses a fixed indexGranularityMillis (can be the initial tick or a fixed constant of 10ms/1ms), no longer changes with resetTickTime; resetTickTime only affects the triggering frequency of the Timer.
2.Deduplication on the read side. In SubContext, maintain a pending collection of ledgerId -> Roaring64Bitmap(entryId); get ScheduledMessagesForSub records the returned positions in pending, and subsequent calls filter pending; when mark-delete is advanced, remove from pending; Timer judgment also filters pending to avoid empty rotation.
3.Block and rebuild delayedMessageMap here at onTickTimeUpdated.
Fixes #xyz
Main Issue: #24600
PIP: #24928
Motivation
The primary motivation for this proposal is to address the high memory consumption caused by the current per-subscription delayed message tracking mechanism. For topics with hundreds or thousands of subscriptions, the memory footprint for delayed messages becomes prohibitively large. Each delayed message's position is duplicated across every subscription's tracker, leading to a memory usage pattern of O(num_delayed_messages * num_subscriptions).
This excessive memory usage can cause:
Increased memory pressure on Pulsar brokers.
More frequent and longer Garbage Collection (GC) pauses, impacting broker performance.
Potential OutOfMemoryErrors, leading to broker instability.
Limited scalability for use cases that rely on many subscriptions per topic, such as IoT or large-scale microservices with shared subscriptions.
By optimizing the delayed message tracking to be more memory-efficient, we can enhance broker stability and scalability, allowing Pulsar to better support these critical use cases.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: Denovo1998#16