Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1489,12 +1489,16 @@ public void operationComplete() {
// modify mark delete and read position since we are able to persist new position for cursor
lock.writeLock().lock();
try {
if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
// Correct the variable "messagesConsumedCounter".
// BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the
// old one.
int compareRes = ledger.comparePositions(markDeletePosition, newMarkDeletePosition);
if (compareRes > 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
} else {
Range.openClosed(newMarkDeletePosition, markDeletePosition)));
} else if (compareRes < 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries(
Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
Range.openClosed(markDeletePosition, newMarkDeletePosition)));
}
markDeletePosition = newMarkDeletePosition;
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3868,6 +3868,32 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
});
}

/**
* Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid.
* For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position.
*/
@VisibleForTesting
int comparePositions(Position pos1, Position pos2) {
if (pos1 == null || pos2 == null) {
throw new IllegalArgumentException("Positions must not be null");
}
if (ledgers.isEmpty() || pos1.getLedgerId() < getFirstPosition().getLedgerId()
|| pos2.getLedgerId() < getFirstPosition().getLedgerId()
|| pos1.getLedgerId() > getLastPosition().getLedgerId()
|| pos2.getLedgerId() > getLastPosition().getLedgerId()) {
log.warn("[{}] Comparing un-exist position {} and {}", name, pos1, pos2,
new IllegalArgumentException("Comparing un-exist position"));
return pos1.compareTo(pos2);
}
if (pos1.getLedgerId() == pos2.getLedgerId()) {
return Long.compare(pos1.getEntryId(), pos2.getEntryId());
}
if (!isValidPosition(pos1) || !isValidPosition(pos2)) {
return getNextValidPosition(pos1).compareTo(getNextValidPosition(pos2));
}
return pos1.compareTo(pos2);
}

/**
* Get the number of entries between a contiguous range of two positions.
*
Expand All @@ -3880,41 +3906,77 @@ public long getNumberOfEntries(Range<Position> range) {
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
Position toPosition = range.upperEndpoint();
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
if (comparePositions(fromPosition, toPosition) > 0) {
log.warn("[{}] Getting number of entries with an invalid range {} and {}", name, fromPosition, toPosition);
throw new IllegalArgumentException("Invalid range " + range);
}

// 1. If the "fromPosition" is after "toPosition", then there is no entry in the range.
// 2. If both "formPosition" and "toPosition" have negative entry id amd in the same ledger, then there is no
// entry in the range.
if (fromPosition.getLedgerId() > toPosition.getLedgerId()
|| (fromPosition.getLedgerId() == toPosition.getLedgerId()
&& fromPosition.getEntryId() > toPosition.getEntryId())
|| (fromPosition.getLedgerId() == toPosition.getLedgerId()
&& fromPosition.getEntryId() < 0 && toPosition.getEntryId() < 0)) {
return 0;
}

// If the 2 positions are in the same ledger.
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
if (li != null) {
// If the 2 positions are in the same ledger
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
count += fromIncluded ? 1 : 0;
count += toIncluded ? 1 : 0;
count += fromIncluded && fromPosition.getEntryId() >= 0 ? 1 : 0;
count += toIncluded && toPosition.getEntryId() >= 0 ? 1 : 0;
return count;
} else {
// if the ledgerId is not in the ledgers, it means it has been deleted
return 0;
}
} else {
long count = 0;
// If the from & to are pointing to different ledgers, then we need to :
// 1. Add the entries in the ledger pointed by toPosition
count += toPosition.getEntryId();
}

// If the "fromPosition.ledgerId" is larger than "toPosition.ledgerId".
// 1. Add the entries in the ledger pointed by toPosition.
// 2. Add the entries in the ledger pointed by fromPosition.
// 3. Add the whole ledgers entries in between.
long count = 0;

// 1. Add the entries in the ledger pointed by toPosition.
// Add nothing if "toPosition" does not exit in "ledgers".
// Add nothing if "toPosition.entryId < 0".
LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId());
if (toPosition.getEntryId() >= 0 && toLedger != null) {
// To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that is larger
// than the LAC.
// To support this case, use "Long.MAX_VALUE" if the ledger is the last one.
long entriesInLedger = comparePositions(toPosition, lastConfirmedEntry) >= 0
? Long.MAX_VALUE : toLedger.getEntries();
count += Math.min(toPosition.getEntryId(), entriesInLedger - 1);
count += toIncluded ? 1 : 0;
}

// 2. Add the entries in the ledger pointed by fromPosition
LedgerInfo li = ledgers.get(fromPosition.getLedgerId());
if (li != null) {
count += li.getEntries() - (fromPosition.getEntryId() + 1);
// 2. Add the entries in the ledger pointed by fromPosition.
// Add nothing if "toPosition.entryId < 0".
// Add nothing if "toPosition" does not exit in "ledgers".
LedgerInfo formLedger = ledgers.get(fromPosition.getLedgerId());
if (formLedger != null) {
if (fromPosition.getEntryId() < 0) {
count += formLedger.getEntries();
} else {
count += formLedger.getEntries() - (fromPosition.getEntryId() + 1);
count += fromIncluded ? 1 : 0;
}
}

// 3. Add the whole ledgers entries in between
for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
.values()) {
count += ls.getEntries();
}

return count;
// 3. Add the whole ledgers entries in between
for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
.values()) {
count += ls.getEntries();
}

return count;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {

private static final Charset Encoding = StandardCharsets.UTF_8;

@Test(timeOut = 20000)
@Test(timeOut = 20000 * 1000)
void testMultiPositionDelete() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));

Expand Down
Loading
Loading