Skip to content

Commit 9e9bfbf

Browse files
committed
Add flow control test with in-memory queue
1 parent ec384d7 commit 9e9bfbf

File tree

1 file changed

+41
-0
lines changed

1 file changed

+41
-0
lines changed

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,47 @@ void asynchronousProcessingWithFlowControl() {
266266
}
267267
}
268268

269+
@Test
270+
void asynchronousProcessingWithInMemoryQueue(TestInfo info) {
271+
int messageCount = 100_000;
272+
publishAndWaitForConfirms(cf, messageCount, stream);
273+
274+
CountDownLatch latch = new CountDownLatch(messageCount);
275+
BlockingQueue<Tuples.Pair<MessageHandler.Context, Message>> queue =
276+
new ArrayBlockingQueue<>(10_000);
277+
Thread t =
278+
ThreadUtils.newInternalThread(
279+
info.getTestMethod().get().getName(),
280+
() -> {
281+
try {
282+
while (!Thread.currentThread().isInterrupted()) {
283+
Tuples.Pair<MessageHandler.Context, Message> item =
284+
queue.poll(10, TimeUnit.SECONDS);
285+
if (item != null) {
286+
latch.countDown();
287+
item.v1().processed();
288+
}
289+
}
290+
} catch (InterruptedException e) {
291+
// finish the thread
292+
}
293+
});
294+
t.start();
295+
296+
try {
297+
environment.consumerBuilder().stream(stream)
298+
.offset(OffsetSpecification.first())
299+
.flow()
300+
.strategy(creditWhenHalfMessagesProcessed(1))
301+
.builder()
302+
.messageHandler((ctx, message) -> queue.add(Tuples.pair(ctx, message)))
303+
.build();
304+
org.assertj.core.api.Assertions.assertThat(latch).is(completed());
305+
} finally {
306+
t.interrupt();
307+
}
308+
}
309+
269310
@Test
270311
void closeOnCondition() throws Exception {
271312
int messageCount = 50_000;

0 commit comments

Comments
 (0)