Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
fc876b7
[feat][broker] Implement topic-level delayed delivery tracking with i…
Denovo1998 Oct 30, 2025
dee4363
feat[broker] Enhance InMemoryTopicDelayedDeliveryTrackerManager with …
Denovo1998 Nov 1, 2025
e455512
[feat][broker] Improve InMemoryTopicDelayedDeliveryTrackerManager wit…
Denovo1998 Nov 1, 2025
60f6bcd
feat[broker] Add pruning mechanism and improve timestamp handling in …
Denovo1998 Nov 1, 2025
2c30c94
[feat][broker] Refactor delayedMessageMap to use Long2ObjectRBTreeMap…
Denovo1998 Nov 1, 2025
6fd6ead
[feat][broker] Refactor mark-delete handling in InMemoryTopicDelayedD…
Denovo1998 Nov 1, 2025
6784c43
feat[broker] Simplify InMemoryTopicDelayedDeliveryTrackerManager by r…
Denovo1998 Nov 1, 2025
701ffbf
feat[broker] Add testing-friendly constructor and accessors to InMemo…
Denovo1998 Nov 1, 2025
c74a879
[feat][broker] Refactor timestamp handling and bucket logic in InMemo…
Denovo1998 Nov 1, 2025
e576c65
[feat][broker] Remove deprecated methods and simplify test annotation…
Denovo1998 Nov 1, 2025
938e4f6
[feat][broker] Replace highestDeliveryTimeTracked with AtomicLong for…
Denovo1998 Nov 1, 2025
e1f2c89
[feat][broker] Introduce InMemoryTopicDelayedDeliveryTrackerFactory f…
Denovo1998 Nov 2, 2025
32f2170
[feat][broker] Add configuration options for in-memory topic-level de…
Denovo1998 Nov 2, 2025
14fc81b
[feat][broker] Replace manual wait loops with Awaitility for pruning …
Denovo1998 Nov 2, 2025
23fbfb6
[feat][broker] Update documentation for InMemoryTopicDelayedDeliveryT…
Denovo1998 Nov 2, 2025
4aada39
feat[broker] Centralize mark-delete propagation to topic-level delaye…
Denovo1998 Nov 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,9 @@ delayedDeliveryEnabled=true
# Class name of the factory that implements the delayed deliver tracker.
# If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
# will create bucket based delayed message index tracker.
# If value is "org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory",
# will use a topic-level in-memory delayed message index tracker (a distinct implementation that shares
# the index across subscriptions to reduce memory usage in multi-subscription scenarios).
delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory

# Control the tick time for when retrying on delayed delivery,
Expand All @@ -669,6 +672,24 @@ delayedDeliveryTickTimeMillis=1000
# delayedDeliveryTickTimeMillis.
isDelayedDeliveryDeliverAtTimeStrict=false

# Minimum interval (in milliseconds) between prune attempts within the in-memory topic-level delayed delivery tracker.
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
# Set to a positive value to override the default adaptive interval based on delayedDeliveryTickTimeMillis.
# Set to 0 or a negative value to use the default adaptive interval.
delayedDeliveryPruneMinIntervalMillis=0

# The ratio [0.0, 1.0] of subscriptions that need to be eligible for delivery in order to trigger an opportunistic
# prune in the in-memory topic-level delayed delivery tracker.
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
# For example, 0.5 means prune when at least half of the subscriptions are eligible. Default is 0.5.
delayedDeliveryPruneEligibleRatio=0.5

# Idle timeout (in milliseconds) for the topic-level in-memory delayed delivery tracker manager.
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
# When the last subscription is unregistered, the manager will be removed from the factory cache after this idle
# timeout, provided no new subscriptions have been registered in the meantime. Set to 0 to remove immediately (default).
delayedDeliveryTopicManagerIdleMillis=0

# The delayed message index bucket min index count.
# When the index count of the current bucket is more than this value and all message indexes of current ledger
# have already been added to the tracker we will seal the bucket.
Expand Down
21 changes: 21 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,9 @@ delayedDeliveryEnabled=true
# Class name of the factory that implements the delayed deliver tracker.
# If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
# will create bucket based delayed message index tracker.
# If value is "org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory",
# will use a topic-level in-memory delayed message index tracker (a distinct implementation that shares
# the index across subscriptions to reduce memory usage in multi-subscription scenarios).
delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory

# Control the tick time for when retrying on delayed delivery,
Expand All @@ -1437,6 +1440,24 @@ delayedDeliveryTickTimeMillis=1000
# delayedDeliveryTickTimeMillis.
isDelayedDeliveryDeliverAtTimeStrict=false

