From 4b326a475f72157117faada4f118fe91e5daee0e Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 23 Jun 2025 16:58:40 +0300 Subject: [PATCH 1/3] [pip] PIP-430: Pulsar Broker cache improvements: refactoring eviction and adding a new cache strategy based on expected read count --- pip/pip-430.md | 335 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 pip/pip-430.md diff --git a/pip/pip-430.md b/pip/pip-430.md new file mode 100644 index 0000000000000..0ef01451eeb0e --- /dev/null +++ b/pip/pip-430.md @@ -0,0 +1,335 @@ +# PIP-430: Pulsar Broker cache improvements: refactoring eviction and adding a new cache strategy based on expected read count + +# Background knowledge + +Apache Pulsar brokers maintain an in-memory entry cache to reduce latency and load on BookKeeper and tiered storage (S3) by serving frequently accessed message data directly from memory. + +Key concepts: +- **`ManagedLedgerImpl`**: Manages the storage for a single topic partition. Each `ManagedLedgerImpl` instance has its own `EntryCache` (typically `RangeEntryCacheImpl`) instance. +- **`EntryCache`**: Stores message entries (payloads) in memory. The default implementation is `RangeEntryCacheImpl`, which uses a `RangeCache` internally. +- **`RangeCache`**: A specialized cache storing entries mapped by their `Position` (ledgerId, entryId). It supports range-based operations for retrieval and expiration. +- **`EntryCacheManager` (`RangeEntryCacheManagerImpl`)**: A global component that limits the total size of all entry caches in a broker. When the total size exceeds a threshold, it triggers eviction. +- **Cache Eviction Policies**: Mechanisms to remove entries from the cache to make space or remove old data. + - **Timestamp-based eviction**: Removes entries older than a configured threshold (e.g., `managedLedgerCacheEvictionTimeThresholdMillis`). This is currently handled periodically by `ManagedLedgerFactoryImpl` iterating over all managed ledgers. + - **Size-based eviction**: Removes entries when the total cache size exceeds a configured limit (`managedLedgerCacheSizeMB`). The current implementation resides in `EntryCacheDefaultEvictionPolicy`, which selects a subset of larger caches and proportionally evicts entries from each entry cache to keep the total cache size under the limit. + - **Cursor-based eviction**: Invalidates cache entries up to the slowest consumer's read position or mark-delete position, depending on `cacheEvictionByMarkDeletedPosition`. + +The broker cache serves various read patterns: +- **Tailing reads**: Consumers reading the latest published messages. +- **Catch-up reads (backlogged cursors)**: Consumers reading older messages to catch up. +- **Key_Shared subscription reads**: Messages with the same key are routed to the same consumer. If a consumer is slow, messages might be replayed while the cursor reads more messages for available consumers. + +# Motivation + +The current Pulsar broker entry cache implementation and its eviction mechanisms face several challenges that impact performance, efficiency, and predictability: + +1. **Inefficient and Flawed Size-Based Eviction**: + The `EntryCacheDefaultEvictionPolicy` (the current default for size-based eviction) does not guarantee the removal of the oldest entries globally. It sorts individual `EntryCache` instances by their size, selects a percentage of the largest caches, and then asks each of them to evict a proportional amount of data. This can lead to newer entries being evicted from large, active caches while older, less relevant entries remain in smaller or less active caches, resulting in suboptimal cache utilization and potentially lower hit rates. + +2. **Inefficient and Incorrect Timestamp-Based Eviction**: + The existing timestamp-based eviction mechanism, triggered by `ManagedLedgerFactoryImpl`, has significant performance and correctness issues: + * **Performance**: It iterates through *all* `ManagedLedgerImpl` instances and their respective `EntryCache` instances periodically (default every 10ms). In brokers with a large number of topics, this frequent and exhaustive iteration leads to high CPU utilization and memory pressure. + * **Correctness**: The per-cache eviction (e.g., `RangeCache.evictLEntriesBeforeTimestamp`) often assumes entries within a single `RangeCache` are primarily ordered by timestamp due to typical append-only workloads. This assumption breaks down with mixed read patterns like catch-up reads or when entries are inserted out of their natural position order (Key_shared subscription replay queue scenario), potentially leading to incorrect eviction decisions or inefficient scanning. + +3. **Limited Cache Scope and Effectiveness for Diverse Read Patterns**: + The original `RangeCache` was primarily designed with tailing reads in mind. While support for caching for backlogged cursors and replay queue reads was added later, the eviction algorithms were not holistically updated to effectively manage mixed read patterns (tailing, catch-up, replays in Key_Shared). This can lead to: + * Unnecessary BookKeeper and tiered storage (S3) reads during catch-up scenarios, even if data was recently read for another consumer. + * Poor cache hit rates for Key_Shared subscriptions with slow consumers, as entries might be evicted before a replayed message (due to consumer unacknowledgment or redelivery request) is read again. + +4. **Foundation for Advanced Caching Strategies Needed**: + The current cache architecture makes it difficult to implement more intelligent caching strategies that could further optimize for common Pulsar use cases, such as efficiently handling fan-out to multiple shared consumers or retaining entries expected to be read by several cursors. + +Addressing these issues is crucial for improving broker performance, reducing operational costs (lower BookKeeper load), and providing a more robust caching layer that can adapt to diverse workloads. + +# Goals + +The refactoring aims to make the cache eviction more robust, performant, and predictable. The "expected read count" strategy is an attempt to make the cache more aware of Pulsar's specific consumption patterns. + +## In Scope + +- **Refactor Cache Eviction Mechanism**: + - Replace the existing per-cache iteration for timestamp eviction and the `EntryCacheDefaultEvictionPolicy` for size-based eviction with a centralized, insertion-order aware mechanism. + - Implement a `RangeCacheRemovalQueue` that tracks all cached entries globally in approximate insertion order. + - Ensure timestamp-based eviction reliably removes entries older than the threshold by processing this queue. + - Ensure size-based eviction globally removes the oldest entries first from this queue until the target size is met. +- **Introduce "Expected Read Count" Cache Strategy**: + - Implement a new caching strategy where entries track an "expected read count," representing how many active cursors are anticipated to read that entry. + - Prioritize retaining entries with a positive expected read count during size-based eviction. + - Provide a new configuration option (`cacheEvictionByExpectedReadCount`) to enable this strategy. +- **Improve Performance and Efficiency**: + - Reduce CPU overhead associated with cache eviction, particularly timestamp-based eviction. + - Improve overall cache hit rates by making better eviction decisions. +- **Enhance Correctness**: Ensure eviction policies work correctly across various read patterns. +- **Provide a Foundation for Future Cache Optimizations**: The refactored design should make it easier to implement further caching improvements, such as more sophisticated strategies for catch-up reads or Key_Shared subscriptions. +- **Simplify RangeCache Implementation**: Remove unnecessary generic type parameters from the RangeCache implementation. Since RangeCache is used exclusively for a single purpose within the Pulsar codebase (caching managed ledger entries), the generic key and value parameters add unnecessary complexity without providing any practical benefit. This simplification will improve code readability and reduce cognitive overhead. + +# High Level Design + +The proposed solution involves two main components: a refactored eviction mechanism using a centralized removal queue and a new cache strategy based on expected read count. + +## 1. Centralized Cache Eviction with `RangeCacheRemovalQueue` + +A new component, `RangeCacheRemovalQueue`, will be introduced at the `EntryCacheManager` level. +- **Entry Tracking**: When an entry is inserted into any `RangeEntryCacheImpl` (the per-ledger cache), a lightweight wrapper for this entry (`RangeCacheEntryWrapper`) is also added to this global `RangeCacheRemovalQueue`. This queue maintains entries in their global insertion order (FIFO). This wrapper is already necessary in `RangeEntryCacheImpl` to prevent consistency issues. The current internal wrapper class is refactored to a top-level class so that it can be used with the removal queue. +- **Timestamp-Based Eviction**: A single, periodic task (e.g., managed by `ManagedLedgerFactoryImpl`'s `cacheEvictionExecutor`) will process the `RangeCacheRemovalQueue`. It will iterate from the head of the queue, removing entries whose `timestampNanos` are older than the `cacheEvictionTimeThresholdNanos`. Since the queue is insertion-ordered, this process can often stop early once it encounters an entry that is not expired. +- **Size-Based Eviction**: When the `EntryCacheManager` detects that the total cache size exceeds `evictionTriggerThresholdPercent * maxSize`, it will trigger an eviction cycle. This cycle will also process the `RangeCacheRemovalQueue` from the head, removing the oldest entries (regardless of which specific ledger they belong to) until the cache size is brought down to `cacheEvictionWatermark * maxSize`. +- **Stashing Mechanism**: During size-based eviction, if an entry is encountered that should not be evicted immediately (e.g., due to a positive "expected read count" as per the new strategy, and it hasn't met timestamp expiration), it will be temporarily "stashed" (moved to a secondary list within `RangeCacheRemovalQueue`). Stashed entries are reconsidered in subsequent eviction passes or if they eventually expire by timestamp. This prevents premature eviction of entries that are likely to be read again soon. + +This centralized approach replaces the distributed, per-cache iteration for timestamp eviction and the less precise `EntryCacheDefaultEvictionPolicy` for size eviction, leading to more globally optimal and efficient eviction decisions. + +## 2. "Expected Read Count" Cache Strategy + +This new strategy aims to improve cache hit rates by retaining entries that are likely to be read by multiple active consumers/cursors. +- **`EntryReadCountHandler`**: Each cached entry (`CachedEntryImpl`) will be associated with an `EntryReadCountHandlerImpl`. This handler maintains an `expectedReadCount` (an atomic integer). +- **Initialization**: + - When a new entry is added to the ledger (`OpAddEntry`), its `expectedReadCount` is initialized to the number of *active* cursors. + - When entries are read from BookKeeper or tiered storage and inserted into the cache (`RangeEntryCacheImpl.readFromStorage`), the `expectedReadCount` is initialized based on the current state of active cursors currently positioned before or at the entry being added. This information is sourced from `ManagedCursorContainer.getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor)`. +- **Dynamic Updates**: + - When an entry is actually delivered to a consumer from the cache and subsequently released by the delivery mechanism, its `expectedReadCount` is decremented (via `EntryReadCountHandler.markRead()`). + - The `expectedReadCount` on a cached entry is incremented when the entry is added to the replay queue. This will de-prioritize the removal of the entry from the cache when size based eviction is performed so that when the Key_Shared consumer is available to read the entry, it would more likely be available in the cache. +- **Eviction Consideration**: + - The `RangeCacheRemovalQueue`'s size-based eviction logic will consult `CachedEntry.canEvict()`. This method returns `true` if `expectedReadCount <= 0`. + - If `canEvict()` is false and the entry hasn't met timestamp expiration, it's stashed instead of being immediately evicted during a size-based eviction pass. + - If an entry is forcefully removed from the cache (e.g., ledger deletion, or direct invalidation call that bypasses normal eviction), `EntryReadCountHandler.markEvicted()` is called to set a special state, preventing further increments and ensuring it's recognized as fully processed. +- **Configuration**: A new broker configuration setting, `cacheEvictionByExpectedReadCount` (boolean, default: `true`), will enable this strategy. If set to `false`, the cache will fall back to simpler eviction based on timestamp and potentially `cacheEvictionByMarkDeletedPosition` (if enabled, though the latter's effectiveness is reduced without the new read count logic). + +This strategy allows the cache to be more intelligent about retaining entries that have higher utility, especially in fan-out scenarios or with slightly lagging consumers in Key_Shared subscriptions. The replay queue is also used in Shared subscriptions when more entries are read than can be consumed by the available consumers. This strategy will also avoid cache misses in those scenarios. + +When using the expected read count caching strategy, there would be separate configuration values for specifying the time-to-live values for entries that have already reached their "expected read count" and for entries that still have a positive "expected read count". The current `managedLedgerCacheEvictionTimeThresholdMillis` setting would be used for entries that have reached their "expected read count". For entries that still have a positive "expected read count", a separate setting `managedLedgerCacheEvictionTimeThresholdMillisMax` would be used. Having separate settings would avoid keeping entries in the cache for a longer period of time when it is not expected that they would be read again. When an entry is in the cache expecting more reads, it would be useful to keep it in the cache for a longer period of time. This can be controlled with the separate setting. Besides Key_Shared subscription replay queue reads, this would also avoid reading from BookKeeper in cases where slightly slower consumers "drop off the tail" and get backlogged more than `managedLedgerCacheEvictionTimeThresholdMillis`. + +# Detailed Design + +## Design & Implementation Details + +### 1. `RangeCacheRemovalQueue` +- **Structure**: + - `removalQueue`: `org.jctools.queues.MpscUnboundedArrayQueue` to hold entries in insertion order. + - `stash`: `RangeCacheRemovalQueueStash` (internal class), a list-based mechanism to temporarily hold entries that are not immediately evictable during size-based eviction (e.g., due to positive expected read count). + - `RangeCacheRemovalQueueStash.add(RangeCacheEntryWrapper)`: Adds an entry to the stash. + - `RangeCacheRemovalQueueStash.evictEntries(...)`: Iterates stashed entries, applying eviction predicate. + - `RangeCacheRemovalQueueStash.maybeTrim()`: Compacts the stash list if many entries are removed to reclaim space. +- **Eviction Logic (`evictEntries` method with `EvictionPredicate`)**: + - Synchronized method to ensure single-threaded eviction. + - First processes the `stash`, then the `removalQueue`. + - `EvictionPredicate` determines the `EvictionResult` for each entry: + - `REMOVE`: Entry is evicted. + - `STASH`: Entry is moved from `removalQueue` to `stash` (only during size-based eviction if entry is not evictable by read count but not yet timestamp-expired). + - `STOP`: Stops processing further entries in the current pass (e.g., for timestamp eviction when a non-expired entry is found, or for size eviction when enough space is freed). + - `MISSING`: Entry was already removed from cache by other means. + - `evictLEntriesBeforeTimestamp(long timestampNanos)`: Uses a predicate: `e.timestampNanos < timestampNanos ? REMOVE : STOP`. (Note: The `STASH_AND_STOP` mentioned in a discussion snippet was refined; for timestamp eviction, it's just `STOP` if not expired, as stashing by itself is mainly for size-based eviction policy related to `expectedReadCount`). + - `evictLeastAccessedEntries(long sizeToFree, long expirationTimestampNanos)`: Uses a predicate: + ```java + (e, c) -> { + if (c.removedSize >= sizeToFree) return EvictionResult.STOP; + boolean expired = e.timestampNanos < expirationTimestampNanos; + if (!(e.value.canEvict() || expired)) return EvictionResult.STASH; // Not evictable by read count and not expired + return EvictionResult.REMOVE; + } + ``` + +### 2. `CachedEntry` + +Then value entries in the range cache are `CachedEntry` instances. The key is a `Position`. + +```java +/** + * Interface for cached entries in the {@link RangeCache}. + */ +public interface CachedEntry extends Entry, ReferenceCounted { + boolean matchesKey(Position key); + boolean canEvict(); + boolean increaseReadCount(int expectedReadCount); +} +``` + +These are the relevant details related to the "expected read count" implementation: +- **`CachedEntry` interface**: + - `boolean canEvict()`: Returns `true` if `expectedReadCount <= 0` (or if no read count handler is present). + - `boolean increaseReadCount(int expectedReadCount)`: Increments the internal expected read count. +- **`CachedEntryImpl` class**: Implements `CachedEntry`. Holds a reference to `EntryReadCountHandlerImpl`. + +### 3. `EntryReadCountHandler` + +A `CachedEntry` implementatation holds a reference to a `EntryReadCountHandler` which handles the state related to read count. +`EntryReadCountHandler` is used as an abstraction and integration between `RangeCacheRemovalQueue`, `CachedEntry` and the Managed Ledger layer. The `EntryImpl` in Managed Ledger layer has also been modified to hold an `EntryReadCountHandler` instance so that the information can be passed on. The implementation of `EntryReadCountHandler` holds the state in a single `volatile int` field. + +```java +public interface EntryReadCountHandler { + int getExpectedReadCount(); + boolean incrementExpectedReadCount(); + boolean incrementExpectedReadCount(int increment); + void markRead(); + void markEvicted(); +} +``` + +- **`EntryReadCountHandler` interface**: + - `int getExpectedReadCount()` + - `boolean incrementExpectedReadCount(int increment)` + - `boolean incrementExpectedReadCount()` + - `void markRead()`: Decrements the count. + - `void markEvicted()`: Sets count to a special "evicted" value (e.g., `Integer.MIN_VALUE / 2`) to stop further modifications and identify it as fully processed. +- **`EntryReadCountHandlerImpl` class**: + - Uses `AtomicIntegerFieldUpdater` for `expectedReadCount`. + - Handles the special `EVICTED_VALUE`. + +### 4. Integration with `ManagedLedgerImpl` and `ManagedCursorContainer` + +- **`OpAddEntry`**: + - When an entry is successfully added to a ledger: + ```java + if (!(ml instanceof ShadowManagedLedgerImpl)) { + int activeCursorCount = ml.getActiveCursors().size(); + if (activeCursorCount > 0) { + int expectedReadCount = 0; + if (ml.getConfig().isCacheEvictionByExpectedReadCount()) { + // For newly added entries, all active cursors are effectively "before" the added entry for future reads. + expectedReadCount = activeCursorCount; + } + EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount); + entry.setDecreaseReadCountOnRelease(false); // Cache owns the primary read count handling now + ml.entryCache.insert(entry); + entry.release(); + } + } + ``` +- **`ManagedLedgerImpl.asyncReadEntry`**: + - The `expectedReadCount` parameter (an `IntSupplier`) is passed down to `RangeEntryCacheImpl.asyncReadEntry0`. + - The `IntSupplier` would resolve to `opReadEntry.cursor.getNumberOfCursorsAtSamePositionOrBefore()` when `cacheEvictionByExpectedReadCount` is true. +- **`RangeEntryCacheImpl.readFromStorage`**: + - Accepts an `IntSupplier expectedReadCount`. + - When creating `EntryImpl` from `LedgerEntry`, it passes `expectedReadCount.getAsInt()` to `EntryImpl.create`. +- **`RangeEntryCacheImpl.insert(Entry entry)`**: + - If an entry with the same position already exists in the cache: + ```java + CachedEntry previousEntry = entries.get(position); + if (previousEntry != null) { + try { + if (entry.getReadCountHandler() != null) { + // If the entry is already in the cache, increase the expected read count on the existing entry + // by the count from the new entry (which represents new potential readers for this insert op) + if (previousEntry.increaseReadCount(entry.getReadCountHandler().getExpectedReadCount())) { + return false; // Successfully updated existing entry's read count + } + } + } finally { + previousEntry.release(); + } + } + // ... proceed to create new CachedEntryImpl and put it ... + CachedEntryImpl cacheEntry = CachedEntryImpl.create(position, cachedData, (EntryReadCountHandlerImpl) entry.getReadCountHandler()); + ``` +- **`ManagedCursorContainer`**: + - Needs an efficient way to count active cursors at or before a given position so that "expected read count" can be calculated for backlog reads. `ManagedCursorContainer` has been modified to use `TreeMap` so that cursors can be sorted by position. + ```java + // ManagedCursorContainer + private final NavigableMap> sortedByPosition = new TreeMap<>(); + // ... methods to add/remove/update items in sortedByPosition ... + public int getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor cursor) { + long stamp = rwLock.readLock(); + try { + Item item = cursors.get(cursor.getName()); // cursors is the existing ConcurrentMap + if (item == null || item.position == null) { // item.position can be null if cursor is not reading + return 0; + } else { + int sum = 0; + // Iterate through positions up to and including the cursor's current read position + for (Map.Entry> entry : sortedByPosition.headMap(item.position, true).entrySet()) { + sum += entry.getValue().size(); + } + return sum; + } + } finally { + rwLock.unlockRead(stamp); + } + } + ``` +- **`EntryImpl.release()`**: + - A new flag `decreaseReadCountOnRelease` (default `false`) is added to `EntryImpl`. + - When `EntryImpl.create(Entry other)` (copy constructor for dispatch) is called, this flag is set to `true` on the copy. + - `EntryImpl.beforeDeallocate()`: if `decreaseReadCountOnRelease` is true and `readCountHandler` is not null, call `readCountHandler.markRead()`. + This ensures that only when an entry copy created for dispatch is released, the read count on the original cached entry's handler is decremented. + +### 4. `RangeEntryCacheManagerImpl` +- `doCacheEviction`: Called by the global eviction scheduler. + - Invokes `evictionHandler.invalidateEntriesBeforeTimestampNanos`. Would also have to take separate `managedLedgerCacheEvictionTimeThresholdMillis` and `managedLedgerCacheEvictionTimeThresholdMillisMax` settings into account. + - Calls `doEvictToWatermarkWhenOverThreshold()` to ensure cache size is within limits. +- `triggerEvictionWhenNeeded()`: Called after an entry is added. + - If size > threshold, schedules `doEvictToWatermarkWhenOverThreshold()` on `cacheEvictionExecutor`. +- `doEvictToWatermarkWhenOverThreshold()`: + - Calculates `sizeToEvict`. + - Calls `evictionHandler.evictEntries(sizeToEvict, expirationTimestampNanosForNonEvictableEntries)`. + +## Note about entry cache size and how it's related to broker cache total size + +The calculation of the entry cache size is not accurate unless the `managedLedgerCacheCopyEntries` setting is set to `true`. This occurs because when the entry payload is read from BookKeeper or received from a Pulsar client publisher, it is "sliced" from an underlying buffer. The actual memory consumption can be higher when the other entries of the shared underlying buffer have been released and there is a single entry that retains the complete underlying buffer. + +For entries received from a Pulsar client publisher, the buffer size is between 16kB to 1MB. This is currently not configurable and it is set in code in [BrokerService.defaultServerBootstrap method](https://github.com/apache/pulsar/blob/19b4a05f888da35a584cf7ff6d594dd9df5e03d1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L564-L565). + +```java + bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, + new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); +``` + +The actual underlying buffer size will also depend on Netty's LengthFieldBasedFrameDecoder (extends ByteToMessageDecoder) logic which will cumulate buffers by copying until the complete entry has been read. In that case, the underlying buffer size can be larger than 1MB. The cumulated buffer will shared be shared across multiple entries since entry buffers are sliced from the cumulated buffer. + +For entries received from BookKeeper, it's similar. However, BookKeeper client defaults to Netty's [default channel config settings](https://github.com/netty/netty/blob/c3416d8ad2260f3c9bc98a9061d25bba92020f93/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java#L74-L76) for RCVBUF_ALLOCATOR, which has a maximum buffer size of 64kB. + +It would be useful to make the Pulsar broker side `RCVBUF_ALLOCATOR` parameters configurable so that the parameters could be tuned to a smaller size. It is very unlikely that the 1MB maximum size improves performance significantly. There is a performance tradeoff in the case where messages are large and buffers would have to be merged by copying in the LengthFieldBasedFrameDecoder. To avoid this overhead, it would be possible to make LengthFieldBasedFrameDecoder to use composite buffers for merging by setting the `cumulator` property to `ByteToMessageDecoder.COMPOSITE_CUMULATOR`, however that could have a negative performance impact on code that expects that the complete payload is a single non-continous buffer. + +Since in many cases, the underlying buffer where the slice is taken for a single entry is shared with entries that have been published or retrieved together, it is also common that these entries will get evicted together. Making the RCVBUF_ALLOCATOR settings configurable in the Pulsar broker is a sufficient mitigation for the problem. +In cases where shared buffers add a lot of overhead of consumed memory, it will be possible to reduce it by setting the maximum size for `AdaptiveRecvByteBufAllocator` to 64kB, with the tradeoff of possible unnecessary buffer copies for messages exceeding 64kB. In BookKeeper server, there are settings `byteBufAllocatorSizeInitial`, `byteBufAllocatorSizeMin` and `byteBufAllocatorSizeMax` to configure the `AdaptiveRecvByteBufAllocator` parameters. The naming of these parameters in BookKeeper isn't optimal since the settings are specifically about `AdaptiveRecvByteBufAllocator` parameters and not "byteBufAllocator" parameters. + +The proposal would be to add these configuration parameters to `broker.conf` for controlling Broker's `AdaptiveRecvByteBufAllocator` parameters: +```properties +# Netty adaptive receive buffer allocator's minimum size +brokerAdaptiveRecvByteBufAllocatorMinimumSize=1024 +# Netty adaptive receive buffer allocator's initial size +brokerAdaptiveRecvByteBufAllocatorInitialSize=16384 +# Netty receive adaptive buffer allocator's maximum size +# Tune this value to a lower value to reduce overhead of the entries cached in the Broker cache due to shared underlying buffers +brokerAdaptiveRecvByteBufAllocatorMaximumSize=1048576 +``` + +## Public-facing Changes + +### Configuration +- **New Configuration**: `broker.conf` + - `cacheEvictionByExpectedReadCount` (boolean): Enables the new eviction strategy based on expected read count. When true, entries with `expectedReadCount > 0` are less likely to be evicted by size-based eviction unless they also meet timestamp expiration. + Default: `true`. + - `managedLedgerCacheEvictionTimeThresholdMillisMax` (Integer): cache entry time-to-live for entries that have pending expected reads (applies only when `cacheEvictionByExpectedReadCount=true`) + Default: `5000` + - `brokerAdaptiveRecvByteBufAllocatorMinimumSize` (int): Netty adaptive receive buffer allocator's minimum size in bytes. Default: `1024`. + - `brokerAdaptiveRecvByteBufAllocatorInitialSize` (int): Netty adaptive receive buffer allocator's initial size in bytes. Default: `16384`. + - `brokerAdaptiveRecvByteBufAllocatorMaximumSize` (int): Netty adaptive receive buffer allocator's maximum size in bytes. Tune this value to a lower value to reduce overhead of the entries cached in the Broker cache due to shared underlying buffers. Default: `1048576`. +- **Modified Behavior of Existing Configurations**: + - `managedLedgerCacheEvictionTimeThresholdMillis`: Applies only to entries which have reached the expected read count when `cacheEvictionByExpectedReadCount=true`. + - `managedLedgerCacheSizeMB`: Still applies. The new size-based eviction will use the `RangeCacheRemovalQueue`. + - `cacheEvictionByMarkDeletedPosition`: If `cacheEvictionByExpectedReadCount` is `true`, this setting's direct influence on preserving entries is diminished, as `expectedReadCount` provides a more granular control. However, mark-delete position updates still occur and will lead to `expectedReadCount` decrementing as cursors move past entries. If `cacheEvictionByExpectedReadCount` is `false`, this setting functions as before (though the underlying eviction for "up to slowest reader" now also uses the central queue). + - `managedLedgerMinimumBacklogCursorsForCaching`, `managedLedgerMinimumBacklogEntriesForCaching`, `managedLedgerMaxBacklogBetweenCursorsForCaching`: These primarily affect whether a cursor is considered "active" for caching purposes (i.e., `ManagedCursorImpl.isCacheReadEntry()`). The `expectedReadCount` logic naturally incorporates this by only counting active cursors. If `cacheEvictionByExpectedReadCount` is true, these settings become less critical for direct cache retention decisions, as `expectedReadCount` is the primary driver. + +### Metrics & Monitoring + +Existing broker cache metrics will continue to function, reflecting the behavior of the new eviction system and broker cache strategy. + +### Backward & Forward Compatibility + +There are no compatibility concerns since the broker cache is handled at runtime, in a single broker. + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations +- This PIP does not directly interact with or change geo-replication mechanisms. Cache behavior is local to each broker in each cluster. Compatibility considerations are the same as for a standalone broker. + +# Alternatives + +1. **Simple LRU (Least Recently Used) for Global Cache**: + - A global LRU cache could be considered. + - **Reason for rejection**: Standard LRU doesn't naturally account for Pulsar's specific read patterns where an entry might be predictably read multiple times (e.g., by different consumers in a shared subscription after a backlog is cleared, or by Key_Shared consumers). The "expected read count" provides a more domain-specific heuristic. Moreover, managing a global LRU across all topics with high concurrency and varying entry sizes can be complex and may require significant locking or sophisticated concurrent data structures, potentially negating performance gains. The per-ledger cache structure with a central eviction coordinator is a less disruptive change. + +2. **Priority Queue for `RangeCacheRemovalQueue`**: + - Instead of a simple FIFO queue, a priority queue ordered by `expectedReadCount` (ascending) and then timestamp (ascending) could be used. + - **Reason for rejection (for this PIP)**: While potentially more precise, managing priorities in a queue where `expectedReadCount` can change dynamically for entries already in the queue adds significant complexity. Updates would require O(log N) or O(N) operations depending on the queue implementation and how updates are handled. The current MPSC queue with a "stash" for non-evictable items offers a simpler, lower-overhead approach for the initial refactoring. Performance of MPSC queues is very high for enqueue/dequeue. The "stash" handles the "deprioritizing" of eviction for entries with positive read counts without complex queue reordering. This can be revisited if the stash mechanism proves insufficient. + +# Links + +* Mailing List discussion thread: +* Mailing List voting thread: From 16547d584236685679bc79bebafef2a3470871b1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 23 Jun 2025 21:46:02 +0300 Subject: [PATCH 2/3] Add mailing list discussion thread link --- pip/pip-430.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pip/pip-430.md b/pip/pip-430.md index 0ef01451eeb0e..1004cfbc2ba36 100644 --- a/pip/pip-430.md +++ b/pip/pip-430.md @@ -331,5 +331,5 @@ There are no compatibility concerns since the broker cache is handled at runtime # Links -* Mailing List discussion thread: +* Mailing List discussion thread: https://lists.apache.org/thread/o1ozbg468kxfd38pxk2ppzsstdnxnok2 * Mailing List voting thread: From 901533010c07fadacb146e707879022a76dce284 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 24 Jul 2025 16:53:15 +0300 Subject: [PATCH 3/3] Add voting thread link --- pip/pip-430.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pip/pip-430.md b/pip/pip-430.md index 1004cfbc2ba36..74d8065bfd802 100644 --- a/pip/pip-430.md +++ b/pip/pip-430.md @@ -332,4 +332,4 @@ There are no compatibility concerns since the broker cache is handled at runtime # Links * Mailing List discussion thread: https://lists.apache.org/thread/o1ozbg468kxfd38pxk2ppzsstdnxnok2 -* Mailing List voting thread: +* Mailing List voting thread: https://lists.apache.org/thread/p6jroxhtjpk7hbgqt3hzjtjd2233zmxy