Skip to content

Commit a4a3409

Browse files
poorbarcodelhotari
andauthored
[fix][client] Fix incorrect producer.getPendingQueueSize due to incomplete queue implementation (#24184)
Co-authored-by: Lari Hotari <[email protected]> Co-authored-by: Lari Hotari <[email protected]>
1 parent bb2e4ab commit a4a3409

File tree

3 files changed

+80
-4
lines changed

3 files changed

+80
-4
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.concurrent.Executors;
4949
import java.util.concurrent.TimeUnit;
5050
import java.util.concurrent.atomic.AtomicInteger;
51+
import java.util.concurrent.atomic.AtomicReference;
5152
import java.util.regex.Pattern;
5253
import java.util.stream.Collectors;
5354
import lombok.Cleanup;
@@ -66,6 +67,7 @@
6667
import org.apache.pulsar.client.admin.PulsarAdminException;
6768
import org.apache.pulsar.client.api.Consumer;
6869
import org.apache.pulsar.client.api.Message;
70+
import org.apache.pulsar.client.api.MessageId;
6971
import org.apache.pulsar.client.api.Producer;
7072
import org.apache.pulsar.client.api.PulsarClientException;
7173
import org.apache.pulsar.client.api.Schema;
@@ -77,6 +79,7 @@
7779
import org.apache.pulsar.client.api.schema.SchemaDefinition;
7880
import org.apache.pulsar.client.impl.ConsumerImpl;
7981
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
82+
import org.apache.pulsar.client.impl.ProducerImpl;
8083
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
8184
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
8285
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -96,6 +99,7 @@
9699
import org.apache.pulsar.metadata.api.MetadataCache;
97100
import org.apache.pulsar.metadata.api.MetadataSerde;
98101
import org.apache.pulsar.metadata.api.Stat;
102+
import org.awaitility.Awaitility;
99103
import org.testng.Assert;
100104
import org.testng.annotations.AfterMethod;
101105
import org.testng.annotations.BeforeMethod;
@@ -1454,9 +1458,9 @@ public User(String name) {
14541458
}
14551459

14561460
/**
1457-
* This test validates that consumer/producers should recover on topic whose
1461+
* This test validates that consumer/producers should recover on topic whose
14581462
* schema ledgers are not able to open due to non-recoverable error.
1459-
*
1463+
*
14601464
* @throws Exception
14611465
*/
14621466
@Test
@@ -1524,6 +1528,31 @@ public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content
15241528
producer.close();
15251529
}
15261530

1531+
@Test
1532+
public void testPendingQueueSizeIfIncompatible() throws Exception {
1533+
final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
1534+
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
1535+
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
1536+
final String topic = BrokerTestUtil.newUniqueName(namespace + "/tp");
1537+
admin.topics().createNonPartitionedTopic(topic);
1538+
1539+
ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
1540+
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
1541+
producer.newMessage(Schema.STRING).value("msg").sendAsync();
1542+
AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>();
1543+
for (int i = 0; i < 100; i++) {
1544+
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
1545+
}
1546+
Awaitility.await().untilAsserted(() -> {
1547+
assertTrue(latestSend.get().isDone());
1548+
assertEquals(producer.getPendingQueueSize(), 0);
1549+
});
1550+
1551+
// cleanup.
1552+
producer.close();
1553+
admin.topics().delete(topic, false);
1554+
}
1555+
15271556
@Test
15281557
public void testTopicSchemaMetadata() throws Exception {
15291558
final String tenant = PUBLIC_TENANT;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,7 +1734,8 @@ protected OpSendMsg newObject(Handle<OpSendMsg> handle) {
17341734
* This queue is not thread safe.
17351735
*/
17361736
protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
1737-
private final Queue<OpSendMsg> delegate = new ArrayDeque<>();
1737+
@VisibleForTesting
1738+
final Queue<OpSendMsg> delegate = new ArrayDeque<>();
17381739
private int forEachDepth = 0;
17391740
private List<OpSendMsg> postponedOpSendMgs;
17401741
private final AtomicInteger messagesCount = new AtomicInteger(0);
@@ -1791,7 +1792,35 @@ public int messagesCount() {
17911792

17921793
@Override
17931794
public Iterator<OpSendMsg> iterator() {
1794-
return delegate.iterator();
1795+
Iterator<OpSendMsg> delegateIterator = delegate.iterator();
1796+
return new Iterator<OpSendMsg>() {
1797+
OpSendMsg currentOp;
1798+
1799+
@Override
1800+
public boolean hasNext() {
1801+
return delegateIterator.hasNext();
1802+
}
1803+
1804+
@Override
1805+
public OpSendMsg next() {
1806+
currentOp = delegateIterator.next();
1807+
return currentOp;
1808+
}
1809+
1810+
@Override
1811+
public void remove() {
1812+
delegateIterator.remove();
1813+
if (currentOp != null) {
1814+
messagesCount.addAndGet(-currentOp.numMessagesInBatch);
1815+
currentOp = null;
1816+
}
1817+
}
1818+
1819+
@Override
1820+
public void forEachRemaining(Consumer<? super OpSendMsg> action) {
1821+
delegateIterator.forEachRemaining(action);
1822+
}
1823+
};
17951824
}
17961825
}
17971826

pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.testng.Assert.assertEquals;
2424
import com.google.common.collect.Lists;
2525
import java.util.Arrays;
26+
import java.util.Iterator;
2627
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
2728
import org.testng.annotations.BeforeClass;
2829
import org.testng.annotations.Test;
@@ -83,4 +84,21 @@ public void shouldPostponeAddsAlsoInRecursiveCalls() {
8384
// then
8485
assertEquals(Lists.newArrayList(queue), Arrays.asList(opSendMsg, opSendMsg2, opSendMsg3, opSendMsg4));
8586
}
87+
88+
@Test
89+
public void testIteratorRemove() {
90+
ProducerImpl.OpSendMsgQueue queue = new ProducerImpl.OpSendMsgQueue();
91+
for (int i = 0; i < 10; i++) {
92+
queue.add(createDummyOpSendMsg());
93+
}
94+
95+
Iterator<ProducerImpl.OpSendMsg> iterator = queue.iterator();
96+
while (iterator.hasNext()) {
97+
iterator.next();
98+
iterator.remove();
99+
}
100+
// Verify: the result of "messagesCount()" is 0 after removed all items.
101+
assertEquals(queue.delegate.size(), 0);
102+
assertEquals(queue.messagesCount(), 0);
103+
}
86104
}

0 commit comments

Comments
 (0)