Netty服务器结合WebSocke协议监听和接收数据

2024-03-18 14:44

本文主要是介绍Netty服务器结合WebSocke协议监听和接收数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

  • 1.pom依赖
  • 2.配置属性
  • 3.创建netty服务器
  • 4.建立监听和响应
  • 5.创建启动器
  • 6.前端static下页面
  • 7.前端js
  • 8.注意异常问题
  • 9.创建netty服务器--使用守护线程

1.pom依赖

        <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><!-- 根据需要选择版本 --><version>4.1.86.Final</version> </dependency>

2.配置属性

application.properties

#启动端口
server.port=8088
server.servlet.context-path=/rxtxcommon
#logging.level.web=DEBUGnetty.server.port=8023
//根据实际情况分配
netty.server.bossThreads=1
//根据实际情况分配
netty.server.workerThreads=4

3.创建netty服务器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;import java.util.Objects;
import java.util.concurrent.TimeUnit;@Slf4j
public class NettyServer {private Integer port;private Integer boosThreads = 1;private Integer workerThreads;private Channel channel;private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;public NettyServer(Integer port, Integer boosThreads, Integer workerThreads) throws InterruptedException{this.port = port;this.boosThreads = boosThreads;this.workerThreads = workerThreads;this.init();}private void init() throws InterruptedException {//1.创建一个服务端的引导类ServerBootstrap bootstrap = new ServerBootstrap();//2.创建反应器事件轮询组//boss轮询组(负责处理父通道:连接/接收监听(如NioServerSocketChannel))bossGroup = new NioEventLoopGroup(boosThreads);//workerGroup轮询组(负责处理子通道:读/写监听(如NioSocketChannel))if(Objects.nonNull(this.workerThreads) && this.workerThreads > 0){workerGroup= new NioEventLoopGroup(this.workerThreads);}else{//线程数默认为cpu核心数的2倍workerGroup = new NioEventLoopGroup();}//3.设置父子轮询组bootstrap.group(bossGroup, workerGroup);//4.设置传输通道类型,Netty不仅支持Java NIO,也支持阻塞式的OIObootstrap.channel(NioServerSocketChannel.class);//5.设置监听端口bootstrap.localAddress(new InetSocketAddress(this.port));//6.设置通道参数bootstrap.option(ChannelOption.SO_KEEPALIVE, true);bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);//7.装配子通道的Pipeline流水线bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){@Overrideprotected void initChannel(SocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();//添加对byte数组的编解码,是由netty提供的//        pipeline.addLast(new ByteArrayDecoder()); //入站//        pipeline.addLast(new ByteArrayEncoder()); //出站pipeline.addLast("http-codec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//负责WebSocket协议相关的握手处理和数据帧的编解码。当你创建一个WebSocket服务器时,这个处理器会处理来自客户端的WebSocket握手请求,完成握手过程,并在握手成功后,将连接转换为WebSocket连接。参数 "/websocket" 通常代表WebSocket连接的路径pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));pipeline.addLast("http-chunked", new ChunkedWriteHandler());//添加自定义处理器 pipeline.addLast(new SocketInboundHandler());}});//8.开始绑定端口,并通过调用sync()同步方法阻塞直到绑定成功ChannelFuture future = bootstrap.bind().sync();log.info("服务端启动成功,监听端口:{}", this.port);channel=future.channel();//9.自我阻塞,直到监听通道关闭ChannelFuture closeFuture = future.channel().closeFuture();closeFuture.sync();log.info("服务端已停止运行");//10.释放所有资源,包括创建的反应器线程workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();log.info("已释放服务端占用资源");}@PreDestroypublic void stop() {System.out.println("关闭netty服务器");if (channel != null) {channel.close();}if (workerGroup != null) {workerGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}if (bossGroup != null) {bossGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}}}

4.建立监听和响应

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class SocketInboundHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//主要用于处理接收数据的某些事件。它支持泛型的消息处理,并在默认情况下,当消息处理完毕后会自动释放。/*** 读取客户端发送来的消息*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器返回"+textWebSocketFrame.text()));}/*** 接入客户端*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("新客户端连接:{}", ctx.channel().id().asShortText());}/*** 断开客户端*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("客户端连接断开:{}", ctx.channel().id().asShortText());}/*** 异常处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("客户端异常:{}", cause.getMessage(), cause);ctx.channel().close();}
}

5.创建启动器

@Slf4j
@Component
public class NettyStarter implements ApplicationRunner {
//ApplicationRunner 实现这个接口,不会阻断springboot启动应用线程的阻塞@Value("${netty.server.port}")private Integer port;@Value("${netty.server.bossThreads}")private Integer bossThreads;@Value("${netty.server.workerThreads:-1}")private Integer workerThreads;@Overridepublic void run(ApplicationArguments args) throws Exception {try{new NettyServer(port, bossThreads, workerThreads);}catch (Exception e){log.info("服务端启动异常:{}", e.getMessage(), e);}}
}

6.前端static下页面

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>WebSocket Client</title><script src="sockt.js"></script>
</head>
<body>
<h1>WebSocket Client</h1>
<button onclick="connect()">Connect1</button>
<button onclick="sendMessage()">Send Message1</button>
<button onclick="connect2()">Connect2</button>
<button onclick="sendMessage2()">Send Message2</button>
<ul id="messages"></ul></body>
</html>

7.前端js


var ws;
var ws2;
function connect() {debugger// 假设你的WebSocket服务器运行在以下URL,并且WebSocket的端点是"/websocket"var wsUrl = 'ws://localhost或者ip地址:8023/websocket';// 创建WebSocket连接ws = new WebSocket(wsUrl);// 监听连接打开事件ws.onopen = function(event) {console.log('WebSocket连接已打开');displayMessage('Connected to server.');};// 监听从服务器接收到的消息事件ws.onmessage = function(event) {console.log('收到来自服务器的消息:', event.data);displayMessage('Received: ' + event.data);};// 监听连接关闭事件ws.onclose = function(event) {console.log('WebSocket连接已关闭');displayMessage('Connection closed.');};// 监听连接错误事件ws.onerror = function(error) {console.error('WebSocket发生错误', error);displayMessage('WebSocket Error: ' + error.message);};
}function connect2() {debugger// 假设你的WebSocket服务器运行在以下URL,并且WebSocket的端点是"/websocket"var wsUrl = 'ws://localhost或者ip地址:8023/websocket';// 创建WebSocket连接ws2 = new WebSocket(wsUrl);// 监听连接打开事件ws2.onopen = function(event) {console.log('WebSocket连接已打开');displayMessage('Connected to server.');};// 监听从服务器接收到的消息事件ws2.onmessage = function(event) {console.log('收到服务器2信息:', event.data);displayMessage('收到服务器2信息: ' + event.data);};// 监听连接关闭事件ws2.onclose = function(event) {console.log('WebSocket连接已关闭');displayMessage('Connection closed.');};// 监听连接错误事件ws2.onerror = function(error) {console.error('WebSocket发生错误', error);displayMessage('WebSocket Error: ' + error.message);};
}function sendMessage() {if (ws.readyState === WebSocket.OPEN) {var message = prompt('请输入要发送的消息:');ws.send(message);displayMessage('Sent: ' + message);} else {displayMessage('WebSocket连接未打开,请先连接。');}
}
function sendMessage2() {if (ws2.readyState === WebSocket.OPEN) {var message = prompt('请输入要发送的消息:');ws2.send(message);displayMessage('Sent: ' + message);} else {displayMessage('WebSocket连接未打开,请先连接。');}
}
function displayMessage(message) {var messagesList = document.getElementById('messages');var li = document.createElement('li');li.textContent = message;messagesList.appendChild(li);
}

8.注意异常问题

1.不用InitializingBean接口初始化–阻塞springboot应用线程启动创建
2.不用 @PostConstruct执行初始化方法建立接口绑定–阻塞springboot应用线程启动创建

在类的方法上使用@PostConstruct注解,可以确保该方法在依赖注入完成后立即执行。这也可以作为启动Netty服务器的一个点。但是,需要注意的是,@PostConstruct在单例Bean的初始化阶段执行,可能早于ApplicationRunner或CommandLineRunner。
3.用异步注解@Async–没有尝试过
3.创建netty过程中,启动时用ApplicationRunner --正确
4.创建netty过程中,启动时用CommandLineRunner–正确
无论选择哪种方式,都需要确保Netty服务器的启动不会阻塞Spring Boot的主线程。如果需要,可以考虑使用线程池来管理Netty的IO操作,以避免阻塞。

9.创建netty服务器–使用守护线程

import com.groupname.rxtxcommon.socket.handler.WebSocketHandler2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.TimeUnit;@Component
//public class NettyWebSocketServer implements ApplicationRunner {
public class NettyWebSocketServer {private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private Channel channel;private static Thread server;
//    @PostConstruct 这个注解会阻塞springboot主线程运行@PostConstructpublic synchronized void start() {if (server!=null) return;server = new Thread(() -> {bossGroup = new NioEventLoopGroup(1);workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));// 添加你的自定义HTTP处理器pipeline.addLast(new WebSocketHandler2());}});ChannelFuture future = bootstrap.bind(8023).sync();// 阻塞当前线程,直到服务器Channel关闭System.out.println("开启netty服务器111");channel=future.channel();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();stop();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}});server.setDaemon(true);server.start();}@PreDestroypublic void stop() {System.out.println("关闭netty服务器");if (channel != null) {channel.close();}if (workerGroup != null) {workerGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}if (bossGroup != null) {bossGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);}}//    @Override
//    public void run(ApplicationArguments args) throws Exception {
//        start();
//    }
}

这篇关于Netty服务器结合WebSocke协议监听和接收数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/822667

相关文章

Apache Tomcat服务器版本号隐藏的几种方法

《ApacheTomcat服务器版本号隐藏的几种方法》本文主要介绍了ApacheTomcat服务器版本号隐藏的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1. 隐藏HTTP响应头中的Server信息编辑 server.XML 文件2. 修China编程改错误

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

Python如何实现 HTTP echo 服务器

《Python如何实现HTTPecho服务器》本文介绍了如何使用Python实现一个简单的HTTPecho服务器,该服务器支持GET和POST请求,并返回JSON格式的响应,GET请求返回请求路... 一个用来做测试的简单的 HTTP echo 服务器。from http.server import HT

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

如何安装 Ubuntu 24.04 LTS 桌面版或服务器? Ubuntu安装指南

《如何安装Ubuntu24.04LTS桌面版或服务器?Ubuntu安装指南》对于我们程序员来说,有一个好用的操作系统、好的编程环境也是很重要,如何安装Ubuntu24.04LTS桌面... Ubuntu 24.04 LTS,代号 Noble NumBAT,于 2024 年 4 月 25 日正式发布,引入了众

如何提高Redis服务器的最大打开文件数限制

《如何提高Redis服务器的最大打开文件数限制》文章讨论了如何提高Redis服务器的最大打开文件数限制,以支持高并发服务,本文给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录如何提高Redis服务器的最大打开文件数限制问题诊断解决步骤1. 修改系统级别的限制2. 为Redis进程特别设置限制

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

vue如何监听对象或者数组某个属性的变化详解

《vue如何监听对象或者数组某个属性的变化详解》这篇文章主要给大家介绍了关于vue如何监听对象或者数组某个属性的变化,在Vue.js中可以通过watch监听属性变化并动态修改其他属性的值,watch通... 目录前言用watch监听深度监听使用计算属性watch和计算属性的区别在vue 3中使用watchE

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r