From c51c116f8e2ad9ea9190ac56b81e4c3e1ea2470a Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 10 Jul 2025 10:01:17 +0200 Subject: [PATCH 1/2] Minor, added some methods to ReadDemand that would increase readability --- .../java/org/logstash/ackedqueue/Queue.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 691987793c5..85a98d7de11 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -932,10 +932,9 @@ private Batch deserialize() { private boolean awaitReadDemand(final long timeoutMillis, final int elementsNeeded) throws InterruptedException { assert this.lock.isHeldByCurrentThread(); - final long deadlineMillis = Math.addExact(System.currentTimeMillis(), timeoutMillis); - this.readDemand = new ReadDemand(deadlineMillis, elementsNeeded); + this.readDemand = ReadDemand.fromExpectedTimeout(timeoutMillis, elementsNeeded); - boolean unElapsed = this.notEmpty.awaitUntil(new Date(deadlineMillis)); + boolean unElapsed = this.notEmpty.awaitUntil(this.readDemand.expectedExpiry()); this.readDemand = null; return unElapsed; } @@ -946,7 +945,7 @@ private void maybeSignalReadDemand(boolean forceSignal) { // if we're not forcing, and if the current read demand has // neither been met nor expired, this method becomes a no-op. if (!forceSignal && Objects.nonNull(readDemand)) { - if (unreadCount < readDemand.elementsNeeded && System.currentTimeMillis() < readDemand.deadlineMillis) { + if (!readDemand.isSatisfiable(unreadCount)) { return; } } @@ -962,5 +961,18 @@ private static class ReadDemand { this.deadlineMillis = deadlineMillis; this.elementsNeeded = elementsNeeded; } + + boolean isSatisfiable(long available) { + return available >= elementsNeeded || System.currentTimeMillis() >= deadlineMillis; + } + + static ReadDemand fromExpectedTimeout(long timeoutMillis, int elementsNeeded) { + final long deadlineMillis = Math.addExact(System.currentTimeMillis(), timeoutMillis); + return new ReadDemand(deadlineMillis, elementsNeeded); + } + + public Date expectedExpiry() { + return new Date(deadlineMillis); + } } } From bc6339bf35b689af7c18cbf48bfee3009eb4e46a Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Fri, 11 Jul 2025 09:22:17 +0200 Subject: [PATCH 2/2] Switched isSatisfiable to the more meaningful isDeferrable Co-authored-by: Rye Biesemeyer --- .../src/main/java/org/logstash/ackedqueue/Queue.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 85a98d7de11..6e0d338f6c2 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -945,7 +945,7 @@ private void maybeSignalReadDemand(boolean forceSignal) { // if we're not forcing, and if the current read demand has // neither been met nor expired, this method becomes a no-op. if (!forceSignal && Objects.nonNull(readDemand)) { - if (!readDemand.isSatisfiable(unreadCount)) { + if (readDemand.isDeferrable(unreadCount)) { return; } } @@ -962,8 +962,8 @@ private static class ReadDemand { this.elementsNeeded = elementsNeeded; } - boolean isSatisfiable(long available) { - return available >= elementsNeeded || System.currentTimeMillis() >= deadlineMillis; + boolean isDeferrable(long elementsAvailable) { + return elementsAvailable < elementsNeeded && System.currentTimeMillis() < deadlineMillis; } static ReadDemand fromExpectedTimeout(long timeoutMillis, int elementsNeeded) {