From a067613ae4fee4a159816b4686d13093d47490bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 13 Jun 2025 16:34:52 +0200 Subject: [PATCH] Consider max unconfirmed messages in dynamic batch configuration A small value for max unconfirmed messages can impact the dynamic batch mechanism. This commit sets the min batch size to half the max unconfirmed messages value if it is less than the configured batch size. References #757 --- .../rabbitmq/stream/impl/DynamicBatch.java | 19 ++++++++++++------- .../impl/DynamicBatchMessageAccumulator.java | 7 +++++-- .../rabbitmq/stream/impl/ProducerUtils.java | 2 ++ .../rabbitmq/stream/impl/StreamProducer.java | 1 + .../stream/impl/DynamicBatchTest.java | 6 +++--- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index 37494535b6..3c288d5428 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -28,17 +28,22 @@ final class DynamicBatch implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class); - private static final int MIN_BATCH_SIZE = 32; - private static final int MAX_BATCH_SIZE = 8192; + private static final int MIN_BATCH_SIZE = 16; private final BlockingQueue requests = new LinkedBlockingQueue<>(); private final BatchConsumer consumer; - private final int configuredBatchSize; + private final int configuredBatchSize, minBatchSize, maxBatchSize; private final Thread thread; - DynamicBatch(BatchConsumer consumer, int batchSize) { + DynamicBatch(BatchConsumer consumer, int batchSize, int maxUnconfirmed) { this.consumer = consumer; - this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); + if (batchSize < maxUnconfirmed) { + this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2); + } else { + this.minBatchSize = min(1, maxUnconfirmed / 2); + } + this.configuredBatchSize = batchSize; + this.maxBatchSize = batchSize * 2; this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop); this.thread.start(); } @@ -104,9 +109,9 @@ private void maybeCompleteBatch(State state, boolean increaseIfCompleted) { boolean completed = this.consumer.process(state.items); if (completed) { if (increaseIfCompleted) { - state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE); + state.batchSize = min(state.batchSize * 2, this.maxBatchSize); } else { - state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE); + state.batchSize = max(state.batchSize / 2, this.minBatchSize); } state.items = new ArrayList<>(state.batchSize); } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index ee8c397e13..8c763cde86 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -38,6 +38,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { DynamicBatchMessageAccumulator( int subEntrySize, int batchSize, + int maxUnconfirmedMessages, Codec codec, int maxFrameSize, ToLongFunction publishSequenceFunction, @@ -75,7 +76,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize); + batchSize, + maxUnconfirmedMessages); } else { byte compressionCode = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); @@ -124,7 +126,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize * subEntrySize); + batchSize * subEntrySize, + maxUnconfirmedMessages); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java index 5ae8faa7dd..691fc65c57 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java @@ -30,6 +30,7 @@ static MessageAccumulator createMessageAccumulator( boolean dynamicBatch, int subEntrySize, int batchSize, + int maxUnconfirmedMessages, CompressionCodec compressionCodec, Codec codec, ByteBufAllocator byteBufAllocator, @@ -44,6 +45,7 @@ static MessageAccumulator createMessageAccumulator( return new DynamicBatchMessageAccumulator( subEntrySize, batchSize, + maxUnconfirmedMessages, codec, maxFrameSize, publishSequenceFunction, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 27552512c8..fd3cb8f25d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -180,6 +180,7 @@ public int fragmentLength(Object entity) { dynamicBatch, subEntrySize, batchSize, + maxUnconfirmedMessages, compressionCodec, environment.codec(), environment.byteBufAllocator(), diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index 50698320f8..07a2877385 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -71,7 +71,7 @@ void itemAreProcessed() { sync.down(items.size()); return true; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { RateLimiter rateLimiter = RateLimiter.create(10000); IntStream.range(0, itemCount) .forEach( @@ -102,7 +102,7 @@ void failedProcessingIsReplayed() throws Exception { } return result; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { int firstRoundCount = itemCount / 5; IntStream.range(0, firstRoundCount) .forEach( @@ -132,7 +132,7 @@ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception { return true; }; - try (DynamicBatch batch = new DynamicBatch<>(action, batchSize)) { + try (DynamicBatch batch = new DynamicBatch<>(action, batchSize, 10_000)) { MetricRegistry metrics = new MetricRegistry(); Meter rate = metrics.meter("publishing-rate"); AtomicBoolean keepGoing = new AtomicBoolean(true);