# Minimum interval (in milliseconds) between prune attempts within the in-memory topic-level delayed delivery tracker.
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
# Set to a positive value to override the default adaptive interval based on delayedDeliveryTickTimeMillis.
# Set to 0 or a negative value to use the default adaptive interval.
delayedDeliveryPruneMinIntervalMillis=0

# The ratio [0.0, 1.0] of subscriptions that need to be eligible for delivery in order to trigger an opportunistic
# prune in the in-memory topic-level delayed delivery tracker.
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
# For example, 0.5 means prune when at least half of the subscriptions are eligible. Default is 0.5.
delayedDeliveryPruneEligibleRatio=0.5

# Idle timeout (in milliseconds) for the topic-level in-memory delayed delivery tracker manager.
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
# When the last subscription is unregistered, the manager will be removed from the factory cache after this idle
# timeout, provided no new subscriptions have been registered in the meantime. Set to 0 to remove immediately (default).
delayedDeliveryTopicManagerIdleMillis=0

# The delayed message index bucket min index count.
# When the index count of the current bucket is more than this value and all message indexes of current ledger
# have already been added to the tracker we will seal the bucket.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(category = CATEGORY_SERVER, doc = """
Class name of the factory that implements the delayed deliver tracker.
If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory", \
will create bucket based delayed message index tracker.
will create bucket based delayed message index tracker.\n
If value is "org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory", \
will create topic-level in-memory delayed message index tracker.\n
If value is "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory", \
will create in-memory delayed delivery tracker (per existing implementation).
""")
private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed"
+ ".InMemoryDelayedDeliveryTrackerFactory";
Expand Down Expand Up @@ -417,6 +421,28 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
+ "logic to handle fixed delays in messages in a different way.")
private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;

@FieldContext(category = CATEGORY_SERVER, doc = """
Minimum interval (in milliseconds) between prune attempts within the in-memory topic-level delayed
delivery tracker. Set to a positive value to override the default adaptive interval based on
delayedDeliveryTickTimeMillis. Set to 0 or a negative value to use the default adaptive interval.
""")
private long delayedDeliveryPruneMinIntervalMillis = 0;

@FieldContext(category = CATEGORY_SERVER, doc = """
The ratio [0.0, 1.0] of subscriptions that need to be eligible for delivery in order to trigger an
opportunistic prune in the in-memory topic-level delayed delivery tracker. For example, 0.5 means prune
when at least half of the subscriptions are eligible. Default is 0.5.
""")
private double delayedDeliveryPruneEligibleRatio = 0.5;

@FieldContext(category = CATEGORY_SERVER, doc = """
Idle timeout (in milliseconds) for the topic-level in-memory delayed delivery tracker manager. When the
last subscription is unregistered, the manager will be removed from the factory cache after this idle
timeout, provided no new subscriptions have been registered in the meantime. Set to 0 to remove
immediately (default).
""")
private long delayedDeliveryTopicManagerIdleMillis = 0;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which \
exceeds this max delay, then it will return an error to the producer. \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void initialize(PulsarService pulsarService) {
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
try {
tracker = newTracker0(dispatcher);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import java.util.NavigableSet;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;

/**
* Subscription-scoped tracker implementing {@link DelayedDeliveryTracker} by delegating all
* operations to the topic-level {@link InMemoryTopicDelayedDeliveryTrackerManager}.
*/
@Slf4j
public class InMemoryTopicDelayedDeliveryTracker implements DelayedDeliveryTracker {

private final InMemoryTopicDelayedDeliveryTrackerManager manager;
private final InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext;
private volatile boolean closed = false;

public InMemoryTopicDelayedDeliveryTracker(InMemoryTopicDelayedDeliveryTrackerManager manager,
InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) {
this.manager = manager;
this.subContext = subContext;
}

@Override
public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
checkClosed();
return manager.addMessageForSub(subContext, ledgerId, entryId, deliveryAt);
}

@Override
public boolean hasMessageAvailable() {
checkClosed();
return manager.hasMessageAvailableForSub(subContext);
}

@Override
public long getNumberOfDelayedMessages() {
checkClosed();
// Return an estimate of visible delayed messages for this subscription
// For now, return the total count - could be enhanced to count only visible messages
return manager.topicDelayedMessages();
}

@Override
public long getBufferMemoryUsage() {
checkClosed();
// Return the topic-level memory usage (shared by all subscriptions)
return manager.topicBufferMemoryBytes();
}

@Override
public NavigableSet<Position> getScheduledMessages(int maxMessages) {
checkClosed();
return manager.getScheduledMessagesForSub(subContext, maxMessages);
}

@Override
public boolean shouldPauseAllDeliveries() {
checkClosed();
return manager.shouldPauseAllDeliveriesForSub(subContext);
}

@Override
public void resetTickTime(long tickTime) {
checkClosed();
manager.onTickTimeUpdated(tickTime);
}

@Override
public CompletableFuture<Void> clear() {
checkClosed();
// For topic-level manager, clear is a no-op for individual subscriptions
manager.clearForSub();
return CompletableFuture.completedFuture(null);
}

@Override
public void close() {
if (closed) {
return;
}
closed = true;
manager.unregister(subContext.getDispatcher());
}

/**
* Update the mark delete position for this subscription.
* This is called by the dispatcher when messages are acknowledged.
*/
public void updateMarkDeletePosition(Position position) {
checkClosed();
manager.updateMarkDeletePosition(subContext, position);
}

private void checkClosed() {
if (closed) {
throw new IllegalStateException("DelayedDeliveryTracker is already closed");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Clock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryTopicDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {

private static final Logger log = LoggerFactory.getLogger(InMemoryTopicDelayedDeliveryTrackerFactory.class);

private Timer timer;

private long tickTimeMillis;

private boolean isDelayedDeliveryDeliverAtTimeStrict;

private long fixedDelayDetectionLookahead;

// New tuning knobs
private long pruneMinIntervalMillis;
private double pruneEligibleRatio;
private long topicManagerIdleMillis;

// Cache of topic-level managers: topic name -> manager instance
private final ConcurrentMap<String, TopicDelayedDeliveryTrackerManager> topicManagers = new ConcurrentHashMap<>();

@VisibleForTesting
InMemoryTopicDelayedDeliveryTrackerFactory(Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
this.pruneMinIntervalMillis = 0;
this.pruneEligibleRatio = 0.5;
this.topicManagerIdleMillis = 0;
}

@VisibleForTesting
int getCachedManagersSize() {
return topicManagers.size();
}

@VisibleForTesting
boolean hasManagerForTopic(String topicName) {
return topicManagers.containsKey(topicName);
}

@Override
public void initialize(PulsarService pulsarService) {
ServiceConfiguration config = pulsarService.getConfig();
this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
this.fixedDelayDetectionLookahead = config.getDelayedDeliveryFixedDelayDetectionLookahead();
this.pruneMinIntervalMillis = config.getDelayedDeliveryPruneMinIntervalMillis();
this.pruneEligibleRatio = config.getDelayedDeliveryPruneEligibleRatio();
this.topicManagerIdleMillis = config.getDelayedDeliveryTopicManagerIdleMillis();
}

@Override
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
try {
tracker = newTracker0(dispatcher);
} catch (Exception e) {
// it should never go here
log.warn("Failed to create InMemoryTopicDelayedDeliveryTracker, topic {}, subscription {}",
topicName, subscriptionName, e);
}
return tracker;
}

@VisibleForTesting
DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();

// Get or create topic-level manager for this topic with onEmpty callback to remove from cache
final TopicDelayedDeliveryTrackerManager[] holder = new TopicDelayedDeliveryTrackerManager[1];
TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName, k -> {
InMemoryTopicDelayedDeliveryTrackerManager m = new InMemoryTopicDelayedDeliveryTrackerManager(
timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
fixedDelayDetectionLookahead, pruneMinIntervalMillis, pruneEligibleRatio, () -> {
if (topicManagerIdleMillis <= 0) {
topicManagers.remove(topicName, holder[0]);
} else {
timer.newTimeout(__ -> {
TopicDelayedDeliveryTrackerManager tm = holder[0];
if (tm instanceof InMemoryTopicDelayedDeliveryTrackerManager) {
if (!((InMemoryTopicDelayedDeliveryTrackerManager) tm).hasActiveSubscriptions()) {
topicManagers.remove(topicName, tm);
}
} else {
// If the manager has been replaced or removed, ensure entry is cleaned up
topicManagers.remove(topicName, tm);
}
}, topicManagerIdleMillis, TimeUnit.MILLISECONDS);
}
});
holder[0] = m;
return m;
});

// Create a per-subscription tracker from the topic-level manager
return manager.createOrGetTracker(dispatcher);
}

@Override
public void close() {
// Close all topic-level managers
for (TopicDelayedDeliveryTrackerManager manager : topicManagers.values()) {
try {
manager.close();
} catch (Exception e) {
log.warn("Failed to close topic-level delayed delivery manager", e);
}
}
topicManagers.clear();

if (timer != null) {
timer.stop();
}
}
}
Loading