Skip to content

Commit ae44425

Browse files
poorbarcodelhotari
authored andcommitted
[fix][broker]Wrong backlog: expected 0 but got 1 (#24938)
(cherry picked from commit ed31d82)
1 parent cd78e40 commit ae44425

File tree

5 files changed

+467
-23
lines changed

5 files changed

+467
-23
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1489,12 +1489,16 @@ public void operationComplete() {
14891489
// modify mark delete and read position since we are able to persist new position for cursor
14901490
lock.writeLock().lock();
14911491
try {
1492-
if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
1492+
// Correct the variable "messagesConsumedCounter".
1493+
// BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the
1494+
// old one.
1495+
int compareRes = ledger.comparePositions(markDeletePosition, newMarkDeletePosition);
1496+
if (compareRes > 0) {
14931497
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
1494-
Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
1495-
} else {
1498+
Range.openClosed(newMarkDeletePosition, markDeletePosition)));
1499+
} else if (compareRes < 0) {
14961500
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries(
1497-
Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
1501+
Range.openClosed(markDeletePosition, newMarkDeletePosition)));
14981502
}
14991503
markDeletePosition = newMarkDeletePosition;
15001504
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3831,6 +3831,32 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
38313831
});
38323832
}
38333833

3834+
/**
3835+
* Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid.
3836+
* For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position.
3837+
*/
3838+
@VisibleForTesting
3839+
int comparePositions(Position pos1, Position pos2) {
3840+
if (pos1 == null || pos2 == null) {
3841+
throw new IllegalArgumentException("Positions must not be null");
3842+
}
3843+
if (ledgers.isEmpty() || pos1.getLedgerId() < getFirstPosition().getLedgerId()
3844+
|| pos2.getLedgerId() < getFirstPosition().getLedgerId()
3845+
|| pos1.getLedgerId() > getLastPosition().getLedgerId()
3846+
|| pos2.getLedgerId() > getLastPosition().getLedgerId()) {
3847+
log.warn("[{}] Comparing un-exist position {} and {}", name, pos1, pos2,
3848+
new IllegalArgumentException("Comparing un-exist position"));
3849+
return pos1.compareTo(pos2);
3850+
}
3851+
if (pos1.getLedgerId() == pos2.getLedgerId()) {
3852+
return Long.compare(pos1.getEntryId(), pos2.getEntryId());
3853+
}
3854+
if (!isValidPosition(pos1) || !isValidPosition(pos2)) {
3855+
return getNextValidPosition(pos1).compareTo(getNextValidPosition(pos2));
3856+
}
3857+
return pos1.compareTo(pos2);
3858+
}
3859+
38343860
/**
38353861
* Get the number of entries between a contiguous range of two positions.
38363862
*
@@ -3843,41 +3869,77 @@ public long getNumberOfEntries(Range<Position> range) {
38433869
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
38443870
Position toPosition = range.upperEndpoint();
38453871
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
3872+
if (comparePositions(fromPosition, toPosition) > 0) {
3873+
log.warn("[{}] Getting number of entries with an invalid range {} and {}", name, fromPosition, toPosition);
3874+
throw new IllegalArgumentException("Invalid range " + range);
3875+
}
3876+
3877+
// 1. If the "fromPosition" is after "toPosition", then there is no entry in the range.
3878+
// 2. If both "formPosition" and "toPosition" have negative entry id amd in the same ledger, then there is no
3879+
// entry in the range.
3880+
if (fromPosition.getLedgerId() > toPosition.getLedgerId()
3881+
|| (fromPosition.getLedgerId() == toPosition.getLedgerId()
3882+
&& fromPosition.getEntryId() > toPosition.getEntryId())
3883+
|| (fromPosition.getLedgerId() == toPosition.getLedgerId()
3884+
&& fromPosition.getEntryId() < 0 && toPosition.getEntryId() < 0)) {
3885+
return 0;
3886+
}
38463887

3888+
// If the 2 positions are in the same ledger.
38473889
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
38483890
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
38493891
if (li != null) {
38503892
// If the 2 positions are in the same ledger
38513893
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3852-
count += fromIncluded ? 1 : 0;
3853-
count += toIncluded ? 1 : 0;
3894+
count += fromIncluded && fromPosition.getEntryId() >= 0 ? 1 : 0;
3895+
count += toIncluded && toPosition.getEntryId() >= 0 ? 1 : 0;
38543896
return count;
38553897
} else {
38563898
// if the ledgerId is not in the ledgers, it means it has been deleted
38573899
return 0;
38583900
}
3859-
} else {
3860-
long count = 0;
3861-
// If the from & to are pointing to different ledgers, then we need to :
3862-
// 1. Add the entries in the ledger pointed by toPosition
3863-
count += toPosition.getEntryId();
3901+
}
3902+
3903+
// If the "fromPosition.ledgerId" is larger than "toPosition.ledgerId".
3904+
// 1. Add the entries in the ledger pointed by toPosition.
3905+
// 2. Add the entries in the ledger pointed by fromPosition.
3906+
// 3. Add the whole ledgers entries in between.
3907+
long count = 0;
3908+
3909+
// 1. Add the entries in the ledger pointed by toPosition.
3910+
// Add nothing if "toPosition" does not exit in "ledgers".
3911+
// Add nothing if "toPosition.entryId < 0".
3912+
LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId());
3913+
if (toPosition.getEntryId() >= 0 && toLedger != null) {
3914+
// To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that is larger
3915+
// than the LAC.
3916+
// To support this case, use "Long.MAX_VALUE" if the ledger is the last one.
3917+
long entriesInLedger = comparePositions(toPosition, lastConfirmedEntry) >= 0
3918+
? Long.MAX_VALUE : toLedger.getEntries();
3919+
count += Math.min(toPosition.getEntryId(), entriesInLedger - 1);
38643920
count += toIncluded ? 1 : 0;
3921+
}
38653922

3866-
// 2. Add the entries in the ledger pointed by fromPosition
3867-
LedgerInfo li = ledgers.get(fromPosition.getLedgerId());
3868-
if (li != null) {
3869-
count += li.getEntries() - (fromPosition.getEntryId() + 1);
3923+
// 2. Add the entries in the ledger pointed by fromPosition.
3924+
// Add nothing if "toPosition.entryId < 0".
3925+
// Add nothing if "toPosition" does not exit in "ledgers".
3926+
LedgerInfo formLedger = ledgers.get(fromPosition.getLedgerId());
3927+
if (formLedger != null) {
3928+
if (fromPosition.getEntryId() < 0) {
3929+
count += formLedger.getEntries();
3930+
} else {
3931+
count += formLedger.getEntries() - (fromPosition.getEntryId() + 1);
38703932
count += fromIncluded ? 1 : 0;
38713933
}
3934+
}
38723935

3873-
// 3. Add the whole ledgers entries in between
3874-
for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
3875-
.values()) {
3876-
count += ls.getEntries();
3877-
}
3878-
3879-
return count;
3936+
// 3. Add the whole ledgers entries in between
3937+
for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
3938+
.values()) {
3939+
count += ls.getEntries();
38803940
}
3941+
3942+
return count;
38813943
}
38823944

38833945
/**

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {
3333

3434
private static final Charset Encoding = StandardCharsets.UTF_8;
3535

36-
@Test(timeOut = 20000)
36+
@Test(timeOut = 20000 * 1000)
3737
void testMultiPositionDelete() throws Exception {
3838
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
3939

0 commit comments

Comments
 (0)