Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
9dd5726
Replace broker cache eviction algorithm with centralized removal queu…
lhotari May 29, 2025
676925b
Refactor: Remove the timestamp from EntryImpl, remove CachedEntryImpl…
lhotari Jul 23, 2025
ca9ceb4
Rename matchesKey to matchesPosition
lhotari Jul 23, 2025
d52dfa8
[improve][test] Remove EntryCacheCreator from ManagedLedgerFactoryImp…
lhotari Jul 23, 2025
3f2c30e
Merge branch 'lh-pre-cherry-pick-pr24552' into lh-broker-cache-evicti…
lhotari Jul 23, 2025
8c6c2ad
Fix checkstyle
lhotari Jul 23, 2025
0d760d4
Don't add to removal queue first
lhotari Jul 23, 2025
365fc07
Merge remote-tracking branch 'origin/master' into lh-broker-cache-evi…
lhotari Jul 24, 2025
f104935
Use ManagedLedgerImpl's thread to run ledger specific evictions
lhotari Jul 24, 2025
6d24e99
Reference original Position when creating with createWithRetainedDupl…
lhotari Jul 24, 2025
992dc5d
Add toString to EntryImpl
lhotari Jul 24, 2025
c9060a2
Copy position from source entry
lhotari Jul 24, 2025
f57b874
Add tests for EntryImpl
lhotari Jul 24, 2025
99773a8
Fix typo causing a bug with caching
lhotari Jul 24, 2025
bd91168
Fix test after sharing EntryImpl position instance
lhotari Jul 24, 2025
9563f40
Make PositionFactory.create(Position) return the same instance if it'…
lhotari Jul 24, 2025
0c5a719
Add blockPendingCacheEvictions test method, fix PrometheusMetricsTest
lhotari Jul 24, 2025
6fb9b8e
Fix NPE in test
lhotari Jul 24, 2025
b62ccdd
Add waitForPendingCacheEvictions to ManagedLedgerImpl and fix Managed…
lhotari Jul 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,13 @@ public interface Entry {
* of data reached to 0).
*/
boolean release();

