@Component@RequiredArgsConstructorpublic class WebSocketServer implements ApplicationRunner, DisposableBean { @Value("${netty.port}") private int port; private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final ServerBootstrap b = new ServerBootstrap(); private final Map<String, Channel> map = new HashMap<>(); @Override public void destroy() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } @Override public void run(ApplicationArguments args) throws Exception { b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerProtocolHandler("/ws", true, 3000L)); pipeline.addLast(new WebSocketFrameHandler(map)); } }); ChannelFuture future = b.bind(port).sync(); System.out.println("WebSocket server started on port " + port); future.channel().closeFuture().sync(); }}@Slf4jpublic class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private final Map<String, Channel> map; public static final AttributeKey<String> URI_ATTRIBUTE_KEY = AttributeKey.valueOf("URI"); public WebSocketFrameHandler(Map<String, Channel> map) { this.map = map; } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt; Channel currentChannel = ctx.channel(); String uri = handshakeComplete.requestUri(); currentChannel.attr(URI_ATTRIBUTE_KEY).set(uri); Channel exist = map.get(uri); if (Objects.isNull(exist)) { map.put(uri, currentChannel); log.info(new Message(uri, currentChannel.id().toString(), 1).toString()); } else { //无效 exist.close().sync();//明确在 close 执行完成后在重新上线,但是好像没生效一样,直接就输出上线! 1= 上线,0= 下线 map.put(uri, currentChannel); log.info(new Message(uri, currentChannel.id().toString(), 1).toString()); //无效 //exist.close().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString())); //无效 //exist.close().sync().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString())); } } else { super.userEventTriggered(ctx, evt); } } @Override public void channelInactive(ChannelHandlerContext ctx) { Channel channel = ctx.channel(); String uri = channel.attr(URI_ATTRIBUTE_KEY).get(); log.info(new Message(uri, channel.id().toString(), 0).toString()); }}日志输出:
2025-09-21 03:16:54.669 INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler : Message{uri='/ws/abc_123', sessionId='86aeadad', status=上线}2025-09-21 03:16:58.837 INFO 30008 --- [ntLoopGroup-5-2] org.example.WebSocketFrameHandler : Message{uri='/ws/abc_123', sessionId='fa3a5fbb', status=上线}2025-09-21 03:16:58.838 INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler : Message{uri='/ws/abc_123', sessionId='86aeadad', status=下线}我用 netty 提供的 websocket 服务,想达到新设备踢出老设备的在线状态,但是控制不好状态
预期的效果:
- 1.86aeadad 上线成功2.fa3a5fbb 登录检查3.86aeadad 踢下线4.fa3a5fbb 上线
日志输出:
- 1.86aeadad 上线2.86aeadad 下线3.fa3a5fbb 上线
但是现在看结果是错误的顺序。
- 1.86aeadad 上线2.fa3a5fbb 上线3.86aeadad 下线
想在这个服务里保证状态的正确,我尝试使用自定义 eventGroup 并且设置线程数 1 来执行代码也不行,也尝试跟 ai 沟通了半宿也研究明白
netty-all 的版本是 4.2.6.Final
