实现Websocket服务的两种方式——实现简单聊天室功能

  • 方法一:Netty实现 —— 需要另外占用一个与springboot项目中tomcat不通的端口;

  • 方法二:spring-boot-starter-websocket实现 —— 可共用springboot项目中tomcat使用的端口;


一、Netty实现

1、引入netty的依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.42.Final</version>
</dependency>

2、为了方便打包编译,我就使用springboot项目启动nettyServer吧:

可以借助ApplicationRunner,也可以借助CommandLineRunner完成,在Springboot项目启动后,即可自动执行Runner中的run()方法:

@Component
public class NettyChatRunner implements ApplicationRunner {
    @Value("${websocket.port}")
    private Integer port;

    @Value("${websocket.netty}")
    private String path;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        new NettyChatServer(port, path).start();
    }
}

3、如果有Netty基础,这个NettyServer服务,就很容易读懂了:

public class NettyChatServer {
    private int port;
    private String path;

    public NettyChatServer(int port, String path){
        this.port = port;
        this.path = path;
    }

    public void start(){
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(4);
        ServerBootstrap bootstrap = new ServerBootstrap();

        try {
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new HttpServerCodec())  //将请求和响应进行http编码或解码
                                    .addLast(new HttpObjectAggregator(64*1024))  //解码成FullHttpRequest,因为HttpServerCodec无法解码Post请求体中的参数
                                    .addLast(new ChunkedWriteHandler())  //大数据流的支持
//                                    .addLast(new HttpRequestHandler("/chat"))  //专门用来处理普通的http请求的handler
                                    .addLast(new WebSocketServerProtocolHandler(path))  //专门用来处理websocket请求的handler
                                    .addLast(new MyWebsocketHandler());  // 自定义的websocket处理handler处理类
                        }
                    });

            ChannelFuture future = bootstrap.bind(port).sync();
            if (future.isSuccess()){
                System.out.println("websocket服务启动成功,端口为:" + port);
            }

            ChannelFuture closeFuture = future.channel().closeFuture().sync();// 等待连接关闭
            if (closeFuture.isSuccess()){
                System.out.println("websocket服务被关闭!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

最重要的socketChannel.pipeline中的每一个handler都做了注释;

4、然后就是所有netty开发的项目中最重要的环节——自定义Handler:

@Slf4j
public class MyWebsocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // 维护后期所有链接上来的客户端的channel信息,相当于连接池
    private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 当有客户端连接时,执行的逻辑 ——> 需要将连上的客户端记录下来,并通知其它用户
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel newConnect = ctx.channel();

        //通知其它所有的用户,有新用户上线
        log.info("[欢迎]" + newConnect.remoteAddress() + "进入聊天室");
        channels.forEach(ch -> {
            if (ch != newConnect){
                ch.writeAndFlush("[欢迎]" + newConnect.remoteAddress() + "进入聊天室");
            }
        });

        channels.add(newConnect);
    }

    // 当有客户端断开连接时,执行的逻辑 ——> 将连接移除,并通知其他用户
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel leaveConnect = ctx.channel();

        log.info("[再见]" + leaveConnect.remoteAddress() + "离开聊天室");
        //通知其它所有的用户,有用户下线
        channels.forEach(ch -> {
            if (ch != leaveConnect){
                ch.writeAndFlush("[再见]" + leaveConnect.remoteAddress() + "离开聊天室");
            }
        });

        channels.remove(leaveConnect);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel causeChannel = ctx.channel();
        log.error(causeChannel.remoteAddress() + ":异常退出:" + cause.getCause());
        causeChannel.close();
        channels.remove(causeChannel);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        Channel incoming = ctx.channel();

        log.info("用户<" + incoming.remoteAddress() + ">说:" + msg.text());
        //通知其它用户
        channels.forEach(ch -> {
            if (ch != incoming){
                ch.writeAndFlush(new TextWebSocketFrame("用户" + incoming.remoteAddress() + "说:" + msg.text()));
            } else {
                ch.writeAndFlush(new TextWebSocketFrame("我说:" + msg.text()));
            }
        });
    }
}

二、spring-boot-starter-websocket实现

1、引入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.5.7</version>
</dependency>

2、通过实现 WebsocketConfigurer ,完成自定义handler和拦截器的配置:

@Configuration
@EnableWebSocket  //重要,容易忘
public class WebSocketConfig implements WebSocketConfigurer {
    @Value("${websocket.http}")
    private String path;

    @Resource
    private ChatHandler chatHandler;

    @Resource
    private ChatInterceptor chatInterceptor;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatHandler, path)  // 配置主处理逻辑
                .addInterceptors(chatInterceptor) // 握手拦截器,可以做一些连接校验等
                .setAllowedOrigins("*"); // 解决跨域
    }
}

3、自定义的拦截器ChatHandler(字节重写对应的方法即可):

@Slf4j
@Component
public class ChatHandler extends TextWebSocketHandler {

    public static AtomicInteger onlineNum = new AtomicInteger(0);
    public static CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();


    /**
     * 连接建立时
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String sessionId = session.getId();
        log.info("【欢迎】用户【" + sessionId + "】上线了!");
        sessions.forEach(s -> {
            if (s.getId() != sessionId){
                try {
                    s.sendMessage(new TextMessage("【欢迎】用户【" + sessionId + "】上线了!"));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
        sessions.add(session);
        onlineNum.getAndIncrement();
    }

    /**
     * 收到消息是时
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        String sessionId = session.getId();

        sessions.forEach(s -> {
            try {
                if (s.getId() != sessionId){
                    s.sendMessage(new TextMessage("用户【" + sessionId + "】说:" + message.getPayload()));
                }else {
                    s.sendMessage(new TextMessage("【我】说:" + message.getPayload()));
                }
            }catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 发生错误时
     * @param session
     * @param exception
     * @throws Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {

    }

    /**
     * 连接关闭后
     * @param session
     * @param closeStatus
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        String sessionId = session.getId();
        log.info("【再见】用户【" + sessionId + "】下线了");
        sessions.forEach(s -> {
            if (s.getId() != sessionId){
                try {
                    s.sendMessage(new TextMessage("【再见】用户【" + sessionId + "】上线了!"));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
        sessions.remove(session);
        onlineNum.getAndDecrement();
    }

    /**
     *
     * @return
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}

4、自定义握手拦截,完成一些校验之类的工作:

@Component
public class ChatInterceptor extends HttpSessionHandshakeInterceptor {
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//        return false;
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

    }
}

其实,除了通过实现WebsocketHandler,并重写其中对应的方法外,还提供了一种注解方式的实现,只需要在对应的方法上添加以下注解,皆可实现与上面handler相同的功能,更加灵活!

@OnOpen
@OnClose
@OnError
@OnMessage

以上功能的自测截图,略!还可以通过一个websocket给另外一个websocket推送外部消息,很灵活!

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