|
23 | 23 | import io.netty.channel.ChannelInitializer; |
24 | 24 | import io.netty.channel.ChannelPipeline; |
25 | 25 | import io.netty.channel.EventLoopGroup; |
26 | | -import io.netty.channel.SingleThreadEventLoop; |
27 | | -import io.netty.channel.group.DefaultChannelGroup; |
28 | 26 | import io.netty.channel.nio.NioEventLoopGroup; |
29 | 27 | import io.netty.channel.socket.SocketChannel; |
30 | 28 | import io.netty.channel.socket.nio.NioServerSocketChannel; |
31 | 29 | import io.netty.channel.socket.nio.NioSocketChannel; |
32 | 30 | import io.netty.handler.timeout.IdleStateHandler; |
33 | | -import io.netty.util.concurrent.EventExecutor; |
34 | | -import io.netty.util.concurrent.GlobalEventExecutor; |
35 | 31 | import org.junit.jupiter.api.*; |
36 | 32 | import org.junit.jupiter.api.extension.ExtendWith; |
37 | 33 | import org.mockito.ArgumentCaptor; |
|
41 | 37 |
|
42 | 38 | import java.net.InetSocketAddress; |
43 | 39 | import java.net.SocketAddress; |
44 | | -import java.util.Iterator; |
45 | 40 | import java.util.concurrent.CountDownLatch; |
46 | 41 | import java.util.concurrent.TimeUnit; |
47 | 42 |
|
@@ -198,17 +193,15 @@ void testChannelInactive() throws Exception { |
198 | 193 | void testChannelInactiveByServer() throws Exception { |
199 | 194 | connectClient(); |
200 | 195 |
|
201 | | - DefaultChannelGroup serverChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
202 | | - serverChannels.addAll(collectServerChannels(workerGroup)); |
203 | | - Channel serverSideClientChannel = serverChannels.stream() |
204 | | - .filter(ch -> ch.isActive() && ch.remoteAddress() != null) |
205 | | - .findFirst() |
206 | | - .orElseThrow(() -> new AssertionError("Failed to find client channel on server side")); |
| 196 | + // Simulate server-side behavior by performing shutdown operations in the event loop |
| 197 | + clientChannel.eventLoop().execute(() -> { |
| 198 | + // Simulate server-side disconnection |
| 199 | + clientChannel.pipeline().fireChannelInactive(); |
| 200 | + }); |
207 | 201 |
|
208 | | - serverSideClientChannel.close().sync(); |
209 | 202 | assertTrue( |
210 | 203 | channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), |
211 | | - "Channel inactive event was not detected on client side when server closed the connection"); |
| 204 | + "Channel inactive event was not detected on client side when connection was closed"); |
212 | 205 | verify(mockRemotingClient).onChannelInactive(any(Channel.class)); |
213 | 206 | } |
214 | 207 |
|
@@ -257,27 +250,4 @@ protected void initChannel(SocketChannel ch) { |
257 | 250 | clientChannel = future.channel(); |
258 | 251 | assertTrue(clientChannel.isActive()); |
259 | 252 | } |
260 | | - |
261 | | - private DefaultChannelGroup collectServerChannels(EventLoopGroup workerGroup) throws InterruptedException { |
262 | | - DefaultChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
263 | | - |
264 | | - for (EventExecutor executor : workerGroup) { |
265 | | - if (executor instanceof SingleThreadEventLoop) { |
266 | | - SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) executor; |
267 | | - |
268 | | - executor.submit(() -> { |
269 | | - Iterator<Channel> it = eventLoop.registeredChannelsIterator(); |
270 | | - while (it.hasNext()) { |
271 | | - Channel ch = it.next(); |
272 | | - if (ch.isActive() && ch instanceof SocketChannel) { |
273 | | - channels.add(ch); |
274 | | - } |
275 | | - } |
276 | | - return null; |
277 | | - }) |
278 | | - .sync(); |
279 | | - } |
280 | | - } |
281 | | - return channels; |
282 | | - } |
283 | 253 | } |
0 commit comments