|
24 | 24 | import io.netty.channel.ChannelInboundHandlerAdapter; |
25 | 25 | import io.netty.channel.ChannelOutboundInvoker; |
26 | 26 | import io.netty.handler.codec.haproxy.HAProxyMessage; |
| 27 | +import io.netty.handler.ssl.SslCloseCompletionEvent; |
| 28 | +import io.netty.handler.ssl.SslHandshakeCompletionEvent; |
27 | 29 | import org.apache.pulsar.common.api.proto.BaseCommand; |
28 | 30 | import org.apache.pulsar.common.api.proto.CommandAck; |
29 | 31 | import org.apache.pulsar.common.api.proto.CommandAckResponse; |
@@ -749,4 +751,26 @@ protected void handleCommandWatchTopicListClose(CommandWatchTopicListClose comma |
749 | 751 | private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) { |
750 | 752 | NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd); |
751 | 753 | } |
| 754 | + |
| 755 | + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { |
| 756 | + if (evt instanceof SslHandshakeCompletionEvent) { |
| 757 | + // log handshake failures |
| 758 | + SslHandshakeCompletionEvent sslHandshakeCompletionEvent = (SslHandshakeCompletionEvent) evt; |
| 759 | + if (!sslHandshakeCompletionEvent.isSuccess()) { |
| 760 | + log.warn("[{}] TLS handshake failed. {}", ctx.channel(), sslHandshakeCompletionEvent); |
| 761 | + } |
| 762 | + } else if (evt instanceof SslCloseCompletionEvent) { |
| 763 | + // handle TLS close_notify event and immediately close the channel |
| 764 | + // this is not handled by Netty by default |
| 765 | + // See https://datatracker.ietf.org/doc/html/rfc8446#section-6.1 for more details |
| 766 | + SslCloseCompletionEvent sslCloseCompletionEvent = (SslCloseCompletionEvent) evt; |
| 767 | + if (sslCloseCompletionEvent.isSuccess() && ctx.channel().isActive()) { |
| 768 | + if (log.isDebugEnabled()) { |
| 769 | + log.debug("[{}] Received a TLS close_notify, closing the channel.", ctx.channel()); |
| 770 | + } |
| 771 | + ctx.close(); |
| 772 | + } |
| 773 | + } |
| 774 | + ctx.fireUserEventTriggered(evt); |
| 775 | + } |
752 | 776 | } |
0 commit comments