Skip to content

Commit a091ea7

Browse files
[fix][ml] Fix ledger trimming race causing cursor to point to deleted ledgers (#24855)
Co-authored-by: Claude <[email protected]>
1 parent d743278 commit a091ea7

File tree

2 files changed

+151
-1
lines changed

2 files changed

+151
-1
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2711,7 +2711,8 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
27112711

27122712
public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
27132713
for (ManagedCursor cursor : cursors) {
2714-
Position lastAckedPosition = cursor.getMarkDeletedPosition();
2714+
Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null
2715+
? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition();
27152716
LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
27162717
LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
27172718
.map(Map.Entry::getValue).orElse(null);

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4593,4 +4593,153 @@ public void testRemoveLedgerProperty() throws Exception {
45934593
Assert.assertEquals(ml.getLedgersInfo().get(firstLedger).getPropertiesCount(), 0);
45944594
Assert.assertEquals(ml.getLedgersInfo().get(lastLedger).getPropertiesCount(), 0);
45954595
}
4596+
4597+
/**
4598+
* Verifies that ledger trimming respects the persistent cursor position, not just the in-memory position.
4599+
*
4600+
* <p><b>Test Flow:</b>
4601+
* <ol>
4602+
* <li><b>Setup:</b> Create 60 entries across multiple ledgers (10 entries per ledger)
4603+
* <li><b>Initial Acks:</b> Delete entries 0, 5-9 and wait for persistence
4604+
* <ul><li>Persistent position: entry 0</li><li>In-memory position: entry 0</li></ul>
4605+
* <li><b>Inject Delay:</b> Add 30-second delay to BookKeeper writes (simulates slow ZK/BK)
4606+
* <li><b>Delayed Acks:</b> Asynchronously delete entries 1-4
4607+
* <ul><li>Persistent position: entry 0 (delayed)</li><li>In-memory position: entry 9</li></ul>
4608+
* <li><b>Pre-Trim Sync:</b> Call {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()}
4609+
* <li><b>Trigger Trim:</b> Start ledger trimming process
4610+
* <li><b>Verify:</b> First ledger is preserved because persistent position (entry 0) still points to it
4611+
* </ol>
4612+
*
4613+
* <p><b>Success Criteria:</b>
4614+
* The first ledger must NOT be deleted, preventing the cursor from pointing to a non-existent
4615+
* ledger after topic reload. This avoids negative backlog calculations.
4616+
*
4617+
* <p><b>What This Tests:</b>
4618+
* Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} correctly uses the
4619+
* persistent cursor position (not in-memory) when determining which ledgers are safe to trim.
4620+
*/
4621+
@Test
4622+
public void testCursorPointsToDeletedLedgerAfterTrim() throws Exception {
4623+
final String ledgerName = "testCursorPointsToDeletedLedgerAfterTrimAndReload";
4624+
final String cursorName = "test-cursor";
4625+
4626+
// ===== SETUP: Create managed ledger with small ledgers =====
4627+
ManagedLedgerConfig config = new ManagedLedgerConfig();
4628+
config.setMaxEntriesPerLedger(10);
4629+
4630+
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, config);
4631+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName);
4632+
4633+
// ===== PHASE 1: Write entries to create multiple ledgers =====
4634+
int totalEntries = 60;
4635+
log.info("=== PHASE 1: Writing {} entries to create multiple ledgers ===", totalEntries);
4636+
for (int i = 0; i < totalEntries; i++) {
4637+
Position pos = ledger.addEntry(("message-" + i).getBytes());
4638+
log.info("Added entry: {}", pos);
4639+
}
4640+
4641+
List<LedgerInfo> ledgersAfterWrite = ledger.getLedgersInfoAsList();
4642+
log.info("Created {} ledgers: {}", ledgersAfterWrite.size(),
4643+
ledgersAfterWrite.stream()
4644+
.map(l -> String.format("L%d(%d entries)", l.getLedgerId(), l.getEntries()))
4645+
.toArray());
4646+
4647+
assertTrue(ledgersAfterWrite.size() >= 5, "Should have at least 5 ledgers");
4648+
long firstLedgerId = ledgersAfterWrite.get(0).getLedgerId();
4649+
4650+
// ===== PHASE 2: Initial acknowledgments (entries 0, 5-9) and wait for persistence =====
4651+
log.info("=== PHASE 2: Acknowledging initial entries in first ledger {} ===", firstLedgerId);
4652+
List<Entry> entries = cursor.readEntries(10);
4653+
4654+
// Delete entries 5-9 first (out of order)
4655+
log.info("Deleting entries 5-9");
4656+
for (int i = 5; i < 10; i++) {
4657+
cursor.delete(entries.get(i).getPosition());
4658+
}
4659+
4660+
// Delete entry 0, which advances mark-delete position
4661+
log.info("Deleting entry 0 - this advances mark-delete position");
4662+
cursor.delete(entries.get(0).getPosition());
4663+
4664+
// Verify in-memory cursor position
4665+
Position initialMarkDelete = cursor.getMarkDeletedPosition();
4666+
assertEquals(initialMarkDelete.getLedgerId(), firstLedgerId,
4667+
"Mark-delete should be in first ledger");
4668+
assertEquals(initialMarkDelete.getEntryId(), entries.get(0).getEntryId(),
4669+
"Mark-delete should be at entry 0");
4670+
4671+
// Wait for this position to be persisted
4672+
log.info("Waiting for initial mark-delete position to persist: {}", initialMarkDelete);
4673+
Awaitility.await().untilAsserted(() -> {
4674+
assertEquals(cursor.getPersistentMarkDeletedPosition(), initialMarkDelete,
4675+
"Persistent position should catch up to in-memory position");
4676+
});
4677+
log.info("Initial position persisted successfully");
4678+
4679+
// ===== PHASE 3: Inject delay to simulate slow persistence =====
4680+
long delay = 30;
4681+
log.info("=== PHASE 3: Injecting {}s delay for cursor persistence ===",
4682+
delay);
4683+
bkc.addEntryResponseDelay(delay, TimeUnit.SECONDS);
4684+
4685+
// ===== PHASE 4: Asynchronously acknowledge entries 1-4 (persistence will be delayed) =====
4686+
log.info("=== PHASE 4: Asynchronously acknowledging entries 1-4 (will be delayed) ===");
4687+
for (int i = 1; i < 5; i++) {
4688+
final int index = i;
4689+
cursor.asyncDelete(entries.get(i).getPosition(), new AsyncCallbacks.DeleteCallback() {
4690+
@Override
4691+
public void deleteComplete(Object ctx) {
4692+
log.info("Entry {} deletion completed", index);
4693+
}
4694+
4695+
@Override
4696+
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
4697+
log.error("Entry {} deletion failed", index, exception);
4698+
}
4699+
}, null);
4700+
}
4701+
4702+
// Verify in-memory position has advanced to entry 9
4703+
Position newMarkDelete = cursor.getMarkDeletedPosition();
4704+
assertEquals(newMarkDelete.getLedgerId(), firstLedgerId,
4705+
"Mark-delete should still be in first ledger");
4706+
assertEquals(newMarkDelete.getEntryId(), entries.get(9).getEntryId(),
4707+
"Mark-delete should have advanced to entry 9 (in-memory)");
4708+
log.info("In-memory mark-delete position: {}", newMarkDelete);
4709+
4710+
// ===== PHASE 5: Update cursor before trimming (important synchronization point) =====
4711+
log.info("=== PHASE 5: Calling maybeUpdateCursorBeforeTrimmingConsumedLedger ===");
4712+
ledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
4713+
4714+
// ===== PHASE 6: Trigger ledger trimming =====
4715+
log.info("=== PHASE 6: Triggering ledger trimming ===");
4716+
CompletableFuture<Void> trimFuture = new CompletableFuture<>();
4717+
ledger.trimConsumedLedgersInBackground(trimFuture);
4718+
trimFuture.get();
4719+
log.info("Trimming completed");
4720+
4721+
// ===== VERIFICATION: Ledgers should NOT be trimmed =====
4722+
log.info("=== VERIFICATION ===");
4723+
4724+
// Persistent position should still be at old position (entry 0)
4725+
Position persistentPosition = cursor.getPersistentMarkDeletedPosition();
4726+
assertEquals(persistentPosition, initialMarkDelete,
4727+
"Persistent position should not have advanced (delayed)");
4728+
log.info("Persistent mark-delete position (as expected): {}", persistentPosition);
4729+
log.info("In-memory mark-delete position: {}", newMarkDelete);
4730+
4731+
// First ledger should still exist (not trimmed)
4732+
Awaitility.await().untilAsserted(() -> {
4733+
long firstRemainingLedger = ledger.getFirstPosition().getLedgerId();
4734+
assertEquals(firstRemainingLedger, ledgersAfterWrite.get(0).getLedgerId(),
4735+
"First ledger should NOT be trimmed because persistent cursor position "
4736+
+ "is still pointing to it (entry 0)");
4737+
});
4738+
log.info("SUCCESS: First ledger {} was correctly preserved", firstLedgerId);
4739+
4740+
// ===== CLEANUP =====
4741+
entries.forEach(Entry::release);
4742+
cursor.close();
4743+
ledger.close();
4744+
}
45964745
}

0 commit comments

Comments
 (0)