V2EX 09月21日
Netty WebSocket 实现新设备踢出旧设备功能
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文档介绍了如何使用 Netty WebSocket 服务实现新设备登录时踢出旧设备的功能。当新的 WebSocket 连接接入时,如果相同的 URI 已经被占用,则会强制关闭旧的连接,并记录相应的上线和下线日志。代码中通过 `Channel` 的 `URI_ATTRIBUTE_KEY` 属性来存储 URI,并在 `userEventTriggered` 方法中处理握手完成事件,实现新连接的接入和旧连接的替换。同时,`channelInactive` 方法用于处理连接断开时的下线日志记录。

🎯 **新设备登录踢出旧设备机制**:当一个新的 WebSocket 连接尝试接入时,如果其 URI(如 `/ws/abc_123`)已存在于 `map` 中,则会触发旧连接的关闭操作,以保证同一 URI 只有一个活跃连接。这确保了同一账户或标识在不同设备上的登录顺序和状态的正确管理。

📝 **日志记录与状态追踪**:代码中通过 `log.info` 方法详细记录了 WebSocket 连接的状态变化。`userEventTriggered` 方法在握手成功后记录新连接的“上线”状态,而 `channelInactive` 方法则在连接断开时记录“下线”状态。这种日志记录方式有助于清晰地追踪连接的生命周期和设备状态。

🔌 **Netty WebSocket 协议处理**:使用了 Netty 的 `WebSocketServerProtocolHandler` 来处理 WebSocket 的握手过程,并定义了一个自定义的 `WebSocketFrameHandler` 来处理握手完成后的具体逻辑。`ChannelInitializer` 配置了 `HttpServerCodec`、`HttpObjectAggregator` 等组件,为 WebSocket 通信奠定基础。

🔄 **连接管理与替换逻辑**:`WebSocketFrameHandler` 中的 `userEventTriggered` 方法是实现新旧设备替换的核心。它通过 `map.get(uri)` 检查是否存在同 URI 的旧连接,如果存在则调用 `exist.close().sync()` 来关闭旧连接,然后将新连接存入 `map`。虽然代码中尝试了不同的 `close` 方式,但关键在于确保关闭操作的完成,以及后续新连接的成功注册。

@Component@RequiredArgsConstructorpublic class WebSocketServer implements ApplicationRunner, DisposableBean {    @Value("${netty.port}")    private int port;    private final EventLoopGroup bossGroup = new NioEventLoopGroup();    private final EventLoopGroup workerGroup = new NioEventLoopGroup();    private final ServerBootstrap b = new ServerBootstrap();    private final Map<String, Channel> map = new HashMap<>();    @Override    public void destroy() {        bossGroup.shutdownGracefully();        workerGroup.shutdownGracefully();    }    @Override    public void run(ApplicationArguments args) throws Exception {        b.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .option(ChannelOption.SO_BACKLOG, 128)                .childOption(ChannelOption.SO_KEEPALIVE, true)                .childHandler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel ch) {                        ChannelPipeline pipeline = ch.pipeline();                        pipeline.addLast(new HttpServerCodec());                        pipeline.addLast(new HttpObjectAggregator(65536));                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", true, 3000L));                        pipeline.addLast(new WebSocketFrameHandler(map));                    }                });        ChannelFuture future = b.bind(port).sync();        System.out.println("WebSocket server started on port " + port);        future.channel().closeFuture().sync();    }}@Slf4jpublic class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {    private final Map<String, Channel> map;    public static final AttributeKey<String> URI_ATTRIBUTE_KEY = AttributeKey.valueOf("URI");    public WebSocketFrameHandler(Map<String, Channel> map) {        this.map = map;    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;            Channel currentChannel = ctx.channel();            String uri = handshakeComplete.requestUri();            currentChannel.attr(URI_ATTRIBUTE_KEY).set(uri);            Channel exist = map.get(uri);            if (Objects.isNull(exist)) {                map.put(uri, currentChannel);                log.info(new Message(uri, currentChannel.id().toString(), 1).toString());            } else {                //无效                exist.close().sync();//明确在 close 执行完成后在重新上线,但是好像没生效一样,直接就输出上线! 1= 上线,0= 下线                map.put(uri, currentChannel);                log.info(new Message(uri, currentChannel.id().toString(), 1).toString());                //无效                //exist.close().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));                //无效                //exist.close().sync().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));            }        } else {            super.userEventTriggered(ctx, evt);        }    }    @Override    public void channelInactive(ChannelHandlerContext ctx) {        Channel channel = ctx.channel();        String uri = channel.attr(URI_ATTRIBUTE_KEY).get();        log.info(new Message(uri, channel.id().toString(), 0).toString());    }}

日志输出:

2025-09-21 03:16:54.669  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=上线}2025-09-21 03:16:58.837  INFO 30008 --- [ntLoopGroup-5-2] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='fa3a5fbb', status=上线}2025-09-21 03:16:58.838  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=下线}

我用 netty 提供的 websocket 服务,想达到新设备踢出老设备的在线状态,但是控制不好状态

预期的效果:

日志输出:

但是现在看结果是错误的顺序。

想在这个服务里保证状态的正确,我尝试使用自定义 eventGroup 并且设置线程数 1 来执行代码也不行,也尝试跟 ai 沟通了半宿也研究明白

netty-all 的版本是 4.2.6.Final

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Netty WebSocket 新设备踢旧设备 在线状态管理 Java Netty WebSocket device management online status
相关文章