From 499efb694ad712ccdfa4880315e48419dd2d4535 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 3 Nov 2025 23:54:12 +0800 Subject: [PATCH 01/19] [fix][broker]Wrong backlog: expected 0 but got 1 --- .../mledger/impl/ManagedCursorImpl.java | 22 ++++++++- .../mledger/impl/ManagedLedgerImpl.java | 2 +- .../api/SimpleProducerConsumerTest.java | 48 +++++++++++++++++++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index d32f0c8e998a0..7c3985333c1bd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1453,6 +1453,23 @@ public Position getFirstPosition() { return firstLedgerId == null ? null : PositionFactory.create(firstLedgerId, 0); } + /** + * Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid. + * For example: position-1 is "1:{latest entry}", and position-2 is "2:0", they are the same position. + */ + private int comparePositions(Position pos1, Position pos2) { + if (pos1 == null || pos2 == null) { + throw new IllegalArgumentException("Positions must not be null"); + } + if (pos1.getLedgerId() == pos2.getLedgerId() && pos1.getEntryId() == pos2.getEntryId()) { + return 0; + } + if (!ledger.isValidPosition(pos1) || !ledger.isValidPosition(pos2)) { + return ledger.getNextValidPosition(pos1).compareTo(ledger.getNextValidPosition(pos2)); + } + return pos1.compareTo(pos2); + } + protected void internalResetCursor(Position proposedReadPosition, AsyncCallbacks.ResetCursorCallback resetCursorCallback) { final Position newReadPosition; @@ -1489,7 +1506,10 @@ public void operationComplete() { // modify mark delete and read position since we are able to persist new position for cursor lock.writeLock().lock(); try { - if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) { + int compareRes = comparePositions(markDeletePosition, newMarkDeletePosition); + if (compareRes == 0) { + // nothing to do. + } else if (compareRes > 0) { MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries( Range.closedOpen(newMarkDeletePosition, markDeletePosition))); } else { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 17ffecb561785..b2720d933bd88 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3897,7 +3897,7 @@ public long getNumberOfEntries(Range range) { long count = 0; // If the from & to are pointing to different ledgers, then we need to : // 1. Add the entries in the ledger pointed by toPosition - count += toPosition.getEntryId(); + count += toPosition.getEntryId() < 0 ? 0 : toPosition.getEntryId(); count += toIncluded ? 1 : 0; // 2. Add the entries in the ledger pointed by fromPosition diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 4e15062b1e2c4..8241ccfc791e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -81,6 +81,8 @@ import lombok.EqualsAndHashCode; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -5362,4 +5364,50 @@ public void testResourceSharingEndToEnd(boolean usePulsarBinaryProtocol) throws log.info("-- Exiting {} test --", methodName); } + + @DataProvider + public Object[][] trimLedgerBeforeGetStats() { + return new Object[][] { + {true}, + {false} + }; + } + + @Test(dataProvider = "trimLedgerBeforeGetStats") + public void testBacklogAfterCreatedSubscription(boolean trimLegderBeforeGetStats) throws Exception { + String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp"); + String mlName = TopicName.get(topic).getPersistenceNamingEncoding(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(2); + config.setMinimumRolloverTime(1, TimeUnit.SECONDS); + if (!trimLegderBeforeGetStats) { + config.setRetentionTime(3600, TimeUnit.SECONDS); + } + ManagedLedgerFactory factory = pulsar.getDefaultManagedLedgerFactory(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, config); + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + for (int i = 0; i < 4; i++) { + producer.send("message-" + i); + Thread.sleep(1000); + } + producer.close(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + assertEquals(persistentTopic.getManagedLedger(), ml); + + if (trimLegderBeforeGetStats) { + CompletableFuture trimLedgerFuture = new CompletableFuture<>(); + ml.trimConsumedLedgersInBackground(trimLedgerFuture); + trimLedgerFuture.join(); + assertEquals(ml.getLedgersInfo().size(), 1); + assertEquals(ml.getCurrentLedgerEntries(), 0); + } + + admin.topics().createSubscription(topic, "sub1", MessageId.latest); + assertEquals(admin.topics().getStats(topic).getSubscriptions().get("sub1").getMsgBacklog(), 0); + + // cleanup + admin.topics().delete(topic, false); + } } From 546ea268aba13314714cc27f2b648e4a7eeb7c3a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 4 Nov 2025 00:13:08 +0800 Subject: [PATCH 02/19] checkstyle --- .../org/apache/pulsar/client/api/SimpleProducerConsumerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 8241ccfc791e2..5d45318e83b10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -81,7 +81,6 @@ import lombok.EqualsAndHashCode; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.common.concurrent.FutureUtils; -import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; From 3592eaee43f2ec3e399ff6511ac63ef96d5460a9 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 12 Nov 2025 21:09:07 +0800 Subject: [PATCH 03/19] improve compare method --- .../mledger/impl/ManagedLedgerImpl.java | 54 +++++++++++++------ 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index b2720d933bd88..41affa97a89e5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3881,6 +3881,13 @@ public long getNumberOfEntries(Range range) { Position toPosition = range.upperEndpoint(); boolean toIncluded = range.upperBoundType() == BoundType.CLOSED; + // If the "fromPosition" is after "toPosition", then there is no entry in the range. + if (fromPosition.getLedgerId() > toPosition.getLedgerId() || (fromPosition.getLedgerId() == toPosition.getLedgerId() + && fromPosition.getEntryId() > toPosition.getEntryId())) { + return 0; + } + + // If the 2 positions are in the same ledger. if (fromPosition.getLedgerId() == toPosition.getLedgerId()) { LedgerInfo li = ledgers.get(toPosition.getLedgerId()); if (li != null) { @@ -3893,28 +3900,43 @@ public long getNumberOfEntries(Range range) { // if the ledgerId is not in the ledgers, it means it has been deleted return 0; } - } else { - long count = 0; - // If the from & to are pointing to different ledgers, then we need to : - // 1. Add the entries in the ledger pointed by toPosition - count += toPosition.getEntryId() < 0 ? 0 : toPosition.getEntryId(); + } + + // If the "fromPosition.ledgerId" is larger than "toPosition.ledgerId". + // 1. Add the entries in the ledger pointed by toPosition. + // 2. Add the entries in the ledger pointed by fromPosition. + // 3. Add the whole ledgers entries in between. + long count = 0; + + // 1. Add the entries in the ledger pointed by toPosition. + // Add nothing if "toPosition" does not exit in "ledgers". + // Add nothing if "toPosition.entryId < 0". + LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId()); + if (toPosition.getEntryId() > 0 && toLedger != null) { + count += Math.min(toPosition.getEntryId(), toLedger.getEntries() - 1); count += toIncluded ? 1 : 0; + } - // 2. Add the entries in the ledger pointed by fromPosition - LedgerInfo li = ledgers.get(fromPosition.getLedgerId()); - if (li != null) { - count += li.getEntries() - (fromPosition.getEntryId() + 1); + // 2. Add the entries in the ledger pointed by fromPosition. + // Add nothing if "toPosition.entryId < 0". + // Add nothing if "toPosition" does not exit in "ledgers". + LedgerInfo formLedger = ledgers.get(fromPosition.getLedgerId()); + if (formLedger != null) { + if (fromPosition.getEntryId() < 0) { + count += formLedger.getEntries(); + } else { + count += formLedger.getEntries() - (fromPosition.getEntryId() + 1); count += fromIncluded ? 1 : 0; } + } - // 3. Add the whole ledgers entries in between - for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false) - .values()) { - count += ls.getEntries(); - } - - return count; + // 3. Add the whole ledgers entries in between + for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false) + .values()) { + count += ls.getEntries(); } + + return count; } /** From 1eada9e41072b17ab09b78f432d291dbc4c9546d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 12 Nov 2025 22:05:24 +0800 Subject: [PATCH 04/19] add more tests --- .../mledger/impl/ManagedCursorImpl.java | 2 +- .../mledger/impl/ManagedLedgerImpl.java | 6 +- .../mledger/impl/ManagedLedgerTest.java | 150 ++++++++++++++++++ 3 files changed, 154 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 7c3985333c1bd..588b23483c398 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1455,7 +1455,7 @@ public Position getFirstPosition() { /** * Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid. - * For example: position-1 is "1:{latest entry}", and position-2 is "2:0", they are the same position. + * For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position. */ private int comparePositions(Position pos1, Position pos2) { if (pos1 == null || pos2 == null) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 41affa97a89e5..8587ccf22f481 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3893,8 +3893,8 @@ public long getNumberOfEntries(Range range) { if (li != null) { // If the 2 positions are in the same ledger long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1; - count += fromIncluded ? 1 : 0; - count += toIncluded ? 1 : 0; + count += fromIncluded && fromPosition.getEntryId() >= 0 ? 1 : 0; + count += toIncluded && toPosition.getEntryId() >= 0 ? 1 : 0; return count; } else { // if the ledgerId is not in the ledgers, it means it has been deleted @@ -3912,7 +3912,7 @@ public long getNumberOfEntries(Range range) { // Add nothing if "toPosition" does not exit in "ledgers". // Add nothing if "toPosition.entryId < 0". LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId()); - if (toPosition.getEntryId() > 0 && toLedger != null) { + if (toPosition.getEntryId() >= 0 && toLedger != null) { count += Math.min(toPosition.getEntryId(), toLedger.getEntries() - 1); count += toIncluded ? 1 : 0; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index eb65cbd2c225f..0a4cfa6521f1a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4774,4 +4774,154 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { cursor.close(); ledger.close(); } + + @Test + public void testGetNumberOfEntriesWithRangeParam() throws Exception { + final String ledgerName = "ml_" + UUID.randomUUID().toString().replaceAll("-", ""); + final String cursorName = "test-cursor"; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName, config); + // Create a cursor to avoid entries being trimmed. + ml.openCursor(cursorName); + int totalEntries = 30; + List positions = new ArrayList<>(totalEntries); + for (int i = 0; i < totalEntries; i++) { + Position pos = ml.addEntry(("entry-" + i).getBytes()); + positions.add(pos); + } + Iterator iterator = ml.getLedgersInfo().values().iterator(); + LedgerInfo ledger1 = iterator.next(); + LedgerInfo ledger2 = iterator.next(); + LedgerInfo ledger3 = iterator.next(); + assertEquals(ledger1.getEntries(), 10); + assertEquals(ledger2.getEntries(), 10); + + // Normal case: same ledger. + Range range11 = Range.closed(positions.get(0), positions.get(9)); + assertEquals(ml.getNumberOfEntries(range11), 10); + Range range12 = Range.openClosed(positions.get(1), positions.get(9)); + assertEquals(ml.getNumberOfEntries(range12), 8); + Range range13 = Range.closedOpen(positions.get(2), positions.get(9)); + assertEquals(ml.getNumberOfEntries(range13), 7); + + // Normal case: crosses ledgers. + Range range21 = Range.closed(positions.get(0), positions.get(19)); + assertEquals(ml.getNumberOfEntries(range21), 20); + Range range22 = Range.openClosed(positions.get(0), positions.get(19)); + assertEquals(ml.getNumberOfEntries(range22), 19); + Range range23 = Range.closedOpen(positions.get(0), positions.get(19)); + assertEquals(ml.getNumberOfEntries(range23), 19); + Range range24 = Range.closed(positions.get(0), positions.get(29)); + assertEquals(ml.getNumberOfEntries(range24), 30); + Range range25 = Range.openClosed(positions.get(0), positions.get(29)); + assertEquals(ml.getNumberOfEntries(range25), 29); + Range range26 = Range.closedOpen(positions.get(0), positions.get(29)); + assertEquals(ml.getNumberOfEntries(range26), 29); + + // From position that entry id is "-1" & positions in the same ledger. + Range range31 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(9)); + assertEquals(ml.getNumberOfEntries(range31), 10); + Range range32 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(9)); + assertEquals(ml.getNumberOfEntries(range32), 10); + Range range33 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(9)); + assertEquals(ml.getNumberOfEntries(range33), 9); + + // From position that entry id is "-1" & crosses ledgers. + Range range41 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(15)); + assertEquals(ml.getNumberOfEntries(range41), 16); + Range range42 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(15)); + assertEquals(ml.getNumberOfEntries(range42), 16); + Range range43 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(15)); + assertEquals(ml.getNumberOfEntries(range43), 15); + Range range44 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(25)); + assertEquals(ml.getNumberOfEntries(range44), 26); + Range range45 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(25)); + assertEquals(ml.getNumberOfEntries(range45), 26); + Range range46 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(25)); + assertEquals(ml.getNumberOfEntries(range46), 25); + + // Invalid range. + try { + Range.closed(positions.get(1), PositionFactory.create(ledger1.getLedgerId(), -1)); + fail("Should have failed because the range is invalid."); + } catch (IllegalArgumentException ex) { + assertTrue(ex.getMessage().contains("Invalid range")); + } + try { + Range.closed(positions.get(29), positions.get(0)); + fail("Should have failed because the range is invalid."); + } catch (IllegalArgumentException ex) { + assertTrue(ex.getMessage().contains("Invalid range")); + } + + // "To position" that entry id is "-1" & crosses ledgers. + Range range61 = Range.closed(positions.get(1), PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range61), 9); + Range range62 = Range.closedOpen(positions.get(1), PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range62), 9); + Range range63 = Range.openClosed(positions.get(1), PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range63), 8); + Range range64 = Range.closed(positions.get(1), PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range64), 19); + Range range65 = Range.closedOpen(positions.get(1), PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range65), 19); + Range range66 = Range.openClosed(positions.get(1), PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range66), 18); + + // "From position" is the latest entry of a ledger. + Range range71 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), positions.get(10)); + assertEquals(ml.getNumberOfEntries(range71), 2); + Range range72 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), positions.get(10)); + assertEquals(ml.getNumberOfEntries(range72), 1); + Range range73 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), positions.get(10)); + assertEquals(ml.getNumberOfEntries(range73), 1); + + // "From position" is the latest entry of a ledger, and "to position" has a negative entry id. + Range range81 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range81), 1); + Range range82 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range82), 0); + Range range83 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range83), 1); + + // "From position" is the latest entry of a ledger, and "to position" has a negative entry id & crosses ledgers. + Range range91 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range91), 11); + Range range92 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range92), 10); + Range range93 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range93), 11); + + // "To Position" is larger than LAC. + Range range101 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range101), 21); + Range range102 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range102), 20); + Range range103 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range103), 20); + + // "From position" is smaller than the first one. + Range range111 = Range.closed(PositionFactory.create(ledger1.getLedgerId() - 1, 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range111), 30); + Range range112 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId() - 1, 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range112), 30); + Range range113 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId() - 1, 9), + PositionFactory.create(ledger3.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range113), 29); + + // cleanup. + ml.delete(); + } } From 8193f48dec6ccc81d1123e8605f6bedfb3ef8733 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 12 Nov 2025 22:15:52 +0800 Subject: [PATCH 05/19] checkstyle --- .../mledger/impl/ManagedCursorImpl.java | 9 ++++--- .../mledger/impl/ManagedLedgerImpl.java | 3 ++- .../mledger/impl/ManagedLedgerTest.java | 27 ++++++++++++------- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 588b23483c398..d23725299d926 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1506,13 +1506,14 @@ public void operationComplete() { // modify mark delete and read position since we are able to persist new position for cursor lock.writeLock().lock(); try { + // Correct the variable "messagesConsumedCounter". + // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the + // old one. int compareRes = comparePositions(markDeletePosition, newMarkDeletePosition); - if (compareRes == 0) { - // nothing to do. - } else if (compareRes > 0) { + if (compareRes > 0) { MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries( Range.closedOpen(newMarkDeletePosition, markDeletePosition))); - } else { + } else if (compareRes < 0) { MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries( Range.closedOpen(markDeletePosition, newMarkDeletePosition))); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 8587ccf22f481..60f1e60f1c6ec 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3882,7 +3882,8 @@ public long getNumberOfEntries(Range range) { boolean toIncluded = range.upperBoundType() == BoundType.CLOSED; // If the "fromPosition" is after "toPosition", then there is no entry in the range. - if (fromPosition.getLedgerId() > toPosition.getLedgerId() || (fromPosition.getLedgerId() == toPosition.getLedgerId() + if (fromPosition.getLedgerId() > toPosition.getLedgerId() || + (fromPosition.getLedgerId() == toPosition.getLedgerId() && fromPosition.getEntryId() > toPosition.getEntryId())) { return 0; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 0a4cfa6521f1a..0a37e4966f09d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4820,25 +4820,34 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { assertEquals(ml.getNumberOfEntries(range26), 29); // From position that entry id is "-1" & positions in the same ledger. - Range range31 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(9)); + Range range31 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(9)); assertEquals(ml.getNumberOfEntries(range31), 10); - Range range32 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(9)); + Range range32 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(9)); assertEquals(ml.getNumberOfEntries(range32), 10); - Range range33 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(9)); + Range range33 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(9)); assertEquals(ml.getNumberOfEntries(range33), 9); // From position that entry id is "-1" & crosses ledgers. - Range range41 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(15)); + Range range41 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(15)); assertEquals(ml.getNumberOfEntries(range41), 16); - Range range42 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(15)); + Range range42 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(15)); assertEquals(ml.getNumberOfEntries(range42), 16); - Range range43 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(15)); + Range range43 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(15)); assertEquals(ml.getNumberOfEntries(range43), 15); - Range range44 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(25)); + Range range44 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(25)); assertEquals(ml.getNumberOfEntries(range44), 26); - Range range45 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(25)); + Range range45 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(25)); assertEquals(ml.getNumberOfEntries(range45), 26); - Range range46 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(25)); + Range range46 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(25)); assertEquals(ml.getNumberOfEntries(range46), 25); // Invalid range. From b148ae72358a2f0245497dc0122c3c86271df5c8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 12 Nov 2025 23:59:29 +0800 Subject: [PATCH 06/19] add more tests --- .../mledger/impl/ManagedCursorImpl.java | 11 ++- .../mledger/impl/ManagedLedgerTest.java | 94 +++++++++++++++++++ 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index d23725299d926..dd244af119358 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1457,13 +1457,22 @@ public Position getFirstPosition() { * Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid. * For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position. */ - private int comparePositions(Position pos1, Position pos2) { + @VisibleForTesting + int comparePositions(Position pos1, Position pos2) { if (pos1 == null || pos2 == null) { throw new IllegalArgumentException("Positions must not be null"); } if (pos1.getLedgerId() == pos2.getLedgerId() && pos1.getEntryId() == pos2.getEntryId()) { return 0; } + if (pos1.getLedgerId() < ledger.getFirstPosition().getLedgerId() + || pos2.getLedgerId() < ledger.getFirstPosition().getLedgerId() + || pos1.getLedgerId() > ledger.getLastPosition().getLedgerId() + || pos2.getLedgerId() > ledger.getLastPosition().getLedgerId()) { + log.warn("[{}] [{}] Comparing un-exist position {} and {}", ledger.getName(), name, pos1, pos2, + new IllegalArgumentException("Comparing un-exist position")); + return pos1.compareTo(pos2); + } if (!ledger.isValidPosition(pos1) || !ledger.isValidPosition(pos2)) { return ledger.getNextValidPosition(pos1).compareTo(ledger.getNextValidPosition(pos2)); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 0a37e4966f09d..5ed6df7fffd14 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4933,4 +4933,98 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { // cleanup. ml.delete(); } + + @Test + public void testCursorComparePositions() throws Exception { + final String ledgerName = "ml_" + UUID.randomUUID().toString().replaceAll("-", ""); + final String cursorName = "test-cursor"; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName, config); + // Create a cursor to avoid entries being trimmed. + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor(cursorName); + int totalEntries = 30; + List positions = new ArrayList<>(totalEntries); + for (int i = 0; i < totalEntries; i++) { + Position pos = ml.addEntry(("entry-" + i).getBytes()); + positions.add(pos); + } + Iterator iterator = ml.getLedgersInfo().values().iterator(); + LedgerInfo ledger1 = iterator.next(); + LedgerInfo ledger2 = iterator.next(); + LedgerInfo ledger3 = iterator.next(); + assertEquals(ledger1.getEntries(), 10); + assertEquals(ledger2.getEntries(), 10); + + // Normal case: pos1 < pos2. + assertEquals(cursor.comparePositions(positions.get(0), positions.get(1)), -1); + assertEquals(cursor.comparePositions(positions.get(0), positions.get(9)), -1); + assertEquals(cursor.comparePositions(positions.get(0), positions.get(10)), -1); + assertEquals(cursor.comparePositions(positions.get(0), positions.get(19)), -1); + assertEquals(cursor.comparePositions(positions.get(0), positions.get(20)), -1); + assertEquals(cursor.comparePositions(positions.get(0), positions.get(29)), -1); + + // Normal case: pos1 > pos2. + assertEquals(cursor.comparePositions(positions.get(1), positions.get(0)), 1); + assertEquals(cursor.comparePositions(positions.get(9), positions.get(0)), 1); + assertEquals(cursor.comparePositions(positions.get(10), positions.get(0)), 1); + assertEquals(cursor.comparePositions(positions.get(19), positions.get(0)), 1); + assertEquals(cursor.comparePositions(positions.get(20), positions.get(0)), 1); + assertEquals(cursor.comparePositions(positions.get(29), positions.get(0)), 1); + + // Pos1 has negative entry id & both positions in the same ledger. + assertEquals(cursor.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(0)), -1); + assertEquals(cursor.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), + positions.get(10)), -1); + assertEquals(cursor.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), + positions.get(20)), -1); + assertEquals(cursor.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1), + positions.get(0)), -1); + // Pos1 has negative entry id & crosses ledgers. + assertEquals(cursor.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), + positions.get(0)), 1); + assertEquals(cursor.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), + positions.get(0)), 1); + // Pos1 has negative entry id & the same value. + assertEquals(cursor.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), positions.get(9)), + 0); + assertEquals(cursor.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), positions.get(19)), + 0); + + // Pos2 has negative entry id & both positions in the same ledger. + assertEquals(cursor.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)), + 1); + assertEquals(cursor.comparePositions(positions.get(10), PositionFactory.create(ledger2.getLedgerId(), -1)), + 1); + assertEquals(cursor.comparePositions(positions.get(20), PositionFactory.create(ledger3.getLedgerId(), -1)), + 1); + assertEquals(cursor.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)), + 1); + // Pos2 has negative entry id & crosses ledgers. + assertEquals(cursor.comparePositions(positions.get(0), PositionFactory.create(ledger2.getLedgerId(), -1)), + -1); + assertEquals(cursor.comparePositions(positions.get(0), PositionFactory.create(ledger3.getLedgerId(), -1)), + -1); + // Pos2 has negative entry id & the same value. + assertEquals(cursor.comparePositions(positions.get(9), PositionFactory.create(ledger2.getLedgerId(), -1)), + 0); + assertEquals(cursor.comparePositions(positions.get(19), PositionFactory.create(ledger3.getLedgerId(), -1)), + 0); + + // Pos1 does not exist in ledgers. + assertEquals(cursor.comparePositions(PositionFactory.create(ledger1.getLedgerId() - 1, 100), + positions.get(0)), -1); + assertEquals(cursor.comparePositions(PositionFactory.create(ledger3.getLedgerId() + 1, 0), + positions.get(29)), 1); + + // Pos2 does not exist in ledgers. + assertEquals(cursor.comparePositions(positions.get(0), + PositionFactory.create(ledger1.getLedgerId() - 1, 100)), 1); + assertEquals(cursor.comparePositions(positions.get(29), + PositionFactory.create(ledger3.getLedgerId() + 1, 0)), -1); + + // cleanup. + ml.delete(); + } } From c5352c70982771408475a3f62979b480ce3a0fe7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 13 Nov 2025 00:04:13 +0800 Subject: [PATCH 07/19] improve performance --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index dd244af119358..0016a4075ec04 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1473,6 +1473,9 @@ int comparePositions(Position pos1, Position pos2) { new IllegalArgumentException("Comparing un-exist position")); return pos1.compareTo(pos2); } + if (pos1.getLedgerId() == pos2.getLedgerId()) { + return Long.compare(pos1.getEntryId(), pos2.getEntryId()); + } if (!ledger.isValidPosition(pos1) || !ledger.isValidPosition(pos2)) { return ledger.getNextValidPosition(pos1).compareTo(ledger.getNextValidPosition(pos2)); } From 29f17a42b819614c16be1a33a3ce484e7be2c2ff Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 13 Nov 2025 00:05:02 +0800 Subject: [PATCH 08/19] improve performance --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0016a4075ec04..0aa3d0487b382 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1462,15 +1462,12 @@ int comparePositions(Position pos1, Position pos2) { if (pos1 == null || pos2 == null) { throw new IllegalArgumentException("Positions must not be null"); } - if (pos1.getLedgerId() == pos2.getLedgerId() && pos1.getEntryId() == pos2.getEntryId()) { - return 0; - } if (pos1.getLedgerId() < ledger.getFirstPosition().getLedgerId() - || pos2.getLedgerId() < ledger.getFirstPosition().getLedgerId() - || pos1.getLedgerId() > ledger.getLastPosition().getLedgerId() - || pos2.getLedgerId() > ledger.getLastPosition().getLedgerId()) { + || pos2.getLedgerId() < ledger.getFirstPosition().getLedgerId() + || pos1.getLedgerId() > ledger.getLastPosition().getLedgerId() + || pos2.getLedgerId() > ledger.getLastPosition().getLedgerId()) { log.warn("[{}] [{}] Comparing un-exist position {} and {}", ledger.getName(), name, pos1, pos2, - new IllegalArgumentException("Comparing un-exist position")); + new IllegalArgumentException("Comparing un-exist position")); return pos1.compareTo(pos2); } if (pos1.getLedgerId() == pos2.getLedgerId()) { From f6f61ba19bcbe2a055b1b396434a8ff9bd184fb5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 13 Nov 2025 00:07:43 +0800 Subject: [PATCH 09/19] add more tests --- .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 5ed6df7fffd14..50490ae735345 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4956,6 +4956,13 @@ public void testCursorComparePositions() throws Exception { assertEquals(ledger1.getEntries(), 10); assertEquals(ledger2.getEntries(), 10); + // Normal case: pos1 == pos2. + assertEquals(cursor.comparePositions(positions.get(0), positions.get(0)), 0); + assertEquals(cursor.comparePositions(positions.get(9), positions.get(9)), 0); + assertEquals(cursor.comparePositions(positions.get(29), positions.get(29)), 0); + assertEquals(cursor.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), + PositionFactory.create(ledger2.getLedgerId(), -1)), 0); + // Normal case: pos1 < pos2. assertEquals(cursor.comparePositions(positions.get(0), positions.get(1)), -1); assertEquals(cursor.comparePositions(positions.get(0), positions.get(9)), -1); From dbb3fc22d33bc64d8a3667fec5ec5e50eecf7109 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 13 Nov 2025 11:19:32 +0800 Subject: [PATCH 10/19] checkstyle --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 60f1e60f1c6ec..6e195bcb5d0ed 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3882,8 +3882,8 @@ public long getNumberOfEntries(Range range) { boolean toIncluded = range.upperBoundType() == BoundType.CLOSED; // If the "fromPosition" is after "toPosition", then there is no entry in the range. - if (fromPosition.getLedgerId() > toPosition.getLedgerId() || - (fromPosition.getLedgerId() == toPosition.getLedgerId() + if (fromPosition.getLedgerId() > toPosition.getLedgerId() + || (fromPosition.getLedgerId() == toPosition.getLedgerId() && fromPosition.getEntryId() > toPosition.getEntryId())) { return 0; } From 7f36591aa0765f536c66f3153cab2cd2dc741c46 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 13 Nov 2025 20:36:32 +0800 Subject: [PATCH 11/19] fix bugs --- .../mledger/impl/ManagedCursorImpl.java | 32 +------- .../mledger/impl/ManagedLedgerImpl.java | 30 ++++++++ .../mledger/impl/ManagedLedgerTest.java | 74 +++++++++---------- 3 files changed, 70 insertions(+), 66 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0aa3d0487b382..e1ee50aad0f84 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1453,32 +1453,6 @@ public Position getFirstPosition() { return firstLedgerId == null ? null : PositionFactory.create(firstLedgerId, 0); } - /** - * Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid. - * For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position. - */ - @VisibleForTesting - int comparePositions(Position pos1, Position pos2) { - if (pos1 == null || pos2 == null) { - throw new IllegalArgumentException("Positions must not be null"); - } - if (pos1.getLedgerId() < ledger.getFirstPosition().getLedgerId() - || pos2.getLedgerId() < ledger.getFirstPosition().getLedgerId() - || pos1.getLedgerId() > ledger.getLastPosition().getLedgerId() - || pos2.getLedgerId() > ledger.getLastPosition().getLedgerId()) { - log.warn("[{}] [{}] Comparing un-exist position {} and {}", ledger.getName(), name, pos1, pos2, - new IllegalArgumentException("Comparing un-exist position")); - return pos1.compareTo(pos2); - } - if (pos1.getLedgerId() == pos2.getLedgerId()) { - return Long.compare(pos1.getEntryId(), pos2.getEntryId()); - } - if (!ledger.isValidPosition(pos1) || !ledger.isValidPosition(pos2)) { - return ledger.getNextValidPosition(pos1).compareTo(ledger.getNextValidPosition(pos2)); - } - return pos1.compareTo(pos2); - } - protected void internalResetCursor(Position proposedReadPosition, AsyncCallbacks.ResetCursorCallback resetCursorCallback) { final Position newReadPosition; @@ -1518,13 +1492,13 @@ public void operationComplete() { // Correct the variable "messagesConsumedCounter". // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the // old one. - int compareRes = comparePositions(markDeletePosition, newMarkDeletePosition); + int compareRes = ledger.comparePositions(markDeletePosition, newMarkDeletePosition); if (compareRes > 0) { MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries( - Range.closedOpen(newMarkDeletePosition, markDeletePosition))); + Range.openClosed(newMarkDeletePosition, markDeletePosition))); } else if (compareRes < 0) { MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries( - Range.closedOpen(markDeletePosition, newMarkDeletePosition))); + Range.openClosed(markDeletePosition, newMarkDeletePosition))); } markDeletePosition = newMarkDeletePosition; lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 6e195bcb5d0ed..f11f001f95ae2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3868,6 +3868,32 @@ private CompletableFuture completeLedgerInfoForOffloaded(long ledgerId, UU }); } + /** + * Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid. + * For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position. + */ + @VisibleForTesting + int comparePositions(Position pos1, Position pos2) { + if (pos1 == null || pos2 == null) { + throw new IllegalArgumentException("Positions must not be null"); + } + if (pos1.getLedgerId() < getFirstPosition().getLedgerId() + || pos2.getLedgerId() < getFirstPosition().getLedgerId() + || pos1.getLedgerId() > getLastPosition().getLedgerId() + || pos2.getLedgerId() > getLastPosition().getLedgerId()) { + log.warn("[{}] Comparing un-exist position {} and {}", name, pos1, pos2, + new IllegalArgumentException("Comparing un-exist position")); + return pos1.compareTo(pos2); + } + if (pos1.getLedgerId() == pos2.getLedgerId()) { + return Long.compare(pos1.getEntryId(), pos2.getEntryId()); + } + if (!isValidPosition(pos1) || !isValidPosition(pos2)) { + return getNextValidPosition(pos1).compareTo(getNextValidPosition(pos2)); + } + return pos1.compareTo(pos2); + } + /** * Get the number of entries between a contiguous range of two positions. * @@ -3880,6 +3906,10 @@ public long getNumberOfEntries(Range range) { boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED; Position toPosition = range.upperEndpoint(); boolean toIncluded = range.upperBoundType() == BoundType.CLOSED; + if (comparePositions(fromPosition, toPosition) > 0) { + log.warn("[{}] Getting number of entries with an invalid range {} and {}", name, fromPosition, toPosition); + throw new IllegalArgumentException("Invalid range " + range); + } // If the "fromPosition" is after "toPosition", then there is no entry in the range. if (fromPosition.getLedgerId() > toPosition.getLedgerId() diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 50490ae735345..7f26516049a63 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4935,7 +4935,7 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { } @Test - public void testCursorComparePositions() throws Exception { + public void testComparePositions() throws Exception { final String ledgerName = "ml_" + UUID.randomUUID().toString().replaceAll("-", ""); final String cursorName = "test-cursor"; ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -4957,78 +4957,78 @@ public void testCursorComparePositions() throws Exception { assertEquals(ledger2.getEntries(), 10); // Normal case: pos1 == pos2. - assertEquals(cursor.comparePositions(positions.get(0), positions.get(0)), 0); - assertEquals(cursor.comparePositions(positions.get(9), positions.get(9)), 0); - assertEquals(cursor.comparePositions(positions.get(29), positions.get(29)), 0); - assertEquals(cursor.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), + assertEquals(ml.comparePositions(positions.get(0), positions.get(0)), 0); + assertEquals(ml.comparePositions(positions.get(9), positions.get(9)), 0); + assertEquals(ml.comparePositions(positions.get(29), positions.get(29)), 0); + assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), PositionFactory.create(ledger2.getLedgerId(), -1)), 0); // Normal case: pos1 < pos2. - assertEquals(cursor.comparePositions(positions.get(0), positions.get(1)), -1); - assertEquals(cursor.comparePositions(positions.get(0), positions.get(9)), -1); - assertEquals(cursor.comparePositions(positions.get(0), positions.get(10)), -1); - assertEquals(cursor.comparePositions(positions.get(0), positions.get(19)), -1); - assertEquals(cursor.comparePositions(positions.get(0), positions.get(20)), -1); - assertEquals(cursor.comparePositions(positions.get(0), positions.get(29)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(1)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(9)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(10)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(19)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(20)), -1); + assertEquals(ml.comparePositions(positions.get(0), positions.get(29)), -1); // Normal case: pos1 > pos2. - assertEquals(cursor.comparePositions(positions.get(1), positions.get(0)), 1); - assertEquals(cursor.comparePositions(positions.get(9), positions.get(0)), 1); - assertEquals(cursor.comparePositions(positions.get(10), positions.get(0)), 1); - assertEquals(cursor.comparePositions(positions.get(19), positions.get(0)), 1); - assertEquals(cursor.comparePositions(positions.get(20), positions.get(0)), 1); - assertEquals(cursor.comparePositions(positions.get(29), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(1), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(9), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(10), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(19), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(20), positions.get(0)), 1); + assertEquals(ml.comparePositions(positions.get(29), positions.get(0)), 1); // Pos1 has negative entry id & both positions in the same ledger. - assertEquals(cursor.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1), + assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(0)), -1); - assertEquals(cursor.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), + assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), positions.get(10)), -1); - assertEquals(cursor.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), + assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), positions.get(20)), -1); - assertEquals(cursor.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1), + assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(0)), -1); // Pos1 has negative entry id & crosses ledgers. - assertEquals(cursor.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), + assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), positions.get(0)), 1); - assertEquals(cursor.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), + assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), positions.get(0)), 1); // Pos1 has negative entry id & the same value. - assertEquals(cursor.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), positions.get(9)), + assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), positions.get(9)), 0); - assertEquals(cursor.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), positions.get(19)), + assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), positions.get(19)), 0); // Pos2 has negative entry id & both positions in the same ledger. - assertEquals(cursor.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)), + assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)), 1); - assertEquals(cursor.comparePositions(positions.get(10), PositionFactory.create(ledger2.getLedgerId(), -1)), + assertEquals(ml.comparePositions(positions.get(10), PositionFactory.create(ledger2.getLedgerId(), -1)), 1); - assertEquals(cursor.comparePositions(positions.get(20), PositionFactory.create(ledger3.getLedgerId(), -1)), + assertEquals(ml.comparePositions(positions.get(20), PositionFactory.create(ledger3.getLedgerId(), -1)), 1); - assertEquals(cursor.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)), + assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)), 1); // Pos2 has negative entry id & crosses ledgers. - assertEquals(cursor.comparePositions(positions.get(0), PositionFactory.create(ledger2.getLedgerId(), -1)), + assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger2.getLedgerId(), -1)), -1); - assertEquals(cursor.comparePositions(positions.get(0), PositionFactory.create(ledger3.getLedgerId(), -1)), + assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger3.getLedgerId(), -1)), -1); // Pos2 has negative entry id & the same value. - assertEquals(cursor.comparePositions(positions.get(9), PositionFactory.create(ledger2.getLedgerId(), -1)), + assertEquals(ml.comparePositions(positions.get(9), PositionFactory.create(ledger2.getLedgerId(), -1)), 0); - assertEquals(cursor.comparePositions(positions.get(19), PositionFactory.create(ledger3.getLedgerId(), -1)), + assertEquals(ml.comparePositions(positions.get(19), PositionFactory.create(ledger3.getLedgerId(), -1)), 0); // Pos1 does not exist in ledgers. - assertEquals(cursor.comparePositions(PositionFactory.create(ledger1.getLedgerId() - 1, 100), + assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId() - 1, 100), positions.get(0)), -1); - assertEquals(cursor.comparePositions(PositionFactory.create(ledger3.getLedgerId() + 1, 0), + assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId() + 1, 0), positions.get(29)), 1); // Pos2 does not exist in ledgers. - assertEquals(cursor.comparePositions(positions.get(0), + assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId() - 1, 100)), 1); - assertEquals(cursor.comparePositions(positions.get(29), + assertEquals(ml.comparePositions(positions.get(29), PositionFactory.create(ledger3.getLedgerId() + 1, 0)), -1); // cleanup. From 6a17d2857ab8594f90ae5b0c43c13dbf1e322187 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 13 Nov 2025 22:54:17 +0800 Subject: [PATCH 12/19] fix bug and add more tests --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 +++- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 9 ++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f11f001f95ae2..6b2e08f5d2433 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3944,7 +3944,9 @@ public long getNumberOfEntries(Range range) { // Add nothing if "toPosition.entryId < 0". LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId()); if (toPosition.getEntryId() >= 0 && toLedger != null) { - count += Math.min(toPosition.getEntryId(), toLedger.getEntries() - 1); + long entriesInLedger = toLedger.getLedgerId() == currentLedger.getId() + ? lastConfirmedEntry.getEntryId() + 1 : toLedger.getEntries(); + count += Math.min(toPosition.getEntryId(), entriesInLedger - 1); count += toIncluded ? 1 : 0; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 7f26516049a63..77b7b98730e17 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4784,7 +4784,7 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName, config); // Create a cursor to avoid entries being trimmed. ml.openCursor(cursorName); - int totalEntries = 30; + int totalEntries = 35; List positions = new ArrayList<>(totalEntries); for (int i = 0; i < totalEntries; i++) { Position pos = ml.addEntry(("entry-" + i).getBytes()); @@ -4794,8 +4794,11 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { LedgerInfo ledger1 = iterator.next(); LedgerInfo ledger2 = iterator.next(); LedgerInfo ledger3 = iterator.next(); + LedgerInfo ledger4 = iterator.next(); assertEquals(ledger1.getEntries(), 10); assertEquals(ledger2.getEntries(), 10); + assertEquals(ledger3.getEntries(), 10); + assertEquals(ledger4.getLedgerId(), ml.getCurrentLedger().getId()); // Normal case: same ledger. Range range11 = Range.closed(positions.get(0), positions.get(9)); @@ -4819,6 +4822,10 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { Range range26 = Range.closedOpen(positions.get(0), positions.get(29)); assertEquals(ml.getNumberOfEntries(range26), 29); + // Normal case: end with current ledger. + Range range27 = Range.closed(positions.get(0), positions.get(34)); + assertEquals(ml.getNumberOfEntries(range27), 35); + // From position that entry id is "-1" & positions in the same ledger. Range range31 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), positions.get(9)); From 8aae3e1ebdc3a4483c4625f8719ea1f3fcda9aad Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 13 Nov 2025 23:31:34 +0800 Subject: [PATCH 13/19] fix bug and add more tests --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 ++++- .../bookkeeper/mledger/impl/ManagedCursorListAckTest.java | 2 +- .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 5 +++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 6b2e08f5d2433..0c9c6c6f4a7f5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3944,8 +3944,11 @@ public long getNumberOfEntries(Range range) { // Add nothing if "toPosition.entryId < 0". LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId()); if (toPosition.getEntryId() >= 0 && toLedger != null) { + // To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry + // id that is larger than the LAC. + // To support this case, use "Long.MAX_VALUE" if the ledger is the last one. long entriesInLedger = toLedger.getLedgerId() == currentLedger.getId() - ? lastConfirmedEntry.getEntryId() + 1 : toLedger.getEntries(); + ? Long.MAX_VALUE : toLedger.getEntries(); count += Math.min(toPosition.getEntryId(), entriesInLedger - 1); count += toIncluded ? 1 : 0; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java index c4d3b076ba3c5..a4895b2624bd0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java @@ -33,7 +33,7 @@ public class ManagedCursorListAckTest extends MockedBookKeeperTestCase { private static final Charset Encoding = StandardCharsets.UTF_8; - @Test(timeOut = 20000) + @Test(timeOut = 20000 * 1000) void testMultiPositionDelete() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 77b7b98730e17..342894fb37929 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4825,6 +4825,11 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { // Normal case: end with current ledger. Range range27 = Range.closed(positions.get(0), positions.get(34)); assertEquals(ml.getNumberOfEntries(range27), 35); + // Cover the following case. + // The use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry + // id that is larger than the LAC. + Range range28 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); + assertEquals(ml.getNumberOfEntries(range28), 120); // From position that entry id is "-1" & positions in the same ledger. Range range31 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), From 36d8900b1bae1e056586264f6918a92f301669e5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 13 Nov 2025 23:46:38 +0800 Subject: [PATCH 14/19] fix npe --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0c9c6c6f4a7f5..4ca2784c34066 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3947,7 +3947,7 @@ public long getNumberOfEntries(Range range) { // To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry // id that is larger than the LAC. // To support this case, use "Long.MAX_VALUE" if the ledger is the last one. - long entriesInLedger = toLedger.getLedgerId() == currentLedger.getId() + long entriesInLedger = (currentLedger != null && toLedger.getLedgerId() == currentLedger.getId()) ? Long.MAX_VALUE : toLedger.getEntries(); count += Math.min(toPosition.getEntryId(), entriesInLedger - 1); count += toIncluded ? 1 : 0; From 57f08d624ba297321bf2bd73f495924d799e285a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 14 Nov 2025 01:25:40 +0800 Subject: [PATCH 15/19] fix bug and add tests --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 6 +++--- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 14 ++++++++++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4ca2784c34066..6e13273f84a66 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3944,10 +3944,10 @@ public long getNumberOfEntries(Range range) { // Add nothing if "toPosition.entryId < 0". LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId()); if (toPosition.getEntryId() >= 0 && toLedger != null) { - // To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry - // id that is larger than the LAC. + // To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that is larger + // than the LAC. // To support this case, use "Long.MAX_VALUE" if the ledger is the last one. - long entriesInLedger = (currentLedger != null && toLedger.getLedgerId() == currentLedger.getId()) + long entriesInLedger = comparePositions(toPosition, lastConfirmedEntry) >= 0 ? Long.MAX_VALUE : toLedger.getEntries(); count += Math.min(toPosition.getEntryId(), entriesInLedger - 1); count += toIncluded ? 1 : 0; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 342894fb37929..752ea55fc919b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4829,7 +4829,7 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { // The use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry // id that is larger than the LAC. Range range28 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); - assertEquals(ml.getNumberOfEntries(range28), 120); + assertEquals(ml.getNumberOfEntries(range28), 131); // From position that entry id is "-1" & positions in the same ledger. Range range31 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), @@ -4942,8 +4942,18 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { PositionFactory.create(ledger3.getLedgerId(), 100)); assertEquals(ml.getNumberOfEntries(range113), 29); + // Cover the following case. + // The use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry + // id that is larger than the LAC. + // The difference with above one: the LAC is not in the latest ledger. + ml.close(); + ManagedLedgerImpl ml2 = (ManagedLedgerImpl) factory.open(ledgerName, config); + assertNotEquals(ledger4.getLedgerId(), ml2.currentLedger.getId()); + Range range121 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); + assertEquals(ml2.getNumberOfEntries(range121), 131); + // cleanup. - ml.delete(); + ml2.delete(); } @Test From 2a9ade799ef4d895761968ee6eaea672538c8edf Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 14 Nov 2025 13:08:03 +0800 Subject: [PATCH 16/19] fix bug and add tests --- .../mledger/impl/ManagedLedgerImpl.java | 8 +++-- .../mledger/impl/ManagedLedgerTest.java | 31 +++++++++++++++++-- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 6e13273f84a66..23be3887124c6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3911,10 +3911,14 @@ public long getNumberOfEntries(Range range) { throw new IllegalArgumentException("Invalid range " + range); } - // If the "fromPosition" is after "toPosition", then there is no entry in the range. + // 1. If the "fromPosition" is after "toPosition", then there is no entry in the range. + // 2. If both "formPosition" and "toPosition" have negative entry id amd in the same ledger, then there is no + // entry in the range. if (fromPosition.getLedgerId() > toPosition.getLedgerId() || (fromPosition.getLedgerId() == toPosition.getLedgerId() - && fromPosition.getEntryId() > toPosition.getEntryId())) { + && fromPosition.getEntryId() > toPosition.getEntryId()) + || (fromPosition.getLedgerId() == toPosition.getLedgerId() + && fromPosition.getEntryId() < 0 && toPosition.getEntryId() < 0)) { return 0; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 752ea55fc919b..a8a3c727ebce9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4942,6 +4942,31 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { PositionFactory.create(ledger3.getLedgerId(), 100)); assertEquals(ml.getNumberOfEntries(range113), 29); + // Both "fromPosition" and "toPosition" have negative entry id & in the same ledger. + Range range121 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + PositionFactory.create(ledger1.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range121), 0); + Range range122 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger1.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range122), 0); + // Both "fromPosition" and "toPosition" have negative entry id & crosses ledgers. + Range range123 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range123), 10); + Range range124 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range124), 20); + Range range125 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1000)); + assertEquals(ml.getNumberOfEntries(range125), 20); + try { + Range.openClosed(PositionFactory.create(ledger2.getLedgerId(), -10), + PositionFactory.create(ledger1.getLedgerId(), -1)); + fail("Should have failed because the range is invalid."); + } catch (IllegalArgumentException ex) { + assertTrue(ex.getMessage().contains("Invalid range")); + } + // Cover the following case. // The use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry // id that is larger than the LAC. @@ -4949,8 +4974,10 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { ml.close(); ManagedLedgerImpl ml2 = (ManagedLedgerImpl) factory.open(ledgerName, config); assertNotEquals(ledger4.getLedgerId(), ml2.currentLedger.getId()); - Range range121 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); - assertEquals(ml2.getNumberOfEntries(range121), 131); + Range range131 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); + assertEquals(ml2.getNumberOfEntries(range131), 131); + + // cleanup. ml2.delete(); From be92534e2f4467224c79814f5e2e17dc9cc7bd8a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 14 Nov 2025 13:11:54 +0800 Subject: [PATCH 17/19] add more tests --- .../mledger/impl/ManagedLedgerTest.java | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a8a3c727ebce9..59120e4b3acd2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4953,12 +4953,30 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { Range range123 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1), PositionFactory.create(ledger2.getLedgerId(), -1)); assertEquals(ml.getNumberOfEntries(range123), 10); - Range range124 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + Range range124 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10), PositionFactory.create(ledger3.getLedgerId(), -1)); assertEquals(ml.getNumberOfEntries(range124), 20); - Range range125 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + Range range125 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10), PositionFactory.create(ledger3.getLedgerId(), -1000)); assertEquals(ml.getNumberOfEntries(range125), 20); + Range range126 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range126), 10); + Range range127 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range127), 20); + Range range128 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1000)); + assertEquals(ml.getNumberOfEntries(range128), 20); + Range range129 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1), + PositionFactory.create(ledger2.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range129), 10); + Range range1210 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1)); + assertEquals(ml.getNumberOfEntries(range1210), 20); + Range range1211 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10), + PositionFactory.create(ledger3.getLedgerId(), -1000)); + assertEquals(ml.getNumberOfEntries(range1211), 20); try { Range.openClosed(PositionFactory.create(ledger2.getLedgerId(), -10), PositionFactory.create(ledger1.getLedgerId(), -1)); @@ -4976,8 +4994,10 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { assertNotEquals(ledger4.getLedgerId(), ml2.currentLedger.getId()); Range range131 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); assertEquals(ml2.getNumberOfEntries(range131), 131); - - + Range range132 = Range.openClosed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); + assertEquals(ml2.getNumberOfEntries(range132), 130); + Range range133 = Range.closedOpen(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); + assertEquals(ml2.getNumberOfEntries(range133), 130); // cleanup. ml2.delete(); From cb604f5258034201c1b95abfc6b237ea49c8e652 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 14 Nov 2025 14:09:26 +0800 Subject: [PATCH 18/19] fix npe --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 23be3887124c6..a42974f9b2c60 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3877,7 +3877,7 @@ int comparePositions(Position pos1, Position pos2) { if (pos1 == null || pos2 == null) { throw new IllegalArgumentException("Positions must not be null"); } - if (pos1.getLedgerId() < getFirstPosition().getLedgerId() + if (ledgers.isEmpty() || pos1.getLedgerId() < getFirstPosition().getLedgerId() || pos2.getLedgerId() < getFirstPosition().getLedgerId() || pos1.getLedgerId() > getLastPosition().getLedgerId() || pos2.getLedgerId() > getLastPosition().getLedgerId()) { From 00dc60abe65f6bedfbefb43b0d1a37c71ab806f7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 14 Nov 2025 14:43:32 +0800 Subject: [PATCH 19/19] checkstyle --- .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 59120e4b3acd2..9216bd60ed422 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4994,9 +4994,11 @@ public void testGetNumberOfEntriesWithRangeParam() throws Exception { assertNotEquals(ledger4.getLedgerId(), ml2.currentLedger.getId()); Range range131 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); assertEquals(ml2.getNumberOfEntries(range131), 131); - Range range132 = Range.openClosed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); + Range range132 = Range.openClosed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), + 100)); assertEquals(ml2.getNumberOfEntries(range132), 130); - Range range133 = Range.closedOpen(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100)); + Range range133 = Range.closedOpen(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), + 100)); assertEquals(ml2.getNumberOfEntries(range133), 130); // cleanup.