/**
* Check if this entry is for the given Position.
* @param position the position to check against
* @return true if the entry matches the position, false otherwise
*/
default boolean matchesPosition(Position position) {
return position != null && position.compareTo(getLedgerId(), getEntryId()) == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger;

import java.util.Objects;
import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;

/**
Expand Down Expand Up @@ -48,12 +49,17 @@ public static Position create(long ledgerId, long entryId) {
}

/**
* Create a new position.
* Create a new position or returns the other instance if it's immutable.
*
* @param other other position
* @return new position
*/
public static Position create(Position other) {
Objects.requireNonNull(other, "Position cannot be null");
if (other instanceof ImmutablePositionImpl) {
// Return the same instance if it's already an ImmutablePositionImpl
return other;
}
return new ImmutablePositionImpl(other);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;
package org.apache.bookkeeper.mledger;

import java.util.List;
import io.netty.util.ReferenceCounted;

/**
* Cache eviction policy abstraction interface.
*
* An Entry that is also reference counted.
*/
public interface EntryCacheEvictionPolicy {
/**
* Perform the cache eviction of at least sizeToFree bytes on the supplied list of caches.
*
* @param caches
* the list of caches to consider
* @param sizeToFree
* the minimum size in bytes to be freed
*/
void doEviction(List<EntryCache> caches, long sizeToFree);
public interface ReferenceCountedEntry extends Entry, ReferenceCounted {

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCounted;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.ReferenceCountedEntry;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted;
import org.apache.bookkeeper.mledger.util.RangeCache;

public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>,
RangeCache.ValueWithKeyValidation<Position> {
public final class EntryImpl extends AbstractCASReferenceCounted
implements ReferenceCountedEntry, Comparable<EntryImpl> {

private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
@Override
Expand All @@ -42,7 +44,6 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
};

private final Handle<EntryImpl> recyclerHandle;
private long timestamp;
private long ledgerId;
private long entryId;
private Position position;
Expand All @@ -52,7 +53,6 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {

public static EntryImpl create(LedgerEntry ledgerEntry) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
entry.ledgerId = ledgerEntry.getLedgerId();
entry.entryId = ledgerEntry.getEntryId();
entry.data = ledgerEntry.getEntryBuffer();
Expand All @@ -61,10 +61,30 @@ public static EntryImpl create(LedgerEntry ledgerEntry) {
return entry;
}

public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) {
ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null;
if (interceptor != null) {
ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate();
processorHandle = interceptor
.processPayloadBeforeEntryCache(duplicateBuffer);
if (processorHandle != null) {
ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(),
ledgerEntry.getLength(), processorHandle.getProcessedPayload());
} else {
duplicateBuffer.release();
}
}
EntryImpl returnEntry = create(ledgerEntry);
if (processorHandle != null) {
processorHandle.release();
ledgerEntry.close();
}
return returnEntry;
}

@VisibleForTesting
public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.data = Unpooled.wrappedBuffer(data);
Expand All @@ -74,7 +94,6 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {

public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.data = data;
Expand All @@ -85,7 +104,7 @@ public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {

public static EntryImpl create(Position position, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
entry.position = PositionFactory.create(position);
entry.ledgerId = position.getLedgerId();
entry.entryId = position.getEntryId();
entry.data = data;
Expand All @@ -94,16 +113,37 @@ public static EntryImpl create(Position position, ByteBuf data) {
return entry;
}

public static EntryImpl createWithRetainedDuplicate(Position position, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
entry.position = PositionFactory.create(position);
entry.ledgerId = position.getLedgerId();
entry.entryId = position.getEntryId();
entry.data = data.retainedDuplicate();
entry.setRefCnt(1);
return entry;
}

public static EntryImpl create(EntryImpl other) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
// handle case where other.position is null due to lazy initialization
entry.position = other.position != null ? PositionFactory.create(other.position) : null;
entry.ledgerId = other.ledgerId;
entry.entryId = other.entryId;
entry.data = other.data.retainedDuplicate();
entry.setRefCnt(1);
return entry;
}

public static EntryImpl create(Entry other) {
EntryImpl entry = RECYCLER.get();
entry.position = PositionFactory.create(other.getPosition());
entry.ledgerId = other.getLedgerId();
entry.entryId = other.getEntryId();
entry.data = other.getDataBuffer().retainedDuplicate();
entry.setRefCnt(1);
return entry;
}

private EntryImpl(Recycler.Handle<EntryImpl> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
Expand All @@ -124,10 +164,6 @@ public void onDeallocate(Runnable r) {
}
}

public long getTimestamp() {
return timestamp;
}

@Override
public ByteBuf getDataBuffer() {
return data;
Expand Down Expand Up @@ -201,15 +237,20 @@ protected void deallocate() {
}
data.release();
data = null;
timestamp = -1;
ledgerId = -1;
entryId = -1;
position = null;
recyclerHandle.recycle(this);
}

@Override
public boolean matchesKey(Position key) {
return key.compareTo(ledgerId, entryId) == 0;
public boolean matchesPosition(Position key) {
return key != null && key.compareTo(ledgerId, entryId) == 0;
}

@Override
public String toString() {
return getClass().getName() + "@" + System.identityHashCode(this)
+ "{ledgerId=" + ledgerId + ", entryId=" + entryId + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
import com.google.common.collect.BoundType;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -118,6 +119,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ManagedLedgerFactoryConfig config;
@Getter
protected final OrderedScheduler scheduledExecutor;
@Getter
private final ScheduledExecutorService cacheEvictionExecutor;

@Getter
Expand Down Expand Up @@ -147,6 +149,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
*/
@Getter
private boolean metadataServiceAvailable;
private final ManagedLedgerConfig defaultManagedLedgerConfig;

private static class PendingInitializeManagedLedger {

Expand All @@ -170,7 +173,8 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfi
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop(),
new ManagedLedgerConfig());
}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
Expand All @@ -181,15 +185,23 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper,
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, (policyConfig) -> CompletableFuture.completedFuture(bookKeeper), config);
this(metadataStore, bookKeeper, config, new ManagedLedgerConfig());
}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper,
ManagedLedgerFactoryConfig config, ManagedLedgerConfig defaultManagedLedgerConfig)
throws Exception {
this(metadataStore, (policyConfig) -> CompletableFuture.completedFuture(bookKeeper),
false /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop(),
defaultManagedLedgerConfig);
}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
config, NullStatsLogger.INSTANCE, OpenTelemetry.noop(), new ManagedLedgerConfig());
}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
Expand All @@ -198,15 +210,17 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
OpenTelemetry openTelemetry)
throws Exception {
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
config, statsLogger, openTelemetry);
config, statsLogger, openTelemetry, new ManagedLedgerConfig());
}

