Skip to content

Commit 196dc2f

Browse files
Artem LabazinArtem Labazin
authored andcommitted
Fix premature close connection
1 parent 043d5ce commit 196dc2f

File tree

11 files changed

+45
-49
lines changed

11 files changed

+45
-49
lines changed

encon/src/main/java/io/appulse/encon/ModuleClient.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.appulse.encon.common.RemoteNode;
2929
import io.appulse.encon.connection.Connection;
3030
import io.appulse.encon.connection.handshake.HandshakeClientInitializer;
31-
import io.appulse.encon.connection.regular.ConnectionHandler;
3231

3332
import io.netty.bootstrap.Bootstrap;
3433
import lombok.NonNull;
@@ -94,17 +93,7 @@ private CompletableFuture<Connection> createConnection (@NonNull RemoteNode remo
9493
.node(node)
9594
.future(future)
9695
.remote(remote)
97-
.channelCloseListener(f -> {
98-
log.debug("Running close listener");
99-
ConnectionHandler connectionHandler = f.channel()
100-
.pipeline()
101-
.get(ConnectionHandler.class);
102-
103-
if (connectionHandler == null) {
104-
return;
105-
}
106-
107-
RemoteNode remoteNode = connectionHandler.getRemote();
96+
.channelCloseAction(remoteNode -> {
10897
node.moduleLookup.remove(remoteNode);
10998
node.moduleConnection.remove(remoteNode);
11099
})

encon/src/main/java/io/appulse/encon/ModuleServer.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727

2828
import java.io.Closeable;
2929

30-
import io.appulse.encon.common.RemoteNode;
3130
import io.appulse.encon.connection.handshake.HandshakeServerInitializer;
32-
import io.appulse.encon.connection.regular.ConnectionHandler;
3331

3432
import io.netty.bootstrap.ServerBootstrap;
3533
import io.netty.buffer.PooledByteBufAllocator;
@@ -81,17 +79,7 @@ private void start () {
8179
.childHandler(HandshakeServerInitializer.builder()
8280
.node(node)
8381
.consumer(moduleConnection::add)
84-
.channelCloseListener(future -> {
85-
log.debug("Running close listener");
86-
ConnectionHandler connectionHandler = future.channel()
87-
.pipeline()
88-
.get(ConnectionHandler.class);
89-
90-
if (connectionHandler == null) {
91-
return;
92-
}
93-
94-
RemoteNode remote = connectionHandler.getRemote();
82+
.channelCloseAction(remote -> {
9583
node.moduleLookup.remove(remote);
9684
node.moduleConnection.remove(remote);
9785
})

encon/src/main/java/io/appulse/encon/connection/handshake/AbstractHandshakeChannelInitializer.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static lombok.AccessLevel.PROTECTED;
2323

2424
import io.netty.channel.ChannelDuplexHandler;
25-
import io.netty.channel.ChannelFutureListener;
2625
import io.netty.channel.ChannelInboundHandler;
2726
import io.netty.channel.ChannelInitializer;
2827
import io.netty.channel.ChannelOutboundHandler;
@@ -32,7 +31,6 @@
3231
import io.netty.handler.codec.LengthFieldPrepender;
3332
import io.netty.handler.logging.LoggingHandler;
3433
import io.netty.handler.timeout.ReadTimeoutHandler;
35-
3634
import lombok.NonNull;
3735
import lombok.RequiredArgsConstructor;
3836
import lombok.experimental.FieldDefaults;
@@ -76,9 +74,6 @@ static void cleanup (@NonNull ChannelPipeline pipeline) {
7674
@NonNull
7775
ChannelInboundHandler decoder;
7876

79-
@NonNull
80-
ChannelFutureListener channelCloseListener;
81-
8277
@Override
8378
protected void initChannel (SocketChannel socketChannel) throws Exception {
8479
throw new UnsupportedOperationException();
@@ -94,8 +89,6 @@ protected void initChannel (SocketChannel socketChannel, AbstractHandshakeHandle
9489
.addLast("ENCODER", ENCODER)
9590
.addLast("HANDLER", handler);
9691

97-
socketChannel.closeFuture().addListener(channelCloseListener);
98-
9992
log.debug("Handshake pipeline for {} was initialized",
10093
socketChannel.remoteAddress());
10194
}

encon/src/main/java/io/appulse/encon/connection/handshake/AbstractHandshakeHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static lombok.AccessLevel.PROTECTED;
2020

2121
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Consumer;
2223

2324
import io.appulse.encon.Node;
2425
import io.appulse.encon.common.RemoteNode;
@@ -51,6 +52,9 @@ abstract class AbstractHandshakeHandler extends ChannelInboundHandlerAdapter {
5152
@NonNull
5253
CompletableFuture<Connection> future;
5354

55+
@NonNull
56+
Consumer<RemoteNode> channelCloseAction;
57+
5458
@NonFinal
5559
RemoteNode remote;
5660

@@ -74,7 +78,7 @@ protected void successHandshake (@NonNull Channel channel) {
7478
log.debug("Replacing pipline to regular for {}", channel.remoteAddress());
7579

7680
AbstractHandshakeChannelInitializer.cleanup(pipeline);
77-
val handler = RegularPipeline.setup(pipeline, node, remote);
81+
val handler = RegularPipeline.setup(pipeline, node, remote, channelCloseAction);
7882

7983
future.complete(new Connection(remote, handler));
8084

encon/src/main/java/io/appulse/encon/connection/handshake/HandshakeClientInitializer.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
import static lombok.AccessLevel.PRIVATE;
2020

2121
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Consumer;
2223

2324
import io.appulse.encon.Node;
2425
import io.appulse.encon.common.RemoteNode;
2526
import io.appulse.encon.connection.Connection;
2627

27-
import io.netty.channel.ChannelFutureListener;
2828
import io.netty.channel.ChannelInboundHandler;
2929
import io.netty.channel.socket.SocketChannel;
3030
import lombok.Builder;
@@ -52,21 +52,24 @@ public final class HandshakeClientInitializer extends AbstractHandshakeChannelIn
5252

5353
RemoteNode remote;
5454

55+
Consumer<RemoteNode> channelCloseAction;
56+
5557
@Builder
5658
public HandshakeClientInitializer (@NonNull Node node,
5759
@NonNull CompletableFuture<Connection> future,
5860
@NonNull RemoteNode remote,
59-
ChannelFutureListener channelCloseListener
61+
@NonNull Consumer<RemoteNode> channelCloseAction
6062
) {
61-
super(DECODER, channelCloseListener);
63+
super(DECODER);
6264
this.node = node;
6365
this.future = future;
6466
this.remote = remote;
67+
this.channelCloseAction = channelCloseAction;
6568
}
6669

6770
@Override
6871
protected void initChannel (SocketChannel socketChannel) throws Exception {
69-
val handler = new HandshakeHandlerClient(node, future, remote);
72+
val handler = new HandshakeHandlerClient(node, future, remote, channelCloseAction);
7073
initChannel(socketChannel, handler);
7174
}
7275
}

encon/src/main/java/io/appulse/encon/connection/handshake/HandshakeHandlerClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Arrays;
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.ThreadLocalRandom;
24+
import java.util.function.Consumer;
2425

2526
import io.appulse.encon.Node;
2627
import io.appulse.encon.common.RemoteNode;
@@ -54,8 +55,12 @@ class HandshakeHandlerClient extends AbstractHandshakeHandler {
5455
int myChallenge;
5556

5657
@Builder
57-
HandshakeHandlerClient (Node node, CompletableFuture<Connection> future, @NonNull RemoteNode remote) {
58-
super(node, future);
58+
HandshakeHandlerClient (Node node,
59+
CompletableFuture<Connection> future,
60+
@NonNull RemoteNode remote,
61+
Consumer<RemoteNode> channelCloseAction
62+
) {
63+
super(node, future, channelCloseAction);
5964
this.remote = remote;
6065
}
6166

encon/src/main/java/io/appulse/encon/connection/handshake/HandshakeHandlerServer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import java.util.Arrays;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.ThreadLocalRandom;
25+
import java.util.function.Consumer;
2526

2627
import io.appulse.encon.Node;
28+
import io.appulse.encon.common.RemoteNode;
2729
import io.appulse.encon.connection.Connection;
2830
import io.appulse.encon.connection.handshake.exception.HandshakeException;
2931
import io.appulse.encon.connection.handshake.message.ChallengeAcknowledgeMessage;
@@ -54,8 +56,8 @@ class HandshakeHandlerServer extends AbstractHandshakeHandler {
5456
int ourChallenge;
5557

5658
@Builder
57-
HandshakeHandlerServer (Node node, CompletableFuture<Connection> future) {
58-
super(node, future);
59+
HandshakeHandlerServer (Node node, CompletableFuture<Connection> future, Consumer<RemoteNode> channelCloseAction) {
60+
super(node, future, channelCloseAction);
5961
}
6062

6163
@Override

encon/src/main/java/io/appulse/encon/connection/handshake/HandshakeServerInitializer.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import java.util.function.Consumer;
2323

2424
import io.appulse.encon.Node;
25+
import io.appulse.encon.common.RemoteNode;
2526
import io.appulse.encon.connection.Connection;
2627

27-
import io.netty.channel.ChannelFutureListener;
2828
import io.netty.channel.ChannelInboundHandler;
2929
import io.netty.channel.socket.SocketChannel;
3030
import lombok.Builder;
@@ -52,14 +52,17 @@ public final class HandshakeServerInitializer extends AbstractHandshakeChannelIn
5252

5353
Consumer<CompletableFuture<Connection>> consumer;
5454

55+
Consumer<RemoteNode> channelCloseAction;
56+
5557
@Builder
5658
public HandshakeServerInitializer (@NonNull Node node,
5759
@NonNull Consumer<CompletableFuture<Connection>> consumer,
58-
ChannelFutureListener channelCloseListener
60+
@NonNull Consumer<RemoteNode> channelCloseAction
5961
) {
60-
super(DECODER, channelCloseListener);
62+
super(DECODER);
6163
this.node = node;
6264
this.consumer = consumer;
65+
this.channelCloseAction = channelCloseAction;
6366
}
6467

6568
@Override
@@ -68,7 +71,7 @@ protected void initChannel (SocketChannel socketChannel) throws Exception {
6871
socketChannel.remoteAddress());
6972

7073
CompletableFuture<Connection> future = new CompletableFuture<>();
71-
val handler = new HandshakeHandlerServer(node, future);
74+
val handler = new HandshakeHandlerServer(node, future, channelCloseAction);
7275
initChannel(socketChannel, handler);
7376
consumer.accept(future);
7477
}

encon/src/main/java/io/appulse/encon/connection/regular/ConnectionHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static lombok.AccessLevel.PRIVATE;
2020

2121
import java.io.Closeable;
22+
import java.util.function.Consumer;
2223

2324
import io.appulse.encon.Node;
2425
import io.appulse.encon.common.RemoteNode;
@@ -59,6 +60,9 @@ public final class ConnectionHandler extends ChannelInboundHandlerAdapter implem
5960
@NonNull
6061
RemoteNode remote;
6162

63+
@NonNull
64+
Consumer<RemoteNode> channelCloseAction;
65+
6266
@NonFinal
6367
Channel channel;
6468

@@ -122,6 +126,8 @@ public void close () {
122126
if (channel.isOpen()) {
123127
channel.close();
124128
}
129+
channelCloseAction.accept(remote);
130+
125131
log.debug("Client handler for {} was closed", channel.remoteAddress());
126132
}
127133

encon/src/main/java/io/appulse/encon/connection/regular/RegularPipeline.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static io.netty.handler.logging.LogLevel.DEBUG;
2020
import static java.lang.Integer.MAX_VALUE;
2121

22+
import java.util.function.Consumer;
23+
2224
import io.appulse.encon.Node;
2325
import io.appulse.encon.common.RemoteNode;
2426

@@ -59,9 +61,10 @@ public final class RegularPipeline {
5961

6062
public static ConnectionHandler setup (@NonNull ChannelPipeline pipeline,
6163
@NonNull Node node,
62-
@NonNull RemoteNode remoteNode
64+
@NonNull RemoteNode remoteNode,
65+
@NonNull Consumer<RemoteNode> channelCloseAction
6366
) {
64-
val handler = new ConnectionHandler(node, remoteNode);
67+
val handler = new ConnectionHandler(node, remoteNode, channelCloseAction);
6568

6669
pipeline
6770
.addLast(LOGGING_HANDLER)

0 commit comments

Comments
 (0)