Skip to content

Commit 90aaaa9

Browse files
authored
Merge pull request #1681 from rabbitmq/netty-fix-back-pressure
Fix back-pressure in Netty frame handler References #1663
2 parents 80d5504 + cdf43b8 commit 90aaaa9

File tree

1 file changed

+20
-10
lines changed

1 file changed

+20
-10
lines changed

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -318,17 +318,27 @@ public void writeFrame(Frame frame) throws IOException {
318318
// we do not wait in the event loop
319319
this.doWriteFrame(frame);
320320
} else {
321-
try {
322-
boolean canWriteNow =
323-
this.handler.writableLatch().await(enqueuingTimeout.toMillis(), MILLISECONDS);
324-
if (canWriteNow) {
325-
this.doWriteFrame(frame);
326-
} else {
327-
this.handler.logEvents();
328-
throw new IOException("Frame enqueuing failed");
321+
// we get the current latch
322+
CountDownLatch latch = this.handler.writableLatch();
323+
if (this.handler.isWritable()) {
324+
// the channel became writable
325+
this.doWriteFrame(frame);
326+
} else {
327+
try {
328+
// the channel is still non-writable
329+
// in case its writability flipped, we have a reference to a latch that has been
330+
// counted down
331+
// so, worst case scenario, we'll enqueue only one frame right away
332+
boolean canWriteNow = latch.await(enqueuingTimeout.toMillis(), MILLISECONDS);
333+
if (canWriteNow) {
334+
this.doWriteFrame(frame);
335+
} else {
336+
this.handler.logEvents();
337+
throw new IOException("Frame enqueuing failed");
338+
}
339+
} catch (InterruptedException e) {
340+
Thread.currentThread().interrupt();
329341
}
330-
} catch (InterruptedException e) {
331-
Thread.currentThread().interrupt();
332342
}
333343
}
334344
}

0 commit comments

Comments
 (0)