Skip to content

Commit cdf43b8

Browse files
committed
Fix back-pressure in Netty frame handler
A race condition can cause to see the channel as non-writable but end up with a new count down latch, not the one that was present when the writability flag was true. So the code waits on this new latch which will be never counted down. This can happen when the writability flips from false to true very fast. This commit makes sure to get the latch first, then check the writability a second time. In case we get an "old" latch, it is counted down automatically, so the enqueuing code will not be blocked.
1 parent 80d5504 commit cdf43b8

File tree

1 file changed

+20
-10
lines changed

1 file changed

+20
-10
lines changed

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -318,17 +318,27 @@ public void writeFrame(Frame frame) throws IOException {
318318
// we do not wait in the event loop
319319
this.doWriteFrame(frame);
320320
} else {
321-
try {
322-
boolean canWriteNow =
323-
this.handler.writableLatch().await(enqueuingTimeout.toMillis(), MILLISECONDS);
324-
if (canWriteNow) {
325-
this.doWriteFrame(frame);
326-
} else {
327-
this.handler.logEvents();
328-
throw new IOException("Frame enqueuing failed");
321+
// we get the current latch
322+
CountDownLatch latch = this.handler.writableLatch();
323+
if (this.handler.isWritable()) {
324+
// the channel became writable
325+
this.doWriteFrame(frame);
326+
} else {
327+
try {
328+
// the channel is still non-writable
329+
// in case its writability flipped, we have a reference to a latch that has been
330+
// counted down
331+
// so, worst case scenario, we'll enqueue only one frame right away
332+
boolean canWriteNow = latch.await(enqueuingTimeout.toMillis(), MILLISECONDS);
333+
if (canWriteNow) {
334+
this.doWriteFrame(frame);
335+
} else {
336+
this.handler.logEvents();
337+
throw new IOException("Frame enqueuing failed");
338+
}
339+
} catch (InterruptedException e) {
340+
Thread.currentThread().interrupt();
329341
}
330-
} catch (InterruptedException e) {
331-
Thread.currentThread().interrupt();
332342
}
333343
}
334344
}

0 commit comments

Comments
 (0)