V2EX 09月21日
Netty WebSocket实现新设备踢出旧设备
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文档介绍了如何使用Netty的WebSocket服务实现新设备登录时自动踢出旧设备的功能。当一个URI被新的WebSocket连接占用时,原有的连接会被关闭,以确保同一URI在同一时间只有一个活跃连接。代码通过`WebSocketServerProtocolHandler.HandshakeComplete`事件捕获新连接的建立,并利用`ChannelId`和`URI`进行映射管理。当检测到已有连接时,旧连接会被显式关闭,然后新连接被添加到映射中,并记录上线日志。当连接断开时,会记录下线日志。此实现旨在解决WebSocket连接状态管理中的并发和覆盖问题,确保用户体验的一致性。

🔄 **连接管理与覆盖逻辑**:当新的WebSocket连接尝试使用一个已存在的URI时,Netty服务会捕获`WebSocketServerProtocolHandler.HandshakeComplete`事件。如果该URI已在`map`中存在,则旧的`Channel`会被立即关闭,从而实现新设备上线时自动踢出旧设备的功能,确保同一URI同一时间只有一个活跃连接。

📝 **状态日志记录**:通过`log.info(new Message(uri, currentChannel.id().toString(), 1).toString())`和`log.info(new Message(uri, channel.id().toString(), 0).toString())`,详细记录了WebSocket连接的上线和下线状态。这有助于追踪连接的生命周期,并对上述踢出逻辑的执行进行验证。

💡 **`URI_ATTRIBUTE_KEY`的使用**:`AttributeKey URI_ATTRIBUTE_KEY`被用于在`Channel`的属性中存储其对应的URI。这使得在`channelInactive`等事件处理方法中能够方便地获取到连接所关联的URI,从而进行正确的状态管理和日志记录。

⚠️ **旧连接关闭的时序问题**:在`userEventTriggered`方法中,当检测到旧连接存在时,`exist.close().sync()`被调用。然而,日志显示旧连接的下线日志(`status=下线`)出现在新连接的上线日志(`status=上线`)之后,这可能表明`close().sync()`的异步性或Netty事件处理的顺序导致了预期的状态切换顺序被打乱。需要进一步优化关闭旧连接的操作,确保其完成后再处理新连接的状态。

@Component@RequiredArgsConstructorpublic class WebSocketServer implements ApplicationRunner, DisposableBean {    @Value("${netty.port}")    private int port;    private final EventLoopGroup bossGroup = new NioEventLoopGroup();    private final EventLoopGroup workerGroup = new NioEventLoopGroup();    private final ServerBootstrap b = new ServerBootstrap();    private final Map<String, Channel> map = new HashMap<>();    @Override    public void destroy() {        bossGroup.shutdownGracefully();        workerGroup.shutdownGracefully();    }    @Override    public void run(ApplicationArguments args) throws Exception {        b.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .option(ChannelOption.SO_BACKLOG, 128)                .childOption(ChannelOption.SO_KEEPALIVE, true)                .childHandler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel ch) {                        ChannelPipeline pipeline = ch.pipeline();                        pipeline.addLast(new HttpServerCodec());                        pipeline.addLast(new HttpObjectAggregator(65536));                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", true, 3000L));                        pipeline.addLast(new WebSocketFrameHandler(map));                    }                });        ChannelFuture future = b.bind(port).sync();        System.out.println("WebSocket server started on port " + port);        future.channel().closeFuture().sync();    }}@Slf4jpublic class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {    private final Map<String, Channel> map;    public static final AttributeKey<String> URI_ATTRIBUTE_KEY = AttributeKey.valueOf("URI");    public WebSocketFrameHandler(Map<String, Channel> map) {        this.map = map;    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;            Channel currentChannel = ctx.channel();            String uri = handshakeComplete.requestUri();            currentChannel.attr(URI_ATTRIBUTE_KEY).set(uri);            Channel exist = map.get(uri);            if (Objects.isNull(exist)) {                map.put(uri, currentChannel);                log.info(new Message(uri, currentChannel.id().toString(), 1).toString());            } else {                //无效                exist.close().sync();//明确在 close 执行完成后在重新上线,但是好像没生效一样,直接就输出上线! 1= 上线,0= 下线                map.put(uri, currentChannel);                log.info(new Message(uri, currentChannel.id().toString(), 1).toString());                //无效                //exist.close().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));                //无效                //exist.close().sync().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));            }        } else {            super.userEventTriggered(ctx, evt);        }    }    @Override    public void channelInactive(ChannelHandlerContext ctx) {        Channel channel = ctx.channel();        String uri = channel.attr(URI_ATTRIBUTE_KEY).get();        log.info(new Message(uri, channel.id().toString(), 0).toString());    }}

日志输出:

2025-09-21 03:16:54.669  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=上线}2025-09-21 03:16:58.837  INFO 30008 --- [ntLoopGroup-5-2] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='fa3a5fbb', status=上线}2025-09-21 03:16:58.838  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=下线}

我用 netty 提供的 websocket 服务,想达到新设备踢出老设备的在线状态,但是控制不好状态

预期的效果:

日志输出:

但是现在看结果是错误的顺序。

想在这个服务里保证状态的正确,我尝试使用自定义 eventGroup 并且设置线程数 1 来执行代码也不行,也尝试跟 ai 沟通了半宿也研究明白

netty-all 的版本是 4.2.6.Final

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Netty WebSocket 新设备踢出旧设备 连接管理 状态同步 Netty WebSocket Device Kick-out Connection Management State Synchronization
相关文章