Skip to content

Commit 3c38000

Browse files
committed
Validate initial credits is at least n x 2
For flow strategy. The broker uses a send limit to "batch" TCP packets, which can conflict with this flow strategy and can block a consumer. The n parameter in the flow strategy must be lesser or equal to the half of initial credits. References #843
1 parent 7e0fc0b commit 3c38000

File tree

2 files changed

+19
-21
lines changed

2 files changed

+19
-21
lines changed

src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,6 @@ static ConsumerFlowStrategy creditOnChunkArrival() {
118118
*
119119
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
120120
*
121-
* <p>Consider using {@link #creditEveryNthChunk(int, int)} instead as it generates less network
122-
* traffic.
123-
*
124121
* @param initialCredits number of initial credits
125122
* @return flow strategy
126123
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
@@ -175,11 +172,12 @@ static ConsumerFlowStrategy creditOnProcessedMessageCount(int initialCredits, do
175172
* Strategy that provides the specified number of initial credits and <code>n</code> credits every
176173
* <code>n</code> chunks.
177174
*
178-
* <p>This strategy generates less network traffic than {@link
179-
* com.rabbitmq.stream.ConsumerFlowStrategy.CreditOnChunkArrivalConsumerFlowStrategy} and should
180-
* be used instead, unless <code>n</code> is equal to 1.
175+
* <p>This strategy can improve throughput for streams with small chunks (less than 30 messages
176+
* per chunk).
177+
*
178+
* <p>The number of initial credits must be at least twice as big as <code>n</code>.
181179
*
182-
* <p>A rule of thumb is to set <code>n</code> to half the value of initial credits.
180+
* <p>A rule of thumb is to set <code>n</code> to a third of the value of initial credits.
183181
*
184182
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
185183
*
@@ -195,9 +193,10 @@ static ConsumerFlowStrategy creditEveryNthChunk(int initialCredits, int n) {
195193
* Strategy that provides the specified number of initial credits and <code>n</code> credits every
196194
* <code>n</code> chunks.
197195
*
198-
* <p>This strategy generates less network traffic than {@link
199-
* com.rabbitmq.stream.ConsumerFlowStrategy.CreditOnChunkArrivalConsumerFlowStrategy} and should
200-
* be used instead, unless <code>n</code> is equal to 1.
196+
* <p>This strategy can improve throughput for streams with small chunks (less than 30 messages
197+
* per chunk).
198+
*
199+
* <p>The number of initial credits must be at least twice as big as <code>n</code>.
201200
*
202201
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
203202
*/
@@ -213,9 +212,9 @@ private CreditEveryNthChunkConsumerFlowStrategy(int initialCredits, int n) {
213212
if (n <= 0) {
214213
throw new IllegalArgumentException("The n argument must be greater than 0");
215214
}
216-
if (initialCredits <= n) {
215+
if (n * 2 > initialCredits) {
217216
throw new IllegalArgumentException(
218-
"The number of initial credits must be greater than the limit");
217+
"The number of initial credits must be at least twice as big as n");
219218
}
220219
this.initialCredits = initialCredits;
221220
this.n = n;

src/test/java/com/rabbitmq/stream/impl/CreditEveryNthChunkConsumerFlowStrategyTest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,26 @@
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2121

2222
import com.rabbitmq.stream.ConsumerFlowStrategy;
23+
import com.rabbitmq.stream.ConsumerFlowStrategy.Context;
2324
import java.util.concurrent.atomic.AtomicInteger;
24-
import org.junit.jupiter.api.Test;
2525
import org.junit.jupiter.params.ParameterizedTest;
2626
import org.junit.jupiter.params.provider.CsvSource;
2727

2828
public class CreditEveryNthChunkConsumerFlowStrategyTest {
2929

3030
AtomicInteger requestedCredits = new AtomicInteger();
3131

32-
@Test
33-
void invalidArguments() {
34-
assertThatThrownBy(() -> build(1, 1)).isInstanceOf(IllegalArgumentException.class);
35-
assertThatThrownBy(() -> build(1, 0)).isInstanceOf(IllegalArgumentException.class);
36-
assertThatThrownBy(() -> build(10, 0)).isInstanceOf(IllegalArgumentException.class);
32+
@ParameterizedTest
33+
@CsvSource({"10,6", "1,1", "1,0", "10,0"})
34+
void invalidArguments(int initialCredits, int limit) {
35+
assertThatThrownBy(() -> build(initialCredits, limit))
36+
.isInstanceOf(IllegalArgumentException.class);
3737
}
3838

3939
@ParameterizedTest
4040
@CsvSource({"10,5", "5,2", "2,1"})
4141
void test(int initialCredits, int limit) {
4242
ConsumerFlowStrategy strategy = build(initialCredits, limit);
43-
4443
range(0, limit - 1)
4544
.forEach(
4645
ignored -> {
@@ -55,9 +54,9 @@ ConsumerFlowStrategy build(int initial, int limit) {
5554
return creditEveryNthChunk(initial, limit);
5655
}
5756

58-
ConsumerFlowStrategy.Context context() {
57+
Context context() {
5958
requestedCredits.set(0);
60-
return new ConsumerFlowStrategy.Context() {
59+
return new Context() {
6160
@Override
6261
public void credits(int credits) {
6362
requestedCredits.addAndGet(credits);

0 commit comments

Comments
 (0)