Skip to content

Commit 6bf945b

Browse files
committed
Fix back-pressure in Netty frame handler
A race condition can cause to see the channel as non-writable but end up with a new count down latch, not the one that was present when the writability flag was true. So the code waits on this new latch which will be never counted down. This can happen when the writability flips from false to true very fast. This commit makes sure to get the latch first, then check the writability a second time. In case we get an "old" latch, it is counted down automatically, so the enqueuing code will not be blocked. (cherry picked from commit cdf43b8) Conflicts: src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java
1 parent a5d1c6b commit 6bf945b

File tree

1 file changed

+19
-9
lines changed

1 file changed

+19
-9
lines changed

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -316,16 +316,26 @@ public void writeFrame(Frame frame) throws IOException {
316316
// we do not wait in the event loop
317317
this.doWriteFrame(frame);
318318
} else {
319-
try {
320-
boolean canWriteNow =
321-
this.handler.writableLatch().await(enqueuingTimeout.toMillis(), MILLISECONDS);
322-
if (canWriteNow) {
323-
this.doWriteFrame(frame);
324-
} else {
325-
throw new IOException("Frame enqueuing failed");
319+
// we get the current latch
320+
CountDownLatch latch = this.handler.writableLatch();
321+
if (this.handler.isWritable()) {
322+
// the channel became writable
323+
this.doWriteFrame(frame);
324+
} else {
325+
try {
326+
// the channel is still non-writable
327+
// in case its writability flipped, we have a reference to a latch that has been
328+
// counted down
329+
// so, worst case scenario, we'll enqueue only one frame right away
330+
boolean canWriteNow = latch.await(enqueuingTimeout.toMillis(), MILLISECONDS);
331+
if (canWriteNow) {
332+
this.doWriteFrame(frame);
333+
} else {
334+
throw new IOException("Frame enqueuing failed");
335+
}
336+
} catch (InterruptedException e) {
337+
Thread.currentThread().interrupt();
326338
}
327-
} catch (InterruptedException e) {
328-
Thread.currentThread().interrupt();
329339
}
330340
}
331341
}

0 commit comments

Comments
 (0)