diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 5042d19c12018..2e494685c9173 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -48,8 +48,8 @@
1.8
8
3.1.0
- 2.23.1
- 2.0.13
+ 2.25.2
+ 2.0.17
7.7.1
3.18.0
4.1
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 55660bcf51b5b..068dfe50ffe96 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -349,10 +349,10 @@ The Apache Software License, Version 2.0
- jakarta.validation-jakarta.validation-api-2.0.2.jar
- javax.validation-validation-api-1.1.0.Final.jar
* Log4J
- - org.apache.logging.log4j-log4j-api-2.23.1.jar
- - org.apache.logging.log4j-log4j-core-2.23.1.jar
- - org.apache.logging.log4j-log4j-slf4j2-impl-2.23.1.jar
- - org.apache.logging.log4j-log4j-web-2.23.1.jar
+ - org.apache.logging.log4j-log4j-api-2.25.2.jar
+ - org.apache.logging.log4j-log4j-core-2.25.2.jar
+ - org.apache.logging.log4j-log4j-slf4j2-impl-2.25.2.jar
+ - org.apache.logging.log4j-log4j-web-2.25.2.jar
* Java Native Access JNA
- net.java.dev.jna-jna-jpms-5.12.1.jar
- net.java.dev.jna-jna-platform-jpms-5.12.1.jar
@@ -484,8 +484,8 @@ The Apache Software License, Version 2.0
* Prometheus
- io.prometheus-simpleclient_httpserver-0.16.0.jar
* Oxia
- - io.github.oxia-db-oxia-client-api-0.7.0.jar
- - io.github.oxia-db-oxia-client-0.7.0.jar
+ - io.github.oxia-db-oxia-client-api-0.7.2.jar
+ - io.github.oxia-db-oxia-client-0.7.2.jar
* OpenHFT
- net.openhft-zero-allocation-hashing-0.16.jar
* Java JSON WebTokens
@@ -562,8 +562,8 @@ BSD 2-Clause License
MIT License
* Java SemVer -- com.github.zafarkhaja-java-semver-0.9.0.jar -- ../licenses/LICENSE-SemVer.txt
* SLF4J -- ../licenses/LICENSE-SLF4J.txt
- - org.slf4j-slf4j-api-2.0.13.jar
- - org.slf4j-jcl-over-slf4j-2.0.13.jar
+ - org.slf4j-slf4j-api-2.0.17.jar
+ - org.slf4j-jcl-over-slf4j-2.0.17.jar
* The Checker Framework
- org.checkerframework-checker-qual-3.33.0.jar
* oshi
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index fc23b0d4fcc32..9a84223ee9240 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -380,10 +380,10 @@ The Apache Software License, Version 2.0
- simpleclient_tracer_otel-0.16.0.jar
- simpleclient_tracer_otel_agent-0.16.0.jar
* Log4J
- - log4j-api-2.23.1.jar
- - log4j-core-2.23.1.jar
- - log4j-slf4j2-impl-2.23.1.jar
- - log4j-web-2.23.1.jar
+ - log4j-api-2.25.2.jar
+ - log4j-core-2.25.2.jar
+ - log4j-slf4j2-impl-2.25.2.jar
+ - log4j-web-2.25.2.jar
* OpenTelemetry
- opentelemetry-api-1.45.0.jar
- opentelemetry-api-incubator-1.45.0-alpha.jar
@@ -425,7 +425,7 @@ BSD 3-clause "New" or "Revised" License
MIT License
* SLF4J -- ../licenses/LICENSE-SLF4J.txt
- - slf4j-api-2.0.13.jar
+ - slf4j-api-2.0.17.jar
* The Checker Framework
- checker-qual-3.33.0.jar
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 123ef9dee8de8..b7fc294229c51 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1488,12 +1488,16 @@ public void operationComplete() {
// modify mark delete and read position since we are able to persist new position for cursor
lock.writeLock().lock();
try {
- if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
+ // Correct the variable "messagesConsumedCounter".
+ // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the
+ // old one.
+ int compareRes = ledger.comparePositions(markDeletePosition, newMarkDeletePosition);
+ if (compareRes > 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
- Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
- } else {
+ Range.openClosed(newMarkDeletePosition, markDeletePosition)));
+ } else if (compareRes < 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries(
- Range.closedOpen(markDeletePosition, newMarkDeletePosition)));
+ Range.openClosed(markDeletePosition, newMarkDeletePosition)));
}
markDeletePosition = newMarkDeletePosition;
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0769a8a8b59ba..a0a27b93bc6b5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3703,6 +3703,32 @@ private CompletableFuture completeLedgerInfoForOffloaded(long ledgerId, UU
});
}
+ /**
+ * Compare two positions. It is different with {@link Position#compareTo(Position)} when the params are invalid.
+ * For example: position-1 is "1:{latest entry}", and position-2 is "2:-1", they are the same position.
+ */
+ @VisibleForTesting
+ int comparePositions(Position pos1, Position pos2) {
+ if (pos1 == null || pos2 == null) {
+ throw new IllegalArgumentException("Positions must not be null");
+ }
+ if (ledgers.isEmpty() || pos1.getLedgerId() < getFirstPosition().getLedgerId()
+ || pos2.getLedgerId() < getFirstPosition().getLedgerId()
+ || pos1.getLedgerId() > getLastPosition().getLedgerId()
+ || pos2.getLedgerId() > getLastPosition().getLedgerId()) {
+ log.warn("[{}] Comparing un-exist position {} and {}", name, pos1, pos2,
+ new IllegalArgumentException("Comparing un-exist position"));
+ return pos1.compareTo(pos2);
+ }
+ if (pos1.getLedgerId() == pos2.getLedgerId()) {
+ return Long.compare(pos1.getEntryId(), pos2.getEntryId());
+ }
+ if (!isValidPosition(pos1) || !isValidPosition(pos2)) {
+ return getNextValidPosition(pos1).compareTo(getNextValidPosition(pos2));
+ }
+ return pos1.compareTo(pos2);
+ }
+
/**
* Get the number of entries between a contiguous range of two positions.
*
@@ -3715,41 +3741,77 @@ public long getNumberOfEntries(Range range) {
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
Position toPosition = range.upperEndpoint();
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
+ if (comparePositions(fromPosition, toPosition) > 0) {
+ log.warn("[{}] Getting number of entries with an invalid range {} and {}", name, fromPosition, toPosition);
+ throw new IllegalArgumentException("Invalid range " + range);
+ }
+
+ // 1. If the "fromPosition" is after "toPosition", then there is no entry in the range.
+ // 2. If both "formPosition" and "toPosition" have negative entry id amd in the same ledger, then there is no
+ // entry in the range.
+ if (fromPosition.getLedgerId() > toPosition.getLedgerId()
+ || (fromPosition.getLedgerId() == toPosition.getLedgerId()
+ && fromPosition.getEntryId() > toPosition.getEntryId())
+ || (fromPosition.getLedgerId() == toPosition.getLedgerId()
+ && fromPosition.getEntryId() < 0 && toPosition.getEntryId() < 0)) {
+ return 0;
+ }
+ // If the 2 positions are in the same ledger.
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
if (li != null) {
// If the 2 positions are in the same ledger
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
- count += fromIncluded ? 1 : 0;
- count += toIncluded ? 1 : 0;
+ count += fromIncluded && fromPosition.getEntryId() >= 0 ? 1 : 0;
+ count += toIncluded && toPosition.getEntryId() >= 0 ? 1 : 0;
return count;
} else {
// if the ledgerId is not in the ledgers, it means it has been deleted
return 0;
}
- } else {
- long count = 0;
- // If the from & to are pointing to different ledgers, then we need to :
- // 1. Add the entries in the ledger pointed by toPosition
- count += toPosition.getEntryId();
+ }
+
+ // If the "fromPosition.ledgerId" is larger than "toPosition.ledgerId".
+ // 1. Add the entries in the ledger pointed by toPosition.
+ // 2. Add the entries in the ledger pointed by fromPosition.
+ // 3. Add the whole ledgers entries in between.
+ long count = 0;
+
+ // 1. Add the entries in the ledger pointed by toPosition.
+ // Add nothing if "toPosition" does not exit in "ledgers".
+ // Add nothing if "toPosition.entryId < 0".
+ LedgerInfo toLedger = ledgers.get(toPosition.getLedgerId());
+ if (toPosition.getEntryId() >= 0 && toLedger != null) {
+ // To support the use case "cursor.getNumberOfEntries()", which will use a "toPosition" that is larger
+ // than the LAC.
+ // To support this case, use "Long.MAX_VALUE" if the ledger is the last one.
+ long entriesInLedger = comparePositions(toPosition, lastConfirmedEntry) >= 0
+ ? Long.MAX_VALUE : toLedger.getEntries();
+ count += Math.min(toPosition.getEntryId(), entriesInLedger - 1);
count += toIncluded ? 1 : 0;
+ }
- // 2. Add the entries in the ledger pointed by fromPosition
- LedgerInfo li = ledgers.get(fromPosition.getLedgerId());
- if (li != null) {
- count += li.getEntries() - (fromPosition.getEntryId() + 1);
+ // 2. Add the entries in the ledger pointed by fromPosition.
+ // Add nothing if "toPosition.entryId < 0".
+ // Add nothing if "toPosition" does not exit in "ledgers".
+ LedgerInfo formLedger = ledgers.get(fromPosition.getLedgerId());
+ if (formLedger != null) {
+ if (fromPosition.getEntryId() < 0) {
+ count += formLedger.getEntries();
+ } else {
+ count += formLedger.getEntries() - (fromPosition.getEntryId() + 1);
count += fromIncluded ? 1 : 0;
}
+ }
- // 3. Add the whole ledgers entries in between
- for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
- .values()) {
- count += ls.getEntries();
- }
-
- return count;
+ // 3. Add the whole ledgers entries in between
+ for (LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false)
+ .values()) {
+ count += ls.getEntries();
}
+
+ return count;
}
/**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
index c4d3b076ba3c5..a4895b2624bd0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
@@ -33,7 +33,7 @@ public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {
private static final Charset Encoding = StandardCharsets.UTF_8;
- @Test(timeOut = 20000)
+ @Test(timeOut = 20000 * 1000)
void testMultiPositionDelete() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a78821b646a0d..3be3e6af61a29 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -4633,4 +4633,335 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
cursor.close();
ledger.close();
}
+
+ @Test
+ public void testGetNumberOfEntriesWithRangeParam() throws Exception {
+ final String ledgerName = "ml_" + UUID.randomUUID().toString().replaceAll("-", "");
+ final String cursorName = "test-cursor";
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName, config);
+ // Create a cursor to avoid entries being trimmed.
+ ml.openCursor(cursorName);
+ int totalEntries = 35;
+ List positions = new ArrayList<>(totalEntries);
+ for (int i = 0; i < totalEntries; i++) {
+ Position pos = ml.addEntry(("entry-" + i).getBytes());
+ positions.add(pos);
+ }
+ Iterator iterator = ml.getLedgersInfo().values().iterator();
+ LedgerInfo ledger1 = iterator.next();
+ LedgerInfo ledger2 = iterator.next();
+ LedgerInfo ledger3 = iterator.next();
+ LedgerInfo ledger4 = iterator.next();
+ assertEquals(ledger1.getEntries(), 10);
+ assertEquals(ledger2.getEntries(), 10);
+ assertEquals(ledger3.getEntries(), 10);
+ assertEquals(ledger4.getLedgerId(), ml.getCurrentLedger().getId());
+
+ // Normal case: same ledger.
+ Range range11 = Range.closed(positions.get(0), positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range11), 10);
+ Range range12 = Range.openClosed(positions.get(1), positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range12), 8);
+ Range range13 = Range.closedOpen(positions.get(2), positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range13), 7);
+
+ // Normal case: crosses ledgers.
+ Range range21 = Range.closed(positions.get(0), positions.get(19));
+ assertEquals(ml.getNumberOfEntries(range21), 20);
+ Range range22 = Range.openClosed(positions.get(0), positions.get(19));
+ assertEquals(ml.getNumberOfEntries(range22), 19);
+ Range range23 = Range.closedOpen(positions.get(0), positions.get(19));
+ assertEquals(ml.getNumberOfEntries(range23), 19);
+ Range range24 = Range.closed(positions.get(0), positions.get(29));
+ assertEquals(ml.getNumberOfEntries(range24), 30);
+ Range range25 = Range.openClosed(positions.get(0), positions.get(29));
+ assertEquals(ml.getNumberOfEntries(range25), 29);
+ Range range26 = Range.closedOpen(positions.get(0), positions.get(29));
+ assertEquals(ml.getNumberOfEntries(range26), 29);
+
+ // Normal case: end with current ledger.
+ Range range27 = Range.closed(positions.get(0), positions.get(34));
+ assertEquals(ml.getNumberOfEntries(range27), 35);
+ // Cover the following case.
+ // The use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry
+ // id that is larger than the LAC.
+ Range range28 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range28), 131);
+
+ // From position that entry id is "-1" & positions in the same ledger.
+ Range range31 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range31), 10);
+ Range range32 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range32), 10);
+ Range range33 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(9));
+ assertEquals(ml.getNumberOfEntries(range33), 9);
+
+ // From position that entry id is "-1" & crosses ledgers.
+ Range range41 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(15));
+ assertEquals(ml.getNumberOfEntries(range41), 16);
+ Range range42 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(15));
+ assertEquals(ml.getNumberOfEntries(range42), 16);
+ Range range43 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(15));
+ assertEquals(ml.getNumberOfEntries(range43), 15);
+ Range range44 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(25));
+ assertEquals(ml.getNumberOfEntries(range44), 26);
+ Range range45 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(25));
+ assertEquals(ml.getNumberOfEntries(range45), 26);
+ Range range46 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(25));
+ assertEquals(ml.getNumberOfEntries(range46), 25);
+
+ // Invalid range.
+ try {
+ Range.closed(positions.get(1), PositionFactory.create(ledger1.getLedgerId(), -1));
+ fail("Should have failed because the range is invalid.");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().contains("Invalid range"));
+ }
+ try {
+ Range.closed(positions.get(29), positions.get(0));
+ fail("Should have failed because the range is invalid.");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().contains("Invalid range"));
+ }
+
+ // "To position" that entry id is "-1" & crosses ledgers.
+ Range range61 = Range.closed(positions.get(1), PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range61), 9);
+ Range range62 = Range.closedOpen(positions.get(1), PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range62), 9);
+ Range range63 = Range.openClosed(positions.get(1), PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range63), 8);
+ Range range64 = Range.closed(positions.get(1), PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range64), 19);
+ Range range65 = Range.closedOpen(positions.get(1), PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range65), 19);
+ Range range66 = Range.openClosed(positions.get(1), PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range66), 18);
+
+ // "From position" is the latest entry of a ledger.
+ Range range71 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9), positions.get(10));
+ assertEquals(ml.getNumberOfEntries(range71), 2);
+ Range range72 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9), positions.get(10));
+ assertEquals(ml.getNumberOfEntries(range72), 1);
+ Range range73 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9), positions.get(10));
+ assertEquals(ml.getNumberOfEntries(range73), 1);
+
+ // "From position" is the latest entry of a ledger, and "to position" has a negative entry id.
+ Range range81 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range81), 1);
+ Range range82 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range82), 0);
+ Range range83 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range83), 1);
+
+ // "From position" is the latest entry of a ledger, and "to position" has a negative entry id & crosses ledgers.
+ Range range91 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range91), 11);
+ Range range92 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range92), 10);
+ Range range93 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range93), 11);
+
+ // "To Position" is larger than LAC.
+ Range range101 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range101), 21);
+ Range range102 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range102), 20);
+ Range range103 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range103), 20);
+
+ // "From position" is smaller than the first one.
+ Range range111 = Range.closed(PositionFactory.create(ledger1.getLedgerId() - 1, 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range111), 30);
+ Range range112 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId() - 1, 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range112), 30);
+ Range range113 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId() - 1, 9),
+ PositionFactory.create(ledger3.getLedgerId(), 100));
+ assertEquals(ml.getNumberOfEntries(range113), 29);
+
+ // Both "fromPosition" and "toPosition" have negative entry id & in the same ledger.
+ Range range121 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ PositionFactory.create(ledger1.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range121), 0);
+ Range range122 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger1.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range122), 0);
+ // Both "fromPosition" and "toPosition" have negative entry id & crosses ledgers.
+ Range range123 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range123), 10);
+ Range range124 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range124), 20);
+ Range range125 = Range.closed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1000));
+ assertEquals(ml.getNumberOfEntries(range125), 20);
+ Range range126 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -1),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range126), 10);
+ Range range127 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range127), 20);
+ Range range128 = Range.openClosed(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1000));
+ assertEquals(ml.getNumberOfEntries(range128), 20);
+ Range range129 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -1),
+ PositionFactory.create(ledger2.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range129), 10);
+ Range range1210 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1));
+ assertEquals(ml.getNumberOfEntries(range1210), 20);
+ Range range1211 = Range.closedOpen(PositionFactory.create(ledger1.getLedgerId(), -10),
+ PositionFactory.create(ledger3.getLedgerId(), -1000));
+ assertEquals(ml.getNumberOfEntries(range1211), 20);
+ try {
+ Range.openClosed(PositionFactory.create(ledger2.getLedgerId(), -10),
+ PositionFactory.create(ledger1.getLedgerId(), -1));
+ fail("Should have failed because the range is invalid.");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().contains("Invalid range"));
+ }
+
+ // Cover the following case.
+ // The use case "cursor.getNumberOfEntries()", which will use a "toPosition" that with an entry
+ // id that is larger than the LAC.
+ // The difference with above one: the LAC is not in the latest ledger.
+ ml.close();
+ ManagedLedgerImpl ml2 = (ManagedLedgerImpl) factory.open(ledgerName, config);
+ assertNotEquals(ledger4.getLedgerId(), ml2.currentLedger.getId());
+ Range range131 = Range.closed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(), 100));
+ assertEquals(ml2.getNumberOfEntries(range131), 131);
+ Range range132 = Range.openClosed(positions.get(0), PositionFactory.create(ledger4.getLedgerId(),
+ 100));
+ assertEquals(ml2.getNumberOfEntries(range132), 130);
+ Range range133 = Range.closedOpen(positions.get(0), PositionFactory.create(ledger4.getLedgerId(),
+ 100));
+ assertEquals(ml2.getNumberOfEntries(range133), 130);
+
+ // cleanup.
+ ml2.delete();
+ }
+
+ @Test
+ public void testComparePositions() throws Exception {
+ final String ledgerName = "ml_" + UUID.randomUUID().toString().replaceAll("-", "");
+ final String cursorName = "test-cursor";
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(ledgerName, config);
+ // Create a cursor to avoid entries being trimmed.
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor(cursorName);
+ int totalEntries = 30;
+ List positions = new ArrayList<>(totalEntries);
+ for (int i = 0; i < totalEntries; i++) {
+ Position pos = ml.addEntry(("entry-" + i).getBytes());
+ positions.add(pos);
+ }
+ Iterator iterator = ml.getLedgersInfo().values().iterator();
+ LedgerInfo ledger1 = iterator.next();
+ LedgerInfo ledger2 = iterator.next();
+ LedgerInfo ledger3 = iterator.next();
+ assertEquals(ledger1.getEntries(), 10);
+ assertEquals(ledger2.getEntries(), 10);
+
+ // Normal case: pos1 == pos2.
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(0)), 0);
+ assertEquals(ml.comparePositions(positions.get(9), positions.get(9)), 0);
+ assertEquals(ml.comparePositions(positions.get(29), positions.get(29)), 0);
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1),
+ PositionFactory.create(ledger2.getLedgerId(), -1)), 0);
+
+ // Normal case: pos1 < pos2.
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(1)), -1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(9)), -1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(10)), -1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(19)), -1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(20)), -1);
+ assertEquals(ml.comparePositions(positions.get(0), positions.get(29)), -1);
+
+ // Normal case: pos1 > pos2.
+ assertEquals(ml.comparePositions(positions.get(1), positions.get(0)), 1);
+ assertEquals(ml.comparePositions(positions.get(9), positions.get(0)), 1);
+ assertEquals(ml.comparePositions(positions.get(10), positions.get(0)), 1);
+ assertEquals(ml.comparePositions(positions.get(19), positions.get(0)), 1);
+ assertEquals(ml.comparePositions(positions.get(20), positions.get(0)), 1);
+ assertEquals(ml.comparePositions(positions.get(29), positions.get(0)), 1);
+
+ // Pos1 has negative entry id & both positions in the same ledger.
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(0)), -1);
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1),
+ positions.get(10)), -1);
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1),
+ positions.get(20)), -1);
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId(), -1),
+ positions.get(0)), -1);
+ // Pos1 has negative entry id & crosses ledgers.
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1),
+ positions.get(0)), 1);
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1),
+ positions.get(0)), 1);
+ // Pos1 has negative entry id & the same value.
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger2.getLedgerId(), -1), positions.get(9)),
+ 0);
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId(), -1), positions.get(19)),
+ 0);
+
+ // Pos2 has negative entry id & both positions in the same ledger.
+ assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)),
+ 1);
+ assertEquals(ml.comparePositions(positions.get(10), PositionFactory.create(ledger2.getLedgerId(), -1)),
+ 1);
+ assertEquals(ml.comparePositions(positions.get(20), PositionFactory.create(ledger3.getLedgerId(), -1)),
+ 1);
+ assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger1.getLedgerId(), -1)),
+ 1);
+ // Pos2 has negative entry id & crosses ledgers.
+ assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger2.getLedgerId(), -1)),
+ -1);
+ assertEquals(ml.comparePositions(positions.get(0), PositionFactory.create(ledger3.getLedgerId(), -1)),
+ -1);
+ // Pos2 has negative entry id & the same value.
+ assertEquals(ml.comparePositions(positions.get(9), PositionFactory.create(ledger2.getLedgerId(), -1)),
+ 0);
+ assertEquals(ml.comparePositions(positions.get(19), PositionFactory.create(ledger3.getLedgerId(), -1)),
+ 0);
+
+ // Pos1 does not exist in ledgers.
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger1.getLedgerId() - 1, 100),
+ positions.get(0)), -1);
+ assertEquals(ml.comparePositions(PositionFactory.create(ledger3.getLedgerId() + 1, 0),
+ positions.get(29)), 1);
+
+ // Pos2 does not exist in ledgers.
+ assertEquals(ml.comparePositions(positions.get(0),
+ PositionFactory.create(ledger1.getLedgerId() - 1, 100)), 1);
+ assertEquals(ml.comparePositions(positions.get(29),
+ PositionFactory.create(ledger3.getLedgerId() + 1, 0)), -1);
+
+ // cleanup.
+ ml.delete();
+ }
}
diff --git a/pom.xml b/pom.xml
index 3490ddcc8e9f2..2164c7232ab07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -194,9 +194,9 @@ flexible messaging model and an intuitive client API.
0.16.0
4.5.22
7.9.2
- 2.0.13
+ 2.0.17
4.4
- 2.23.1
+ 2.25.2
1.78.1
1.81
@@ -294,7 +294,7 @@ flexible messaging model and an intuitive client API.
4.5.13
4.4.15
0.7.7
- 0.7.0
+ 0.7.2
2.0
1.10.12
5.5.0
@@ -315,9 +315,9 @@ flexible messaging model and an intuitive client API.
3.3.2
- 1.20.4
+ 1.21.3
- 3.4.0
+ 3.4.2
2.2
5.4.0
1.1.1
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 27d605f81d107..4d28c5c76cf83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -796,8 +796,7 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof PreconditionFailedException) {
asyncResponse.resume(
- new RestException(Status.PRECONDITION_FAILED,
- "Topic has active producers/subscriptions"));
+ new RestException(Status.PRECONDITION_FAILED, realCause.getMessage()));
} else if (realCause instanceof WebApplicationException){
asyncResponse.resume(realCause);
} else if (realCause instanceof MetadataStoreException.NotFoundException) {
@@ -2221,7 +2220,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
try {
pulsar().getAdminClient().topics()
.createSubscriptionAsync(topicNamePartition.toString(),
- subscriptionName, targetMessageId, false, properties)
+ subscriptionName, targetMessageId, replicated, properties)
.handle((r, ex) -> {
if (ex != null) {
// fail the operation on unknown exception or
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9bb851b24307c..ab8bcad833654 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2760,7 +2760,7 @@ protected void handleNewTxn(CommandNewTxn command) {
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
- log.debug("Send response {} for new txn request {}", tcId.getId(), requestId);
+ log.debug("Send response {} for new txn request {}", txnID, requestId);
}
commandSender.sendNewTxnResponse(requestId, txnID, tcId.getId());
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bd503f896bfda..7a8c047b244e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1511,8 +1511,23 @@ private CompletableFuture delete(boolean failIfHasSubscriptions,
"Topic has " + producers.size() + " connected producers"));
}
} else if (currentUsageCount() > 0) {
- return FutureUtil.failedFuture(new TopicBusyException(
- "Topic has " + currentUsageCount() + " connected producers/consumers"));
+ StringBuilder errorMsg = new StringBuilder("Topic has");
+ errorMsg.append(" ").append(currentUsageCount())
+ .append(currentUsageCount() == 1 ? " client" : " clients").append(" connected");
+ long consumerCount = subscriptions.values().stream().map(sub -> sub.getConsumers().size())
+ .reduce(0, Integer::sum);
+ long replicatorCount = 0;
+ long producerCount = 0;
+ if (!producers.isEmpty()) {
+ replicatorCount = producers.values().stream().filter(Producer::isRemote).count();
+ if (producers.size() > replicatorCount) {
+ producerCount = producers.size() - replicatorCount;
+ }
+ }
+ errorMsg.append(" Including").append(" ").append(consumerCount).append(" consumers,")
+ .append(" ").append(producerCount).append(" producers,").append(" and")
+ .append(" ").append(replicatorCount).append(" replicators.");
+ return FutureUtil.failedFuture(new TopicBusyException(errorMsg.toString()));
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
index fb45094c56943..41da87dd16566 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -201,8 +201,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar
if (forceDelete) {
throw e;
}
- assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")
- || e.getMessage().contains("connected producers/consumers"));
+ assertTrue(e.getMessage().contains("Topic has"));
}
final String[] expectedEvents;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index a4b3ec2aa8015..55a0b8cb85e3d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -326,6 +326,11 @@ public Object[][] isV1() {
return new Object[][] { { true }, { false } };
}
+ @DataProvider(name = "trueFalse")
+ public static Object[][] trueFalse() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
/**
* It verifies http error code when updating partitions to ensure compatibility.
@@ -2318,6 +2323,27 @@ public void testInvalidBundleErrorResponse() throws Exception {
}
}
+ @Test(dataProvider = "trueFalse")
+ public void testCreateReplicatedSubscriptionForPartitionedTopic(boolean replicated) throws Exception {
+ final String topic = newUniqueName("persistent://" + defaultNamespace + "/topic");
+ admin.topics().createPartitionedTopic(topic, 10);
+ admin.topics().createSubscription(topic, "sub", MessageId.earliest, replicated);
+ for (int i = 0; i < 10; i++) {
+ String individualPartition = TopicName.get(topic).getPartition(i).toString();
+ TopicStats stats = admin.topics().getStats(individualPartition);
+ assertEquals(stats.getSubscriptions().get("sub").isReplicated(), replicated);
+ }
+ }
+
+ @Test(dataProvider = "trueFalse")
+ public void testCreateReplicatedSubscriptionForNonPartitionedTopic(boolean replicated) throws Exception {
+ final String topic = newUniqueName("persistent://" + defaultNamespace + "/topic");
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().createSubscription(topic, "sub", MessageId.earliest, replicated);
+ TopicStats stats = admin.topics().getStats(topic);
+ assertEquals(stats.getSubscriptions().get("sub").isReplicated(), replicated);
+ }
+
@Test
public void testMaxSubscriptionsPerTopic() throws Exception {
restartClusterAfterTest();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 181cdd0288e27..f0da464560a09 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -132,6 +132,37 @@ public void cleanup() throws Exception {
super.cleanup();
}
+ @Test(timeOut = 45 * 1000)
+ public void testDeleteTopicWhenReplicating() throws Exception {
+ final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+ Producer producer1 = client1.newProducer().topic(topicName1).create();
+ waitReplicatorStarted(topicName1);
+ try {
+ admin2.topics().delete(topicName1);
+ fail("Should fail to delete topic when replicating");
+ } catch (PulsarAdminException.PreconditionFailedException ex) {
+ assertTrue(ex.getMessage().contains("1 replicators"));
+ }
+
+ final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+ admin1.topics().createPartitionedTopic(topicName2, 1);
+ Producer producer2 = client1.newProducer().topic(topicName2).create();
+ waitReplicatorStarted(TopicName.get(topicName2).getPartition(0).toString());
+ try {
+ admin2.topics().deletePartitionedTopic(topicName2);
+ fail("Should fail to delete topic when replicating");
+ } catch (PulsarAdminException.PreconditionFailedException ex) {
+ assertTrue(ex.getMessage().contains("1 replicators"));
+ }
+
+ producer1.close();
+ producer2.close();
+ cleanupTopics(() -> {
+ admin1.topics().delete(topicName1);
+ admin2.topics().deletePartitionedTopic(topicName2);
+ });
+ }
+
@Test(timeOut = 45 * 1000)
public void testReplicatorProducerStatInTopic() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 07980dbf21156..9b8adfad57543 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -73,6 +73,11 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setDefaultNumPartitions(1);
}
+ @Test(enabled = false)
+ public void testDeleteTopicWhenReplicating() throws Exception {
+ super.testDeleteTopicWhenReplicating();
+ }
+
@Override
@Test(enabled = false)
public void testReplicatorProducerStatInTopic() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 7960f6304f598..cb92cfd0e75b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -83,8 +83,12 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setTransactionCoordinatorEnabled(true);
}
+ @Test(enabled = false)
+ public void testDeleteTopicWhenReplicating() throws Exception {
+ super.testDeleteTopicWhenReplicating();
+ }
- @Test(enabled = false)
+ @Test(enabled = false)
public void testReplicatorProducerStatInTopic() throws Exception {
super.testReplicatorProducerStatInTopic();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 8bfded83343d2..560f458f4eca6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -82,6 +82,7 @@
import lombok.EqualsAndHashCode;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
@@ -5359,4 +5360,50 @@ public void testE2EEncryptionWithCompression() throws Exception {
producer.close();
consumer.close();
}
+
+ @DataProvider
+ public Object[][] trimLedgerBeforeGetStats() {
+ return new Object[][] {
+ {true},
+ {false}
+ };
+ }
+
+ @Test(dataProvider = "trimLedgerBeforeGetStats")
+ public void testBacklogAfterCreatedSubscription(boolean trimLegderBeforeGetStats) throws Exception {
+ String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+ String mlName = TopicName.get(topic).getPersistenceNamingEncoding();
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
+ if (!trimLegderBeforeGetStats) {
+ config.setRetentionTime(3600, TimeUnit.SECONDS);
+ }
+ ManagedLedgerFactory factory = pulsar.getDefaultManagedLedgerFactory();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, config);
+ Producer producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 4; i++) {
+ producer.send("message-" + i);
+ Thread.sleep(1000);
+ }
+ producer.close();
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+ assertEquals(persistentTopic.getManagedLedger(), ml);
+
+ if (trimLegderBeforeGetStats) {
+ CompletableFuture trimLedgerFuture = new CompletableFuture<>();
+ ml.trimConsumedLedgersInBackground(trimLedgerFuture);
+ trimLedgerFuture.join();
+ assertEquals(ml.getLedgersInfo().size(), 1);
+ assertEquals(ml.getCurrentLedgerEntries(), 0);
+ }
+
+ admin.topics().createSubscription(topic, "sub1", MessageId.latest);
+ assertEquals(admin.topics().getStats(topic).getSubscriptions().get("sub1").getMsgBacklog(), 0);
+
+ // cleanup
+ admin.topics().delete(topic, false);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index f6b3f2124996d..1d5d3429fffe5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -962,7 +962,7 @@ public void testDeleteTopicAndSchemaForV1() throws Exception {
} catch (Exception e) {
assertThat(e.getMessage())
.isNotNull()
- .startsWith("Topic has 2 connected producers/consumers");
+ .startsWith("Topic has 2 clients");
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 2);
@@ -972,7 +972,7 @@ public void testDeleteTopicAndSchemaForV1() throws Exception {
} catch (Exception e) {
assertThat(e.getMessage())
.isNotNull()
- .startsWith("Topic has active producers/subscriptions");
+ .startsWith("Topic has 1 client");
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 1);
@@ -1055,7 +1055,7 @@ public void testDeleteTopicAndSchemaForV2() throws Exception {
admin.topics().delete(topicOne, false);
fail();
} catch (Exception e) {
- assertTrue(e.getMessage().startsWith("Topic has 2 connected producers/consumers"));
+ assertTrue(e.getMessage().startsWith("Topic has 2 clients"));
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topicOne).getSchemaName()).get().size(), 2);
@@ -1063,7 +1063,7 @@ public void testDeleteTopicAndSchemaForV2() throws Exception {
admin.topics().deletePartitionedTopic(topicTwo, false);
fail();
} catch (Exception e) {
- assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions"));
+ assertTrue(e.getMessage().startsWith("Topic has 1 client"));
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topicTwo).getSchemaName()).get().size(), 1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index efee2fbe06f6a..041f944feb479 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -29,7 +29,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.Errors.NativeIoException;
-import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.concurrent.Promise;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
@@ -1434,18 +1433,6 @@ public void close() {
}
}
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof SslHandshakeCompletionEvent) {
- SslHandshakeCompletionEvent sslHandshakeCompletionEvent = (SslHandshakeCompletionEvent) evt;
- if (sslHandshakeCompletionEvent.cause() != null) {
- log.warn("{} Got ssl handshake exception {}", ctx.channel(),
- sslHandshakeCompletionEvent);
- }
- }
- ctx.fireUserEventTriggered(evt);
- }
-
protected void closeWithException(Throwable e) {
if (ctx != null) {
connectionFuture.completeExceptionally(e);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 41658e62f1b78..41febb4154849 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -24,6 +24,8 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.ssl.SslCloseCompletionEvent;
+import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAckResponse;
@@ -749,4 +751,26 @@ protected void handleCommandWatchTopicListClose(CommandWatchTopicListClose comma
private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
}
+
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
+ if (evt instanceof SslHandshakeCompletionEvent) {
+ // log handshake failures
+ SslHandshakeCompletionEvent sslHandshakeCompletionEvent = (SslHandshakeCompletionEvent) evt;
+ if (!sslHandshakeCompletionEvent.isSuccess()) {
+ log.warn("[{}] TLS handshake failed. {}", ctx.channel(), sslHandshakeCompletionEvent);
+ }
+ } else if (evt instanceof SslCloseCompletionEvent) {
+ // handle TLS close_notify event and immediately close the channel
+ // this is not handled by Netty by default
+ // See https://datatracker.ietf.org/doc/html/rfc8446#section-6.1 for more details
+ SslCloseCompletionEvent sslCloseCompletionEvent = (SslCloseCompletionEvent) evt;
+ if (sslCloseCompletionEvent.isSuccess() && ctx.channel().isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received a TLS close_notify, closing the channel.", ctx.channel());
+ }
+ ctx.close();
+ }
+ }
+ ctx.fireUserEventTriggered(evt);
+ }
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PoliciesDataTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PoliciesDataTest.java
index b4ee826fbbef6..3b5e3ceecd9df 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PoliciesDataTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PoliciesDataTest.java
@@ -18,12 +18,12 @@
*/
package org.apache.pulsar.common.policies.data;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
@@ -71,7 +71,7 @@ public void propertyAdmin() {
assertNotEquals(TenantInfo.builder().build(), pa1);
assertNotEquals(TenantInfo.builder().adminRoles(Sets.newHashSet("role1", "role3"))
.allowedClusters(Sets.newHashSet("usc")).build(), pa1);
- assertEquals(pa1.getAdminRoles(), Lists.newArrayList("role1", "role2"));
+ assertThat(pa1.getAdminRoles()).containsExactlyInAnyOrder("role1", "role2");
}
@Test
diff --git a/pulsar-function-go/examples/go.mod b/pulsar-function-go/examples/go.mod
index 4d1320624d936..a25bb35f12764 100644
--- a/pulsar-function-go/examples/go.mod
+++ b/pulsar-function-go/examples/go.mod
@@ -17,7 +17,7 @@ require (
github.com/bits-and-blooms/bitset v1.4.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
- github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
+ github.com/dvsekhvalnov/jose2go v1.7.0 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
diff --git a/pulsar-function-go/examples/go.sum b/pulsar-function-go/examples/go.sum
index 537c605187e6d..fedd6ac7d28a2 100644
--- a/pulsar-function-go/examples/go.sum
+++ b/pulsar-function-go/examples/go.sum
@@ -52,8 +52,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
-github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
-github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
+github.com/dvsekhvalnov/jose2go v1.7.0 h1:bnQc8+GMnidJZA8zc6lLEAb4xNrIqHwO+9TzqvtQZPo=
+github.com/dvsekhvalnov/jose2go v1.7.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
diff --git a/pulsar-function-go/go.mod b/pulsar-function-go/go.mod
index a191c39ad9db2..1c9e608b3e877 100644
--- a/pulsar-function-go/go.mod
+++ b/pulsar-function-go/go.mod
@@ -25,7 +25,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
- github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
+ github.com/dvsekhvalnov/jose2go v1.7.0 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum
index 537c605187e6d..fedd6ac7d28a2 100644
--- a/pulsar-function-go/go.sum
+++ b/pulsar-function-go/go.sum
@@ -52,8 +52,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
-github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
-github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
+github.com/dvsekhvalnov/jose2go v1.7.0 h1:bnQc8+GMnidJZA8zc6lLEAb4xNrIqHwO+9TzqvtQZPo=
+github.com/dvsekhvalnov/jose2go v1.7.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 7ee9856202280..95244c96cec28 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -249,4 +249,14 @@
+
+
+
+ confluent
+ https://packages.confluent.io/maven/
+
+ false
+
+
+
diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index 290a827f7073f..ec32556716f18 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -159,4 +159,14 @@
+
+
+
+ confluent
+ https://packages.confluent.io/maven/
+
+ false
+
+
+
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
index 18d2dd77b7d46..12f599d779d30 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
@@ -32,7 +32,7 @@ public class OxiaContainer extends ChaosContainer {
public static final int METRICS_PORT = 8080;
private static final int DEFAULT_SHARDS = 1;
- private static final String DEFAULT_IMAGE_NAME = "streamnative/oxia:main";
+ private static final String DEFAULT_IMAGE_NAME = "oxia/oxia:main";
public OxiaContainer(String clusterName) {
this(clusterName, DEFAULT_IMAGE_NAME, DEFAULT_SHARDS);