Skip to content

Commit f36d605

Browse files
poorbarcodeganesh-ctds
authored andcommitted
[fix][broker]Wrong backlog: expected 0 but got 1 (apache#24938)
(cherry picked from commit ed31d82) (cherry picked from commit a1f4f42)
1 parent 5f4a1df commit f36d605

File tree

5 files changed

+467
-23
lines changed

5 files changed

+467
-23
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1488,12 +1488,16 @@ public void operationComplete() {
14881488
// modify mark delete and read position since we are able to persist new position for cursor
14891489
lock.writeLock().lock();
14901490
try {
1491-
if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
1491+
// Correct the variable "messagesConsumedCounter".
1492+
// BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the
1493+
// old one.
1494+
int compareRes = ledger.comparePositions(markDeletePosition, newMarkDeletePosition);
1495+
if (compareRes > 0) {
14921496
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
1493-
Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
1494-
} else {
1497+
Range.openClosed(newMarkDeletePosition, markDeletePosition)));
1498+
} else if (compareRes < 0) {
14951499
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries(
1496-
Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
1500+
Range.openClosed(markDeletePosition, newMarkDeletePosition)));
14971501
}
14981502
markDeletePosition = newMarkDeletePosition;
14991503
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3703,6 +3703,32 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
37033703
});
37043704
}
37053705

3706+
/**
3707+
* Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid.
3708+
* For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position.
3709+
*/
3710+
@VisibleForTesting
3711+
int comparePositions(Position pos1, Position pos2) {
3712+
if (pos1 == null || pos2 == null) {
3713+
throw new IllegalArgumentException("Positions must not be null");
3714+
}
3715+
if (ledgers.isEmpty() || pos1.getLedgerId() < getFirstPosition().getLedgerId()
3716+
|| pos2.getLedgerId() < getFirstPosition().getLedgerId()
3717+
|| pos1.getLedgerId() > getLastPosition().getLedgerId()
3718+
|| pos2.getLedgerId() > getLastPosition().getLedgerId()) {
3719+
log.warn("[{}] Comparing un-exist position {} and {}", name, pos1, pos2,
3720+
new IllegalArgumentException("Comparing un-exist position"));
3721+
return pos1.compareTo(pos2);
3722+
}
3723+
if (pos1.getLedgerId() == pos2.getLedgerId()) {
3724+
return Long.compare(pos1.getEntryId(), pos2.getEntryId());
3725+
}
3726+
if (!isValidPosition(pos1) || !isValidPosition(pos2)) {
3727+
return getNextValidPosition(pos1).compareTo(getNextValidPosition(pos2));
3728+
}
3729+
return pos1.compareTo(pos2);
3730+
}
3731+
37063732
/**
37073733
* Get the number of entries between a contiguous range of two positions.
37083734
*
@@ -3715,41 +3741,77 @@ public long getNumberOfEntries(Range<Position> range) {
37153741
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
37163742
Position toPosition = range.upperEndpoint();
37173743
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
3744+
if (comparePositions(fromPosition, toPosition) > 0) {
3745+
log.warn("[{}] Getting number of entries with an invalid range {} and {}", name, fromPosition, toPosition);
3746+
throw new IllegalArgumentException("Invalid range " + range);
3747+
}
3748+
3749+
// 1. If the "fromPosition" is after "toPosition", then there is no entry in the range.
3750+
// 2. If both "formPosition" and "toPosition" have negative entry id amd in the same ledger, then there is no
3751+
// entry in the range.
3752+
if (fromPosition.getLedgerId() > toPosition.getLedgerId()
3753+
|| (fromPosition.getLedgerId() == toPosition.getLedgerId()
3754+
&& fromPosition.getEntryId() > toPosition.getEntryId())
3755+
|| (fromPosition.getLedgerId() == toPosition.getLedgerId()
3756+
&& fromPosition.getEntryId() < 0 && toPosition.getEntryId() < 0)) {
3757+
return 0;
3758+
}
37183759

3760+
// If the 2 positions are in the same ledger.
37193761
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
37203762
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
37213763
if (li != null) {
37223764
// If the 2 positions are in the same ledger
37233765
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3724-
count += fromIncluded ? 1 : 0;
3725-
count += toIncluded ? 1 : 0;
3766+
count += fromIncluded && fromPosition.getEntryId() >= 0 ? 1 : 0;
3767+
count += toIncluded && toPosition.getEntryId() >= 0 ? 1 : 0;
37263768
return count;
37273769
} else {
37283770
// if the ledgerId is not in the ledgers, it means it has been deleted
37293771
return 0;
37303772
}
3731-
} else {
3732-
long count = 0;
3733-
// If the from & to are pointing to different ledgers, then we need to :
3734-
// 1. Add the entries in the ledger pointed by toPosition
3735-
count += toPosition.getEntryId();
3773+
}
3774+
3775+
// If the "fromPosition.ledgerId" is larger than "toPosition.ledgerId".
3776+
// 1. Add the entries in the ledger pointed by toPosition.
3777+
// 2. Add the entries in the ledger pointed by fromPosition.
3778+
// 3. Add the whole ledgers entries in between.
3779+
long count = 0;
3780+
3781+
// 1. Add the entries in the ledger pointed by toPosition.
3782+
// Add nothing if "toPosition" does not exit in "ledgers".
3783+
// Add nothing if "toPosition.entryId < 0".
3784+
LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId());
3785+
if (toPosition.getEntryId() >= 0 && toLedger != null) {
3786+
// To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that is larger
3787+
// than the LAC.
3788+
// To support this case, use "Long.MAX_VALUE" if the ledger is the last one.
3789+
long entriesInLedger = comparePositions(toPosition, lastConfirmedEntry) >= 0
3790+
? Long.MAX_VALUE : toLedger.getEntries();
3791+
count += Math.min(toPosition.getEntryId(), entriesInLedger - 1);
37363792
count += toIncluded ? 1 : 0;
3793+
}
37373794

3738-
// 2. Add the entries in the ledger pointed by fromPosition
3739-
LedgerInfo li = ledgers.get(fromPosition.getLedgerId());
3740-
if (li != null) {
3741-
count += li.getEntries() - (fromPosition.getEntryId() + 1);
3795+
// 2. Add the entries in the ledger pointed by fromPosition.
3796+
// Add nothing if "toPosition.entryId < 0".
3797+
// Add nothing if "toPosition" does not exit in "ledgers".
3798+
LedgerInfo formLedger = ledgers.get(fromPosition.getLedgerId());
3799+
if (formLedger != null) {
3800+
if (fromPosition.getEntryId() < 0) {
3801+
count += formLedger.getEntries();
3802+
} else {
3803+
count += formLedger.getEntries() - (fromPosition.getEntryId() + 1);
37423804
count += fromIncluded ? 1 : 0;
37433805
}
3806+
}
37443807

3745-
// 3. Add the whole ledgers entries in between
3746-
for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
3747-
.values()) {
3748-
count += ls.getEntries();
3749-
}
3750-
3751-
return count;
3808+
// 3. Add the whole ledgers entries in between
3809+
for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
3810+
.values()) {
3811+
count += ls.getEntries();
37523812
}
3813+
3814+
return count;
37533815
}
37543816

37553817
/**

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {
3333

3434
private static final Charset Encoding = StandardCharsets.UTF_8;
3535

36-
@Test(timeOut = 20000)
36+
@Test(timeOut = 20000 * 1000)
3737
void testMultiPositionDelete() throws Exception {
3838
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
3939

0 commit comments

Comments
 (0)