private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
boolean isBookkeeperManaged,
ManagedLedgerFactoryConfig config,
StatsLogger statsLogger,
OpenTelemetry openTelemetry) throws Exception {
OpenTelemetry openTelemetry,
ManagedLedgerConfig defaultManagedLedgerConfig) throws Exception {
this.defaultManagedLedgerConfig = defaultManagedLedgerConfig;
MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
config.getCompressionConfigForManagedLedgerInfo();
MetadataCompressionConfig compressionConfigForManagedCursorInfo =
Expand Down Expand Up @@ -303,17 +317,60 @@ private synchronized void refreshStats() {
lastStatTimestamp = now;
}

private synchronized void doCacheEviction() {
@VisibleForTesting
public synchronized void doCacheEviction() {
long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos;
entryCacheManager.doCacheEviction(maxTimestamp);
}

ledgers.values().forEach(mlfuture -> {
if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) {
ManagedLedgerImpl ml = mlfuture.getNow(null);
if (ml != null) {
ml.doCacheEviction(maxTimestamp);
}
/**
* Waits for all pending cache evictions based on total cache size or entry TTL to complete.
* This is for testing purposes only, so that we can ensure all cache evictions are done before proceeding with
* further operations. Please notice that Managed Ledger level cache eviction is not handled here. There's
* a similar method {@link ManagedLedgerImpl#waitForPendingCacheEvictions()} that waits for pending cache evictions
* at the Managed Ledger level.
*/
@VisibleForTesting
public void waitForPendingCacheEvictions() {
try {
cacheEvictionExecutor.submit(() -> {
// no-op
}).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

/**
* Blocks the pending cache evictions until the returned runnable is called.
* This is for testing purposes only, so that asynchronous cache evictions can be blocked for consistent
* test results.
*
* @return
* @throws InterruptedException
*/
@VisibleForTesting
public Runnable blockPendingCacheEvictions() throws InterruptedException {
CountDownLatch blockedLatch = new CountDownLatch(1);
CountDownLatch releaseLatch = new CountDownLatch(1);
cacheEvictionExecutor.execute(() -> {
blockedLatch.countDown();
try {
releaseLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
blockedLatch.await();
return () -> {
if (releaseLatch.getCount() == 0) {
throw new IllegalStateException("Releasing should only be called once");
}
releaseLatch.countDown();
waitForPendingCacheEvictions();
};
}

/**
Expand All @@ -329,7 +386,7 @@ public Map<String, ManagedLedger> getManagedLedgers() {

@Override
public ManagedLedger open(String name) throws InterruptedException, ManagedLedgerException {
return open(name, new ManagedLedgerConfig());
return open(name, defaultManagedLedgerConfig);
}

@Override
Expand Down Expand Up @@ -365,7 +422,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncOpen(String name, OpenLedgerCallback callback, Object ctx) {
asyncOpen(name, new ManagedLedgerConfig(), callback, null, ctx);
asyncOpen(name, defaultManagedLedgerConfig, callback, null, ctx);
}

@Override
Expand Down
Loading
Loading