diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 691987793c5..6e0d338f6c2 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -932,10 +932,9 @@ private Batch deserialize() { private boolean awaitReadDemand(final long timeoutMillis, final int elementsNeeded) throws InterruptedException { assert this.lock.isHeldByCurrentThread(); - final long deadlineMillis = Math.addExact(System.currentTimeMillis(), timeoutMillis); - this.readDemand = new ReadDemand(deadlineMillis, elementsNeeded); + this.readDemand = ReadDemand.fromExpectedTimeout(timeoutMillis, elementsNeeded); - boolean unElapsed = this.notEmpty.awaitUntil(new Date(deadlineMillis)); + boolean unElapsed = this.notEmpty.awaitUntil(this.readDemand.expectedExpiry()); this.readDemand = null; return unElapsed; } @@ -946,7 +945,7 @@ private void maybeSignalReadDemand(boolean forceSignal) { // if we're not forcing, and if the current read demand has // neither been met nor expired, this method becomes a no-op. if (!forceSignal && Objects.nonNull(readDemand)) { - if (unreadCount < readDemand.elementsNeeded && System.currentTimeMillis() < readDemand.deadlineMillis) { + if (readDemand.isDeferrable(unreadCount)) { return; } } @@ -962,5 +961,18 @@ private static class ReadDemand { this.deadlineMillis = deadlineMillis; this.elementsNeeded = elementsNeeded; } + + boolean isDeferrable(long elementsAvailable) { + return elementsAvailable < elementsNeeded && System.currentTimeMillis() < deadlineMillis; + } + + static ReadDemand fromExpectedTimeout(long timeoutMillis, int elementsNeeded) { + final long deadlineMillis = Math.addExact(System.currentTimeMillis(), timeoutMillis); + return new ReadDemand(deadlineMillis, elementsNeeded); + } + + public Date expectedExpiry() { + return new Date(deadlineMillis); + } } }