本文主要是介绍Netty服务器结合WebSocke协议监听和接收数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
- 1.pom依赖
- 2.配置属性
- 3.创建netty服务器
- 4.建立监听和响应
- 5.创建启动器
- 6.前端static下页面
- 7.前端js
- 8.注意异常问题
- 9.创建netty服务器--使用守护线程
1.pom依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><!-- 根据需要选择版本 --><version>4.1.86.Final</version> </dependency>
2.配置属性
application.properties
#启动端口
server.port=8088
server.servlet.context-path=/rxtxcommon
#logging.level.web=DEBUGnetty.server.port=8023
//根据实际情况分配
netty.server.bossThreads=1
//根据实际情况分配
netty.server.workerThreads=4
3.创建netty服务器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;import java.util.Objects;
import java.util.concurrent.TimeUnit;@Slf4j
public class NettyServer {private Integer port;private Integer boosThreads = 1;private Integer workerThreads;private Channel channel;private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;public NettyServer(Integer port, Integer boosThreads, Integer workerThreads) throws InterruptedException{this.port = port;this.boosThreads = boosThreads;this.workerThreads = workerThreads;this.init();}private void init() throws InterruptedException {//1.创建一个服务端的引导类ServerBootstrap bootstrap = new ServerBootstrap();//2.创建反应器事件轮询组//boss轮询组(负责处理父通道:连接/接收监听(如NioServerSocketChannel))bossGroup = new NioEventLoopGroup(boosThreads);//workerGroup轮询组(负责处理子通道:读/写监听(如NioSocketChannel))if(Objects.nonNull(this.workerThreads) && this.workerThreads > 0){workerGroup= new NioEventLoopGroup(this.workerThreads);}else{//线程数默认为cpu核心数的2倍workerGroup = new NioEventLoopGroup();}//3.设置父子轮询组bootstrap.group(bossGroup, workerGroup);//4.设置传输通道类型,Netty不仅支持Java NIO,也支持阻塞式的OIObootstrap.channel(NioServerSocketChannel.class);//5.设置监听端口bootstrap.localAddress(new InetSocketAddress(this.port));//6.设置通道参数bootstrap.option(ChannelOption.SO_KEEPALIVE, true);bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);//7.装配子通道的Pipeline流水线bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){@Overrideprotected void initChannel(SocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();//添加对byte数组的编解码,是由netty提供的// pipeline.addLast(new ByteArrayDecoder()); //入站// pipeline.addLast(new ByteArrayEncoder()); //出站pipeline.addLast("http-codec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//负责WebSocket协议相关的握手处理和数据帧的编解码。当你创建一个WebSocket服务器时,这个处理器会处理来自客户端的WebSocket握手请求,完成握手过程,并在握手成功后,将连接转换为WebSocket连接。参数 "/websocket" 通常代表WebSocket连接的路径pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));pipeline.addLast("http-chunked", new ChunkedWriteHandler());//添加自定义处理器 pipeline.addLast(new SocketInboundHandler());}});//8.开始绑定端口,并通过调用sync()同步方法阻塞直到绑定成功ChannelFuture future = bootstrap.bind().sync();log.info("服务端启动成功,监听端口:{}", this.port);channel=future.channel();//9.自我阻塞,直到监听通道关闭ChannelFuture closeFuture = future.channel().closeFuture();closeFuture.sync();log.info("服务端已停止运行");//10.释放所有资源,包括创建的反应器线程workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();log.info("已释放服务端占用资源");}@PreDestroypublic void stop() {System.out.println("关闭netty服务器");if (channel != null) {channel.close();}if (workerGroup != null) {workerGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}if (bossGroup != null) {bossGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}}}
4.建立监听和响应
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class SocketInboundHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//主要用于处理接收数据的某些事件。它支持泛型的消息处理,并在默认情况下,当消息处理完毕后会自动释放。/*** 读取客户端发送来的消息*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器返回"+textWebSocketFrame.text()));}/*** 接入客户端*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("新客户端连接:{}", ctx.channel().id().asShortText());}/*** 断开客户端*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("客户端连接断开:{}", ctx.channel().id().asShortText());}/*** 异常处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("客户端异常:{}", cause.getMessage(), cause);ctx.channel().close();}
}
5.创建启动器
@Slf4j
@Component
public class NettyStarter implements ApplicationRunner {
//ApplicationRunner 实现这个接口,不会阻断springboot启动应用线程的阻塞@Value("${netty.server.port}")private Integer port;@Value("${netty.server.bossThreads}")private Integer bossThreads;@Value("${netty.server.workerThreads:-1}")private Integer workerThreads;@Overridepublic void run(ApplicationArguments args) throws Exception {try{new NettyServer(port, bossThreads, workerThreads);}catch (Exception e){log.info("服务端启动异常:{}", e.getMessage(), e);}}
}
6.前端static下页面
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>WebSocket Client</title><script src="sockt.js"></script>
</head>
<body>
<h1>WebSocket Client</h1>
<button onclick="connect()">Connect1</button>
<button onclick="sendMessage()">Send Message1</button>
<button onclick="connect2()">Connect2</button>
<button onclick="sendMessage2()">Send Message2</button>
<ul id="messages"></ul></body>
</html>
7.前端js
var ws;
var ws2;
function connect() {debugger// 假设你的WebSocket服务器运行在以下URL,并且WebSocket的端点是"/websocket"var wsUrl = 'ws://localhost或者ip地址:8023/websocket';// 创建WebSocket连接ws = new WebSocket(wsUrl);// 监听连接打开事件ws.onopen = function(event) {console.log('WebSocket连接已打开');displayMessage('Connected to server.');};// 监听从服务器接收到的消息事件ws.onmessage = function(event) {console.log('收到来自服务器的消息:', event.data);displayMessage('Received: ' + event.data);};// 监听连接关闭事件ws.onclose = function(event) {console.log('WebSocket连接已关闭');displayMessage('Connection closed.');};// 监听连接错误事件ws.onerror = function(error) {console.error('WebSocket发生错误', error);displayMessage('WebSocket Error: ' + error.message);};
}function connect2() {debugger// 假设你的WebSocket服务器运行在以下URL,并且WebSocket的端点是"/websocket"var wsUrl = 'ws://localhost或者ip地址:8023/websocket';// 创建WebSocket连接ws2 = new WebSocket(wsUrl);// 监听连接打开事件ws2.onopen = function(event) {console.log('WebSocket连接已打开');displayMessage('Connected to server.');};// 监听从服务器接收到的消息事件ws2.onmessage = function(event) {console.log('收到服务器2信息:', event.data);displayMessage('收到服务器2信息: ' + event.data);};// 监听连接关闭事件ws2.onclose = function(event) {console.log('WebSocket连接已关闭');displayMessage('Connection closed.');};// 监听连接错误事件ws2.onerror = function(error) {console.error('WebSocket发生错误', error);displayMessage('WebSocket Error: ' + error.message);};
}function sendMessage() {if (ws.readyState === WebSocket.OPEN) {var message = prompt('请输入要发送的消息:');ws.send(message);displayMessage('Sent: ' + message);} else {displayMessage('WebSocket连接未打开,请先连接。');}
}
function sendMessage2() {if (ws2.readyState === WebSocket.OPEN) {var message = prompt('请输入要发送的消息:');ws2.send(message);displayMessage('Sent: ' + message);} else {displayMessage('WebSocket连接未打开,请先连接。');}
}
function displayMessage(message) {var messagesList = document.getElementById('messages');var li = document.createElement('li');li.textContent = message;messagesList.appendChild(li);
}
8.注意异常问题
1.不用InitializingBean接口初始化–阻塞springboot应用线程启动创建
2.不用 @PostConstruct执行初始化方法建立接口绑定–阻塞springboot应用线程启动创建
在类的方法上使用@PostConstruct注解,可以确保该方法在依赖注入完成后立即执行。这也可以作为启动Netty服务器的一个点。但是,需要注意的是,@PostConstruct在单例Bean的初始化阶段执行,可能早于ApplicationRunner或CommandLineRunner。
3.用异步注解@Async–没有尝试过
3.创建netty过程中,启动时用ApplicationRunner --正确
4.创建netty过程中,启动时用CommandLineRunner–正确
无论选择哪种方式,都需要确保Netty服务器的启动不会阻塞Spring Boot的主线程。如果需要,可以考虑使用线程池来管理Netty的IO操作,以避免阻塞。
9.创建netty服务器–使用守护线程
import com.groupname.rxtxcommon.socket.handler.WebSocketHandler2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.TimeUnit;@Component
//public class NettyWebSocketServer implements ApplicationRunner {
public class NettyWebSocketServer {private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private Channel channel;private static Thread server;
// @PostConstruct 这个注解会阻塞springboot主线程运行@PostConstructpublic synchronized void start() {if (server!=null) return;server = new Thread(() -> {bossGroup = new NioEventLoopGroup(1);workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));// 添加你的自定义HTTP处理器pipeline.addLast(new WebSocketHandler2());}});ChannelFuture future = bootstrap.bind(8023).sync();// 阻塞当前线程,直到服务器Channel关闭System.out.println("开启netty服务器111");channel=future.channel();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();stop();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}});server.setDaemon(true);server.start();}@PreDestroypublic void stop() {System.out.println("关闭netty服务器");if (channel != null) {channel.close();}if (workerGroup != null) {workerGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}if (bossGroup != null) {bossGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}}// @Override
// public void run(ApplicationArguments args) throws Exception {
// start();
// }
}
这篇关于Netty服务器结合WebSocke协议监听和接收数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!