本文websocket使用netty实现,原因:因为netty提供有心跳组件,方便实现心跳检测
一、添加依赖
在springboot项目中添加下面两个依赖
1 2 3 4 5 6 7 8 9 10 11
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${netty-all.version}</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>${hutool.version}</version> </dependency>
|
二、netty的基本实现
netty服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
| package com.aixbox.websocketdemo.webSocket;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.NettyRuntime; import io.netty.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct; import javax.annotation.PreDestroy;
@Slf4j @Configuration public class NettyWebSocketServer { public static final int WEB_SOCKET_PORT = 8090; public static final NettyWebSocketServerHandler NETTY_WEB_SOCKET_SERVER_HANDLER = new NettyWebSocketServerHandler(); private EventLoopGroup bossGroup = new NioEventLoopGroup(1); private EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());
@PostConstruct public void start() throws InterruptedException { run(); }
@PreDestroy public void destroy() { Future<?> future = bossGroup.shutdownGracefully(); Future<?> future1 = workerGroup.shutdownGracefully(); future.syncUninterruptibly(); future1.syncUninterruptibly(); log.info("关闭 ws server 成功"); }
public void run() throws InterruptedException { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new IdleStateHandler(30, 0, 0)); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(8192));
pipeline.addLast(new WebSocketServerProtocolHandler("/")); pipeline.addLast(NETTY_WEB_SOCKET_SERVER_HANDLER); } }); serverBootstrap.bind(WEB_SOCKET_PORT).sync(); }
}
|
websocket处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.aixbox.websocketdemo.webSocket;
import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.hutool.json.JSONUtil; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j;
@Slf4j @Sharable public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { log.info("收到客户端消息:{}", msg.text()); ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息:" + msg.text())); } }
|
:::info
在上面的NettyWebSocketServer创建了一个run方法用来启动netty服务器,并且使用@PostConstruct注解在项目启动时启动,使用@PreDestroy注解在项目关闭时关闭线程。
在run方法中设置了netty服务器的参数,以及设置了webSocket处理器。
:::
:::info
在NettyWebSocketServerHandler方法中则是接收了请求发送的消息以及给请求方推送接收到了消息
:::
使用apifox测试

三、添加心跳机制
:::info
因为前端用户如果下线,后端是无法感知的,所以需要维持一个心跳,前端一定时间发送一个心跳包维持心跳,我们在前门的run方法中已经添加了 pipeline.addLast(new IdleStateHandler(30, 0, 0));处理器,在30秒前端没有发送消息,这个处理器就会发送一个事件,我们通过检测事件关闭连接
:::
在NettyWebSocketServerHandler处理心跳,30秒没消息关闭连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| package com.aixbox.websocketdemo.webSocket;
import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.hutool.json.JSONUtil; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j;
@Slf4j @Sharable public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.READER_IDLE) { ctx.channel().close(); } } super.userEventTriggered(ctx, evt); }
@Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { log.info("收到客户端消息:{}", msg.text()); ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息:" + msg.text())); } }
|
使用apifox测试
使用apifox测试,可以看到30秒没有消息,连接关闭了

四、管理WebSocket连接的channel
webSocket处理器添加握手完成后的事件中间channel保存到map中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| package com.aixbox.websocketdemo.webSocket;
import cn.hutool.extra.spring.SpringUtil; import com.aixbox.websocketdemo.service.WebSocketService; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j;
@Slf4j @Sharable public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private WebSocketService webSocketService;
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.webSocketService = getService(); }
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.READER_IDLE) { ctx.channel().close(); } } else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { this.webSocketService.connect(ctx.channel()); } super.userEventTriggered(ctx, evt); }
@Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { log.info("收到客户端消息:{}", msg.text()); ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息:" + msg.text())); }
private WebSocketService getService() { return SpringUtil.getBean(WebSocketService.class); } }
|
WebSocketServiceImpl实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.aixbox.websocketdemo.service.impl;
import com.aixbox.websocketdemo.service.WebSocketService; import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
@Component @Slf4j public class WebSocketServiceImpl implements WebSocketService {
private static final ConcurrentHashMap<Channel, String> ONLINE_WS_MAP = new ConcurrentHashMap<>();
@Override public void connect(Channel channel) { ONLINE_WS_MAP.put(channel, ""); } }
|
下篇在vue项目中使用websocket