diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index d32f0c8e998a0..e1ee50aad0f84 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1489,12 +1489,16 @@ public void operationComplete() { // modify mark delete and read position since we are able to persist new position for cursor lock.writeLock().lock(); try { - if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) { + // Correct the variable "messagesConsumedCounter". + // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the + // old one. + int compareRes = ledger.comparePositions(markDeletePosition, newMarkDeletePosition); + if (compareRes > 0) { MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries( - Range.closedOpen(newMarkDeletePosition, markDeletePosition))); - } else { + Range.openClosed(newMarkDeletePosition, markDeletePosition))); + } else if (compareRes < 0) { MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries( - Range.closedOpen(markDeletePosition, newMarkDeletePosition))); + Range.openClosed(markDeletePosition, newMarkDeletePosition))); } markDeletePosition = newMarkDeletePosition; lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 17ffecb561785..a42974f9b2c60 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3868,6 +3868,32 @@ private CompletableFuture completeLedgerInfoForOffloaded(long ledgerId, UU }); } + /** + * Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid. + * For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position. + */ + @VisibleForTesting + int comparePositions(Position pos1, Position pos2) { + if (pos1 == null || pos2 == null) { + throw new IllegalArgumentException("Positions must not be null"); + } + if (ledgers.isEmpty() || pos1.getLedgerId() < getFirstPosition().getLedgerId() + || pos2.getLedgerId() < getFirstPosition().getLedgerId() + || pos1.getLedgerId() > getLastPosition().getLedgerId() + || pos2.getLedgerId() > getLastPosition().getLedgerId()) { + log.warn("[{}] Comparing un-exist position {} and {}", name, pos1, pos2, + new IllegalArgumentException("Comparing un-exist position")); + return pos1.compareTo(pos2); + } + if (pos1.getLedgerId() == pos2.getLedgerId()) { + return Long.compare(pos1.getEntryId(), pos2.getEntryId()); + } + if (!isValidPosition(pos1) || !isValidPosition(pos2)) { + return getNextValidPosition(pos1).compareTo(getNextValidPosition(pos2)); + } + return pos1.compareTo(pos2); + } + /** * Get the number of entries between a contiguous range of two positions. * @@ -3880,41 +3906,77 @@ public long getNumberOfEntries(Range range) { boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED; Position toPosition = range.upperEndpoint(); boolean toIncluded = range.upperBoundType() == BoundType.CLOSED; + if (comparePositions(fromPosition, toPosition) > 0) { + log.warn("[{}] Getting number of entries with an invalid range {} and {}", name, fromPosition, toPosition); + throw new IllegalArgumentException("Invalid range " + range); + } + + // 1. If the "fromPosition" is after "toPosition", then there is no entry in the range. + // 2. If both "formPosition" and "toPosition" have negative entry id amd in the same ledger, then there is no + // entry in the range. + if (fromPosition.getLedgerId() > toPosition.getLedgerId() + || (fromPosition.getLedgerId() == toPosition.getLedgerId() + && fromPosition.getEntryId() > toPosition.getEntryId()) + || (fromPosition.getLedgerId() == toPosition.getLedgerId() + && fromPosition.getEntryId() < 0 && toPosition.getEntryId() < 0)) { + return 0; + } + // If the 2 positions are in the same ledger. if (fromPosition.getLedgerId() == toPosition.getLedgerId()) { LedgerInfo li = ledgers.get(toPosition.getLedgerId()); if (li != null) { // If the 2 positions are in the same ledger long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1; - count += fromIncluded ? 1 : 0; - count += toIncluded ? 1 : 0; + count += fromIncluded && fromPosition.getEntryId() >= 0 ? 1 : 0; + count += toIncluded && toPosition.getEntryId() >= 0 ? 1 : 0; return count; } else { // if the ledgerId is not in the ledgers, it means it has been deleted return 0; } - } else { - long count = 0; - // If the from & to are pointing to different ledgers, then we need to : - // 1. Add the entries in the ledger pointed by toPosition - count += toPosition.getEntryId(); + } + + // If the "fromPosition.ledgerId" is larger than "toPosition.ledgerId". + // 1. Add the entries in the ledger pointed by toPosition. + // 2. Add the entries in the ledger pointed by fromPosition. + // 3. Add the whole ledgers entries in between. + long count = 0; + + // 1. Add the entries in the ledger pointed by toPosition. + // Add nothing if "toPosition" does not exit in "ledgers". + // Add nothing if "toPosition.entryId < 0". + LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId()); + if (toPosition.getEntryId() >= 0 && toLedger != null) { + // To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that is larger + // than the LAC. + // To support this case, use "Long.MAX_VALUE" if the ledger is the last one. + long entriesInLedger = comparePositions(toPosition, lastConfirmedEntry) >= 0 + ? Long.MAX_VALUE : toLedger.getEntries(); + count += Math.min(toPosition.getEntryId(), entriesInLedger - 1); count += toIncluded ? 1 : 0; + } - // 2. Add the entries in the ledger pointed by fromPosition - LedgerInfo li = ledgers.get(fromPosition.getLedgerId()); - if (li != null) { - count += li.getEntries() - (fromPosition.getEntryId() + 1); + // 2. Add the entries in the ledger pointed by fromPosition. + // Add nothing if "toPosition.entryId < 0". + // Add nothing if "toPosition" does not exit in "ledgers". + LedgerInfo formLedger = ledgers.get(fromPosition.getLedgerId()); + if (formLedger != null) { + if (fromPosition.getEntryId() < 0) { + count += formLedger.getEntries(); + } else { + count += formLedger.getEntries() - (fromPosition.getEntryId() + 1); count += fromIncluded ? 1 : 0; } + } - // 3. Add the whole ledgers entries in between - for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false) - .values()) { - count += ls.getEntries(); - } - - return count; + // 3. Add the whole ledgers entries in between + for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false) + .values()) { + count += ls.getEntries(); } + + return count; } /** diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java index c4d3b076ba3c5..a4895b2624bd0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java @@ -33,7 +33,7 @@ public class ManagedCursorListAckTest extends MockedBookKeeperTestCase { private static final Charset Encoding = StandardCharsets.UTF_8; - @Test(timeOut = 20000) + @Test(timeOut = 20000 * 1000) void testMultiPositionDelete() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index eb65cbd2c225f..9216bd60ed422 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4774,4 +4774,335 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { cursor.close(); ledger.close(); } + + @Test + public void testGetNumberOfEntriesWithRangeParam() throws Exception { + final String ledgerName = "ml_" + UUID.randomUUID().toString().replaceAll("-", ""); + final String cursorName = "test-cursor"; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName, config); + // Create a cursor to avoid entries being trimmed. + ml.openCursor(cursorName); + int totalEntries = 35; + List positions = new ArrayList<>(totalEntries); + for (int i = 0; i < totalEntries; i++) { + Position pos = ml.addEntry(("entry-" + i).getBytes()); + positions.add(pos); + } + Iterator iterator = ml.getLedgersInfo().values().iterator(); + LedgerInfo ledger1 = iterator.next(); + LedgerInfo ledger2 = iterator.next(); + LedgerInfo ledger3 = iterator.next(); + LedgerInfo ledger4 = iterator.next(); + assertEquals(ledger1.getEntries(), 10); + assertEquals(ledger2.getEntries(), 10); + assertEquals(ledger3.getEntries(), 10); + assertEquals(ledger4.getLedgerId(), ml.getCurrentLedger().getId()); + + // Normal case: same ledger. + Range range11 = Range.closed(positions.get(0), positions.get(9)); + assertEquals(ml.getNumberOfEntries(range11), 10); + Range range12 = Range.openClosed(positions.get(1), positions.get(9)); + assertEquals(ml.getNumberOfEntries(range12), 8); + Range range13 = Range.closedOpen(positions.get(2), positions.get(9)); + assertEquals(ml.getNumberOfEntries(range13), 7); + + // Normal case: crosses ledgers. + Range range21 = Range.closed(positions.get(0), positions.get(19)); + assertEquals(ml.getNumberOfEntries(range21), 20); + Range range22 = Range.openClosed(positions.get(0), positions.get(19)); + assertEquals(ml.getNumberOfEntries(range22), 19); + Range range23 = Range.closedOpen(positions.get(0), positions.get(19)); + assertEquals(ml.getNumberOfEntries(range23), 19); + Range range24 = Range.closed(positions.get(0), positions.get(29)); + assertEquals(ml.getNumberOfEntries(range24), 30); + Range range25 = Range.openClosed(positions.get(0), positions.get(29)); + assertEquals(ml.getNumberOfEntries(range25), 29); + Range range26 = Range.closedOpen(positions.get(0), positions.get(29)); + assertEquals(ml.getNumberOfEntries(range26), 29); + + // Normal case: end with current ledger. + Range range27 = Range.closed(positions.get(0), positions.get(34)); + assertEquals(ml.getNumberOfEntries(range27), 35); + // Cover the following case. + // The use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry + // id that is larger than the LAC. + Range range28 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range28), 131); + + // From position that entry id is "-1" & positions in the same ledger. + Range range31 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(9)); + assertEquals(ml.getNumberOfEntries(range31), 10); + Range range32 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(9)); + assertEquals(ml.getNumberOfEntries(range32), 10); + Range range33 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(9)); + assertEquals(ml.getNumberOfEntries(range33), 9); + + // From position that entry id is "-1" & crosses ledgers. + Range range41 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(15)); + assertEquals(ml.getNumberOfEntries(range41), 16); + Range range42 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(15)); + assertEquals(ml.getNumberOfEntries(range42), 16); + Range range43 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(15)); + assertEquals(ml.getNumberOfEntries(range43), 15); + Range range44 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(25)); + assertEquals(ml.getNumberOfEntries(range44), 26); + Range range45 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(25)); + assertEquals(ml.getNumberOfEntries(range45), 26); + Range range46 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(25)); + assertEquals(ml.getNumberOfEntries(range46), 25); + + // Invalid range. + try { + Range.closed(positions.get(1), PositionFactory.create(ledger1.getLedgerId(), -1)); + fail("Should have failed because the range is invalid."); + } catch (IllegalArgumentException ex) { + assertTrue(ex.getMessage().contains("Invalid range")); + } + try { + Range.closed(positions.get(29), positions.get(0)); + fail("Should have failed because the range is invalid."); + } catch (IllegalArgumentException ex) { + assertTrue(ex.getMessage().contains("Invalid range")); + } + + // "To position" that entry id is "-1" & crosses ledgers. + Range range61 = Range.closed(positions.get(1), PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range61), 9); + Range range62 = Range.closedOpen(positions.get(1), PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range62), 9); + Range range63 = Range.openClosed(positions.get(1), PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range63), 8); + Range range64 = Range.closed(positions.get(1), PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range64), 19); + Range range65 = Range.closedOpen(positions.get(1), PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range65), 19); + Range range66 = Range.openClosed(positions.get(1), PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range66), 18); + + // "From position" is the latest entry of a ledger. + Range range71 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), positions.get(10)); + assertEquals(ml.getNumberOfEntries(range71), 2); + Range range72 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), positions.get(10)); + assertEquals(ml.getNumberOfEntries(range72), 1); + Range range73 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), positions.get(10)); + assertEquals(ml.getNumberOfEntries(range73), 1); + + // "From position" is the latest entry of a ledger, and "to position" has a negative entry id. + Range range81 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range81), 1); + Range range82 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range82), 0); + Range range83 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range83), 1); + + // "From position" is the latest entry of a ledger, and "to position" has a negative entry id & crosses ledgers. + Range range91 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range91), 11); + Range range92 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range92), 10); + Range range93 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range93), 11); + + // "To Position" is larger than LAC. + Range range101 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range101), 21); + Range range102 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range102), 20); + Range range103 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range103), 20); + + // "From position" is smaller than the first one. + Range range111 = Range.closed(PositionFactory.create(ledger1.getLedgerId() - 1, 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range111), 30); + Range range112 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId() - 1, 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range112), 30); + Range range113 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId() - 1, 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range113), 29); + + // Both "fromPosition" and "toPosition" have negative entry id & in the same ledger. + Range range121 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + PositionFactory.create(ledger1.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range121), 0); + Range range122 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger1.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range122), 0); + // Both "fromPosition" and "toPosition" have negative entry id & crosses ledgers. + Range range123 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range123), 10); + Range range124 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range124), 20); + Range range125 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1000)); + assertEquals(ml.getNumberOfEntries(range125), 20); + Range range126 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range126), 10); + Range range127 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range127), 20); + Range range128 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1000)); + assertEquals(ml.getNumberOfEntries(range128), 20); + Range range129 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range129), 10); + Range range1210 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range1210), 20); + Range range1211 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1000)); + assertEquals(ml.getNumberOfEntries(range1211), 20); + try { + Range.openClosed(PositionFactory.create(ledger2.getLedgerId(), -10), + PositionFactory.create(ledger1.getLedgerId(), -1)); + fail("Should have failed because the range is invalid."); + } catch (IllegalArgumentException ex) { + assertTrue(ex.getMessage().contains("Invalid range")); + } + + // Cover the following case. + // The use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry + // id that is larger than the LAC. + // The difference with above one: the LAC is not in the latest ledger. + ml.close(); + ManagedLedgerImpl ml2 = (ManagedLedgerImpl) factory.open(ledgerName, config); + assertNotEquals(ledger4.getLedgerId(), ml2.currentLedger.getId()); + Range range131 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); + assertEquals(ml2.getNumberOfEntries(range131), 131); + Range range132 = Range.openClosed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), + 100)); + assertEquals(ml2.getNumberOfEntries(range132), 130); + Range range133 = Range.closedOpen(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), + 100)); + assertEquals(ml2.getNumberOfEntries(range133), 130); + + // cleanup. + ml2.delete(); + } + + @Test + public void testComparePositions() throws Exception { + final String ledgerName = "ml_" + UUID.randomUUID().toString().replaceAll("-", ""); + final String cursorName = "test-cursor"; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName, config); + // Create a cursor to avoid entries being trimmed. + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor(cursorName); + int totalEntries = 30; + List positions = new ArrayList<>(totalEntries); + for (int i = 0; i < totalEntries; i++) { + Position pos = ml.addEntry(("entry-" + i).getBytes()); + positions.add(pos); + } + Iterator iterator = ml.getLedgersInfo().values().iterator(); + LedgerInfo ledger1 = iterator.next(); + LedgerInfo ledger2 = iterator.next(); + LedgerInfo ledger3 = iterator.next(); + assertEquals(ledger1.getEntries(), 10); + assertEquals(ledger2.getEntries(), 10); + + // Normal case: pos1 == pos2. + assertEquals(ml.comparePositions(positions.get(0), positions.get(0)), 0); + assertEquals(ml.comparePositions(positions.get(9), positions.get(9)), 0); + assertEquals(ml.comparePositions(positions.get(29), positions.get(29)), 0); + assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), + PositionFactory.create(ledger2.getLedgerId(), -1)), 0); + + // Normal case: pos1 < pos2. + assertEquals(ml.comparePositions(positions.get(0), positions.get(1)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(9)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(10)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(19)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(20)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(29)), -1); + + // Normal case: pos1 > pos2. + assertEquals(ml.comparePositions(positions.get(1), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(9), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(10), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(19), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(20), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(29), positions.get(0)), 1); + + // Pos1 has negative entry id & both positions in the same ledger. + assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(0)), -1); + assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), + positions.get(10)), -1); + assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), + positions.get(20)), -1); + assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(0)), -1); + // Pos1 has negative entry id & crosses ledgers. + assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), + positions.get(0)), 1); + assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), + positions.get(0)), 1); + // Pos1 has negative entry id & the same value. + assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), positions.get(9)), + 0); + assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), positions.get(19)), + 0); + + // Pos2 has negative entry id & both positions in the same ledger. + assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)), + 1); + assertEquals(ml.comparePositions(positions.get(10), PositionFactory.create(ledger2.getLedgerId(), -1)), + 1); + assertEquals(ml.comparePositions(positions.get(20), PositionFactory.create(ledger3.getLedgerId(), -1)), + 1); + assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)), + 1); + // Pos2 has negative entry id & crosses ledgers. + assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger2.getLedgerId(), -1)), + -1); + assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger3.getLedgerId(), -1)), + -1); + // Pos2 has negative entry id & the same value. + assertEquals(ml.comparePositions(positions.get(9), PositionFactory.create(ledger2.getLedgerId(), -1)), + 0); + assertEquals(ml.comparePositions(positions.get(19), PositionFactory.create(ledger3.getLedgerId(), -1)), + 0); + + // Pos1 does not exist in ledgers. + assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId() - 1, 100), + positions.get(0)), -1); + assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId() + 1, 0), + positions.get(29)), 1); + + // Pos2 does not exist in ledgers. + assertEquals(ml.comparePositions(positions.get(0), + PositionFactory.create(ledger1.getLedgerId() - 1, 100)), 1); + assertEquals(ml.comparePositions(positions.get(29), + PositionFactory.create(ledger3.getLedgerId() + 1, 0)), -1); + + // cleanup. + ml.delete(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 4e15062b1e2c4..5d45318e83b10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -81,6 +81,7 @@ import lombok.EqualsAndHashCode; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -5362,4 +5363,50 @@ public void testResourceSharingEndToEnd(boolean usePulsarBinaryProtocol) throws log.info("-- Exiting {} test --", methodName); } + + @DataProvider + public Object[][] trimLedgerBeforeGetStats() { + return new Object[][] { + {true}, + {false} + }; + } + + @Test(dataProvider = "trimLedgerBeforeGetStats") + public void testBacklogAfterCreatedSubscription(boolean trimLegderBeforeGetStats) throws Exception { + String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp"); + String mlName = TopicName.get(topic).getPersistenceNamingEncoding(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(2); + config.setMinimumRolloverTime(1, TimeUnit.SECONDS); + if (!trimLegderBeforeGetStats) { + config.setRetentionTime(3600, TimeUnit.SECONDS); + } + ManagedLedgerFactory factory = pulsar.getDefaultManagedLedgerFactory(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, config); + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + for (int i = 0; i < 4; i++) { + producer.send("message-" + i); + Thread.sleep(1000); + } + producer.close(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + assertEquals(persistentTopic.getManagedLedger(), ml); + + if (trimLegderBeforeGetStats) { + CompletableFuture trimLedgerFuture = new CompletableFuture<>(); + ml.trimConsumedLedgersInBackground(trimLedgerFuture); + trimLedgerFuture.join(); + assertEquals(ml.getLedgersInfo().size(), 1); + assertEquals(ml.getCurrentLedgerEntries(), 0); + } + + admin.topics().createSubscription(topic, "sub1", MessageId.latest); + assertEquals(admin.topics().getStats(topic).getSubscriptions().get("sub1").getMsgBacklog(), 0); + + // cleanup + admin.topics().delete(topic, false); + } }