-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat][broker] Implement topic-level delayed delivery tracking with in-memory manager #24922
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a topic-level delayed delivery tracking system to optimize memory usage in multi-subscription scenarios. Instead of maintaining separate delayed message indices per subscription, the new implementation uses a shared global index at the topic level with per-subscription views.
Key changes:
- Introduces
TopicDelayedDeliveryTrackerManagerinterface andInMemoryTopicDelayedDeliveryTrackerManagerimplementation for shared topic-level delayed message tracking - Adds
InMemoryTopicDelayedDeliveryTrackerViewas a per-subscription adapter that delegates to the topic-level manager - Updates
InMemoryDelayedDeliveryTrackerFactoryto create and cache topic-level managers and return subscription views - Updates test assertions to expect the new
InMemoryTopicDelayedDeliveryTrackerViewtype instead ofInMemoryDelayedDeliveryTracker
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| TopicDelayedDeliveryTrackerManager.java | New interface defining the contract for topic-level delayed delivery tracking management |
| InMemoryTopicDelayedDeliveryTrackerManager.java | New implementation of topic-level manager with shared delayed message index and per-subscription context tracking |
| InMemoryTopicDelayedDeliveryTrackerView.java | New per-subscription view adapter that implements DelayedDeliveryTracker and delegates to the topic-level manager |
| InMemoryDelayedDeliveryTrackerFactory.java | Updated to create topic-level managers and cache them, returning subscription views instead of individual trackers |
| DelayedDeliveryTrackerFactoryTest.java | Updated test assertions to expect InMemoryTopicDelayedDeliveryTrackerView instances |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName, | ||
| k -> new InMemoryTopicDelayedDeliveryTrackerManager(timer, tickTimeMillis, | ||
| isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead)); |
Copilot
AI
Oct 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The topicManagers map could accumulate stale entries for topics that have been unloaded. When a topic is unloaded and all subscriptions are closed, the manager should be removed from the map. Consider adding cleanup logic in InMemoryTopicDelayedDeliveryTrackerManager.unregister() that notifies the factory to remove the manager from the cache, or add a callback mechanism for the manager to remove itself when it closes.
| public void updateMarkDeletePosition(Position position) { | ||
| checkClosed(); | ||
| manager.updateMarkDeletePosition(subContext, position); | ||
| } |
Copilot
AI
Oct 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The updateMarkDeletePosition method is public but not part of the DelayedDeliveryTracker interface. This creates an API inconsistency where callers need to cast to the concrete type to call this method. Consider either adding this method to the interface or finding an alternative design pattern (such as having the dispatcher directly notify the manager) to avoid breaking the interface abstraction.
|
|
||
| synchronized (this) { | ||
| subscriptionContexts.remove(subscriptionName); | ||
|
|
||
| // If no more subscriptions, close the manager | ||
| if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) { | ||
| close(); | ||
| } | ||
| } |
Copilot
AI
Oct 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling close() within unregister() while already holding the lock could lead to maintenance issues. The close() method also acquires the same lock (line 180), which is reentrant but adds complexity. Consider either documenting this self-closing behavior clearly or extracting the cleanup logic to avoid nested locking patterns.
| synchronized (this) { | |
| subscriptionContexts.remove(subscriptionName); | |
| // If no more subscriptions, close the manager | |
| if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) { | |
| close(); | |
| } | |
| } | |
| boolean shouldClose = false; | |
| synchronized (this) { | |
| subscriptionContexts.remove(subscriptionName); | |
| // If no more subscriptions, close the manager | |
| if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) { | |
| shouldClose = true; | |
| } | |
| } | |
| if (shouldClose) { | |
| close(); | |
| } |
| // For now, tick time updates are not supported after initialization | ||
| // This could be enhanced to update all subscription contexts | ||
| log.warn("Tick time updates are not currently supported for topic-level delayed delivery managers"); |
Copilot
AI
Oct 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logging a warning every time onTickTimeUpdated is called could spam logs if this method is invoked frequently. Consider either implementing the functionality, using a rate-limited logger, or throwing an UnsupportedOperationException to clearly indicate this is not supported rather than silently logging warnings.
| // For now, tick time updates are not supported after initialization | |
| // This could be enhanced to update all subscription contexts | |
| log.warn("Tick time updates are not currently supported for topic-level delayed delivery managers"); | |
| throw new UnsupportedOperationException("Tick time updates are not currently supported for topic-level delayed delivery managers"); |
| SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, | ||
| k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, | ||
| fixedDelayDetectionLookahead)); | ||
|
|
||
| return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext); |
Copilot
AI
Oct 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The createOrGetView method always creates a new InMemoryTopicDelayedDeliveryTrackerView instance even when the SubContext already exists. This means multiple view instances could be created for the same subscription, each holding a reference to the same SubContext. When multiple views are closed, the unregister method will remove the subscription context prematurely, breaking other view instances. The method should track and return existing view instances or document that only one view per subscription should be active at a time.
| AtomicInteger counter = new AtomicInteger(0); | ||
| InMemoryDelayedDeliveryTracker tracker = (InMemoryDelayedDeliveryTracker) optional.get(); | ||
| InMemoryTopicDelayedDeliveryTrackerView tracker = (InMemoryTopicDelayedDeliveryTrackerView) optional.get(); | ||
| tracker = Mockito.spy(tracker); |
Copilot
AI
Oct 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double space between = and Mockito.spy(tracker). Should be a single space for consistency with code formatting standards.
| tracker = Mockito.spy(tracker); | |
| tracker = Mockito.spy(tracker); |
|
It would be worthwhile to create a PIP to cover these changes. In this case the reference is currently to #24600 as the motivation. However, there's no clear problem description or goals what is done and why. For reviewers, it's most necessary to understand the problem that the PR intends to solve. That's one role of a PIP document.
LLM will generate 80% of the PIP document if you already have written a lot of the background material in other discussions. This solution requires a LLM model with sufficient context window size. If the window size is not sufficient, you can use text files instead of printed PDFs for the existing issues and background context discussions. |
|
@lhotari I‘m sorry that I mentioned that I chose the wrong repo. Originally, I was going to mention my own repo. Indeed, this is a big change that requires a PIP. This is only a preliminary implementation. I have found a lot of problems. There are more changes to be made. |
|
Yes. I have written a prompt before, which can generate documents in the correct format. https://gist.github.com/Denovo1998/163e55b3a612873364a00cf0df5a1b95 |
…fixed-delay detection and memory optimization
Fixes #xyz
Main Issue: #24600
PIP: #xyz
Motivation
Modifications
Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: #24927