Skip to content

Commit ed31d82

Browse files
authored
[fix][broker]Wrong backlog: expected 0 but got 1 (#24938)
1 parent ee3e5ea commit ed31d82

File tree

5 files changed

+467
-23
lines changed

5 files changed

+467
-23
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1489,12 +1489,16 @@ public void operationComplete() {
14891489
// modify mark delete and read position since we are able to persist new position for cursor
14901490
lock.writeLock().lock();
14911491
try {
1492-
if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
1492+
// Correct the variable "messagesConsumedCounter".
1493+
// BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the
1494+
// old one.
1495+
int compareRes = ledger.comparePositions(markDeletePosition, newMarkDeletePosition);
1496+
if (compareRes > 0) {
14931497
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
1494-
Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
1495-
} else {
1498+
Range.openClosed(newMarkDeletePosition, markDeletePosition)));
1499+
} else if (compareRes < 0) {
14961500
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries(
1497-
Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
1501+
Range.openClosed(markDeletePosition, newMarkDeletePosition)));
14981502
}
14991503
markDeletePosition = newMarkDeletePosition;
15001504
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3868,6 +3868,32 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
38683868
});
38693869
}
38703870

3871+
/**
3872+
* Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid.
3873+
* For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position.
3874+
*/
3875+
@VisibleForTesting
3876+
int comparePositions(Position pos1, Position pos2) {
3877+
if (pos1 == null || pos2 == null) {
3878+
throw new IllegalArgumentException("Positions must not be null");
3879+
}
3880+
if (ledgers.isEmpty() || pos1.getLedgerId() < getFirstPosition().getLedgerId()
3881+
|| pos2.getLedgerId() < getFirstPosition().getLedgerId()
3882+
|| pos1.getLedgerId() > getLastPosition().getLedgerId()
3883+
|| pos2.getLedgerId() > getLastPosition().getLedgerId()) {
3884+
log.warn("[{}] Comparing un-exist position {} and {}", name, pos1, pos2,
3885+
new IllegalArgumentException("Comparing un-exist position"));
3886+
return pos1.compareTo(pos2);
3887+
}
3888+
if (pos1.getLedgerId() == pos2.getLedgerId()) {
3889+
return Long.compare(pos1.getEntryId(), pos2.getEntryId());
3890+
}
3891+
if (!isValidPosition(pos1) || !isValidPosition(pos2)) {
3892+
return getNextValidPosition(pos1).compareTo(getNextValidPosition(pos2));
3893+
}
3894+
return pos1.compareTo(pos2);
3895+
}
3896+
38713897
/**
38723898
* Get the number of entries between a contiguous range of two positions.
38733899
*
@@ -3880,41 +3906,77 @@ public long getNumberOfEntries(Range<Position> range) {
38803906
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
38813907
Position toPosition = range.upperEndpoint();
38823908
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
3909+
if (comparePositions(fromPosition, toPosition) > 0) {
3910+
log.warn("[{}] Getting number of entries with an invalid range {} and {}", name, fromPosition, toPosition);
3911+
throw new IllegalArgumentException("Invalid range " + range);
3912+
}
3913+
3914+
// 1. If the "fromPosition" is after "toPosition", then there is no entry in the range.
3915+
// 2. If both "formPosition" and "toPosition" have negative entry id amd in the same ledger, then there is no
3916+
// entry in the range.
3917+
if (fromPosition.getLedgerId() > toPosition.getLedgerId()
3918+
|| (fromPosition.getLedgerId() == toPosition.getLedgerId()
3919+
&& fromPosition.getEntryId() > toPosition.getEntryId())
3920+
|| (fromPosition.getLedgerId() == toPosition.getLedgerId()
3921+
&& fromPosition.getEntryId() < 0 && toPosition.getEntryId() < 0)) {
3922+
return 0;
3923+
}
38833924

3925+
// If the 2 positions are in the same ledger.
38843926
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
38853927
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
38863928
if (li != null) {
38873929
// If the 2 positions are in the same ledger
38883930
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3889-
count += fromIncluded ? 1 : 0;
3890-
count += toIncluded ? 1 : 0;
3931+
count += fromIncluded && fromPosition.getEntryId() >= 0 ? 1 : 0;
3932+
count += toIncluded && toPosition.getEntryId() >= 0 ? 1 : 0;
38913933
return count;
38923934
} else {
38933935
// if the ledgerId is not in the ledgers, it means it has been deleted
38943936
return 0;
38953937
}
3896-
} else {
3897-
long count = 0;
3898-
// If the from & to are pointing to different ledgers, then we need to :
3899-
// 1. Add the entries in the ledger pointed by toPosition
3900-
count += toPosition.getEntryId();
3938+
}
3939+
3940+
// If the "fromPosition.ledgerId" is larger than "toPosition.ledgerId".
3941+
// 1. Add the entries in the ledger pointed by toPosition.
3942+
// 2. Add the entries in the ledger pointed by fromPosition.
3943+
// 3. Add the whole ledgers entries in between.
3944+
long count = 0;
3945+
3946+
// 1. Add the entries in the ledger pointed by toPosition.
3947+
// Add nothing if "toPosition" does not exit in "ledgers".
3948+
// Add nothing if "toPosition.entryId < 0".
3949+
LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId());
3950+
if (toPosition.getEntryId() >= 0 && toLedger != null) {
3951+
// To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that is larger
3952+
// than the LAC.
3953+
// To support this case, use "Long.MAX_VALUE" if the ledger is the last one.
3954+
long entriesInLedger = comparePositions(toPosition, lastConfirmedEntry) >= 0
3955+
? Long.MAX_VALUE : toLedger.getEntries();
3956+
count += Math.min(toPosition.getEntryId(), entriesInLedger - 1);
39013957
count += toIncluded ? 1 : 0;
3958+
}
39023959

3903-
// 2. Add the entries in the ledger pointed by fromPosition
3904-
LedgerInfo li = ledgers.get(fromPosition.getLedgerId());
3905-
if (li != null) {
3906-
count += li.getEntries() - (fromPosition.getEntryId() + 1);
3960+
// 2. Add the entries in the ledger pointed by fromPosition.
3961+
// Add nothing if "toPosition.entryId < 0".
3962+
// Add nothing if "toPosition" does not exit in "ledgers".
3963+
LedgerInfo formLedger = ledgers.get(fromPosition.getLedgerId());
3964+
if (formLedger != null) {
3965+
if (fromPosition.getEntryId() < 0) {
3966+
count += formLedger.getEntries();
3967+
} else {
3968+
count += formLedger.getEntries() - (fromPosition.getEntryId() + 1);
39073969
count += fromIncluded ? 1 : 0;
39083970
}
3971+
}
39093972

3910-
// 3. Add the whole ledgers entries in between
3911-
for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
3912-
.values()) {
3913-
count += ls.getEntries();
3914-
}
3915-
3916-
return count;
3973+
// 3. Add the whole ledgers entries in between
3974+
for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
3975+
.values()) {
3976+
count += ls.getEntries();
39173977
}
3978+
3979+
return count;
39183980
}
39193981

39203982
/**

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {
3333

3434
private static final Charset Encoding = StandardCharsets.UTF_8;
3535

36-
@Test(timeOut = 20000)
36+
@Test(timeOut = 20000 * 1000)
3737
void testMultiPositionDelete() throws Exception {
3838
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
3939

0 commit comments

Comments
 (0)