Netty实现WebSocket及分布式解决方案

2024-09-02 08:52

本文主要是介绍Netty实现WebSocket及分布式解决方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在项目中面临了两个关键需求:一是实时数据获取,二是轻量级的即时通讯功能。传统的轮询机制虽然也能够从服务器获取数据,但它存在明显的不足:首先,它无法实现真正的实时性,;其次,频繁的请求会占用宝贵的客户端资源,影响用户体验,并增加服务器的负载。

针对这些挑战,我们选择了WebSocket协议作为解决方案。WebSocket提供了一种持久的连接方式,允许服务器主动、即时地向客户端推送最新数据,从而确保了信息的实时更新。这种全双工通信机制不仅提高了数据传输的效率,还显著降低了客户端的资源消耗。同时通过WebSocket,我们能够构建一个轻量级、响应迅速的通讯平台,简单的实现了用户间的互动。

WebSocket与Http对比

  • 协议类型
    • HTTP:是一个无状态、请求/响应式的协议,通常用于客户端(如浏览器)和服务器之间的一次性事务处理。它是基于TCP的文本协议,用于分布式、协作式、超媒体信息系统。
    • WebSocket:是一个持久化的协议,提供了全双工通信机制。它允许服务器主动向客户端发送消息,而不需要客户端再次发起请求。WebSocket 也是基于TCP的,但它在建立连接后可以保持长连接状态。
  • 头部开销:
    • HTTP:每次请求和响应都需要携带完整的头部信息,这在频繁通信时会增加数据传输的开销。
    • WebSocket:一旦连接建立,后续的消息传输不需要携带HTTP头部,只有较小的数据包头,这减少了通信的开销。
  • 安全性:
    • HTTP:可以通过HTTPS来提供加密的传输。
    • WebSocket:也有对应的安全版本称为WebSocket Secure(WSS),它在WebSocket的基础上增加了TLS/SSL加密。
  • 握手过程:
    • HTTP:客户端发送请求,服务器响应请求,然后连接关闭或保持(取决于是否keep-alive)。
    • WebSocket:客户端发送一个特殊的Upgrade请求来初始化WebSocket连接,服务器响应并升级连接为WebSocket连接。
WebSocket握手过程示例
ws://ip:port/ws
请求方法: GET
状态代码: 101 Switching Protocols

请求报文

Request:
GET ws://ip:port/ws HTTP/1.1
Host: host
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: 
Upgrade: websocket
Origin: http://ip:port
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
Sec-WebSocket-Key: o88HTN24liAvfOOFHhVyKQ==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

响应报文

HTTP/1.1 101 Switching Protocols
Server: nginx/1.20.2
Date: Mon, 01 Aug 2024 09:43:58 GMT
Connection: upgrade
upgrade: websocket
sec-websocket-accept: FMID9mDELWawU35fGSFJ/1H2790=

netty实现websocket

为什么选择netty实现websocket服务端?

  • 高性能和高并发:Netty是一个基于NIO的异步事件驱动的网络应用框架,它提供了高性能的网络通信能力。
  • 简化的编程模型:Netty提供了简化的编程模型,通过ChannelHandler和ChannelPipeline等组件,开发者可以方便地构建复杂的网络通信逻辑。这种模型不仅简化了代码,还提高了代码的可维护性和可读性。
  • 协议支持:Netty支持多种协议,包括HTTP、HTTPS、TCP、UDP等,这意味着可以在Netty中轻松实现WebSocket协议的支持。
自定义handler
@ChannelHandler.Sharable
public class WebSocketBusinessHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {// 处理客户端传输过来的消息String content = msg.text();log.debug("接收到消息:{}", content);Response resp = execute(content);ctx.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(resp)));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("处理命令错误", cause);Response response = Response.fail(((BusinessException) cause).getErrCode());ctx.channel().writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(response)));}
}
实现ChannelInitializer.initChannel方法
ChannelPipeline pipeline = ch.pipeline();// websocket 基于http协议,所以要有http编解码器
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
// ====================== 以上是用于支持http协议    ======================
//可以增加一些内部接口// ====================== 以下是支持httpWebsocket ======================
// WebSocket 数据压缩扩展
pipeline.addLast(new WebSocketServerCompressionHandler());
/*** websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws* 本handler会帮你处理一些繁重的复杂的事* 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳* 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定义的handler
pipeline.addLast(new WebSocketBusinessHandler());
服务端启动
EventLoopGroup mainGroup = new NioEventLoopGroup();
EventLoopGroup subGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(mainGroup, subGroup).channel(NioServerSocketChannel.class).childHandler(initializer);ChannelFuture channelFuture = serverBootstrap.bind(port).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
} finally {mainGroup.shutdownGracefully();subGroup.shutdownGracefully();
}

在nginx、ingress暴露出来

# ingress暴露ws
- backend:serviceName: websocketservicePort: websocketpath: /wspathType: ImplementationSpecific
  http{# websocket# 根据客户端请求中 $http_upgrade 的值,来构造改变 $connection_upgrade 的值#  $connection_upgrade 的值会一直是 upgrade。然后如果 $http_upgrade 为空字符串的话,那值会是 close。map $http_upgrade $connection_upgrade {default upgrade;'' close;}}location  ^~  /ws{proxy_pass http://test.com;proxy_redirect    off;proxy_set_header X-Real-IP $remote_addr;proxy_set_header Host $host;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection $connection_upgrade;}

nginx -s reload

分布式解决方案

WebSocket是有状态协议的,客户端连接服务器时只和集群中一个节点连接,数据传输过程中也只与这一节点通信。因此,WebSocket集群需要解决会话共享的问题。如果只采用单节点部署,虽然可以避免这一问题,但无法水平扩展支撑更高负载,有单点的风险。
WebSocket是有状态的,无法像直接HTTP以集群方式实现负载均衡,长连接建立后即与服务端某个节点保持着会话,因此集群下想要得知会话属于哪个节点,有两种方案,一种是使用类似微服务的注册中心来维护全局的会话映射关系,一种是使用事件广播由各节点自行判断是否持有会话。

以方案二为例:

连接管理
/*** WebSocket的连接对象。*/
public class WebSocketConn {/*** 通道*/private Channel channel;/*** 应用标签 application*/private String ;/*** 人员id*/private Long personId;/*** 人员名称*/private String personName;public WebSocketConn(Channel channel) {this.channel = channel;}public void bind(String appId, Long personId, String personName) {this.app = appId;this.personId = personId;this.personName = personName;}/*** 查询符合的数据*/public boolean match(@NonNull String appId, @NonNull Long targetPersonId) {if(app.equals(appId) && personId.equals(targetPersonId)){return true;}return false;}public Channel getChannel() {return channel;}public String getApp() {return app;}
}public class WebSocketConnRegister {private Map<Channel, WebSocketConn> connMap = new ConcurrentHashMap<>();public  void register(WebSocketConn conn) {connMap.put(conn.getChannel(), conn);}public void unregister(Channel channel) {connMap.remove(channel);}public WebSocketConn getByChannel(Channel channel) {return connMap.get(channel);}public List<WebSocketConn> findByTarget(String application, Long personId) {return connMap.values().stream().filter(w -> w.match(application, personId)).collect(Collectors.toList());}}
连接接入或断开

WebSocketBusinessHandler 实现ChannelHandlerAdapter的handlerAdded、handlerRemoved方法

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.debug("有连接进入,连接channel_id:{}", ctx.channel().id().asLongText());WebSocketConn conn = new WebSocketConn(ctx.channel());register.register(conn);
}@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.debug("连接断开,连接channel:{}", ctx.channel().id().asLongText());register.unregister(ctx.channel());
}

WebSocketBusinessHandler.channelRead0 增加处理逻辑
1、根据收到消息来;
2、发送channel 不在此jvm实例中,则发送到消息队列中

WebSocketConn conn = register.getByChannel(ctx.channel());
if (conn != null) {Response<ClientCommandRet> resp = 处理业务逻辑;ctx.writeAndFlush(new TextWebSocketFrame(JsonUtils.toJson(resp)));return;
}else{// 不在当前实例,发送消息到RocketMQapplicationContext.getBean(RocketMQTemplate.class).convertAndSend("topicName",event);ctx.channel().close();
}

其它实例上监听topic后,同样查找目标发送channel,如果不存在,直接结束。

@Component
@RocketMQMessageListener(topic = "topicName",consumerGroup = "groupName",messageModel = MessageModel.BROADCASTING)
public static class WebsocketConsumerListener implements RocketMQListener<String> {public void onMessage(String event) {log.info("consume event: {}", event);try {List<WebSocketConn> webSocketConnList = register.findByTarget(event.getApplication(), event.getTargetPersonId());String msg = assembleMsg(event);webSocketConns.stream().parallel().forEach(conn -> sendMsg2Channel(msg, conn));} catch (Exception e) {e.printStackTrace();}}
}private void sendMsg2Channel(String msg, WebSocketConn conn) {try {conn.getChannel().writeAndFlush(new TextWebSocketFrame(msg));} catch (Exception e) {log.error("推送消息失败", e);try {conn.getChannel().close();} catch (Exception ex) {}}
}

注意:
1、rocketmq使用广播模式,因为每条消息每个节点都要消费一次。
2、kafka则要保证所有的partition都要订阅到。

List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor("topicName");

参考

http://www.ruanyifeng.com/blog/2017/05/websocket.html
http://coolaf.com/tool/chattest 测试工具

这篇关于Netty实现WebSocket及分布式解决方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1129528

相关文章

Java实现时间与字符串互相转换详解

《Java实现时间与字符串互相转换详解》这篇文章主要为大家详细介绍了Java中实现时间与字符串互相转换的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、日期格式化为字符串(一)使用预定义格式(二)自定义格式二、字符串解析为日期(一)解析ISO格式字符串(二)解析自定义

opencv图像处理之指纹验证的实现

《opencv图像处理之指纹验证的实现》本文主要介绍了opencv图像处理之指纹验证的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录一、简介二、具体案例实现1. 图像显示函数2. 指纹验证函数3. 主函数4、运行结果三、总结一、

Springboot处理跨域的实现方式(附Demo)

《Springboot处理跨域的实现方式(附Demo)》:本文主要介绍Springboot处理跨域的实现方式(附Demo),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录Springboot处理跨域的方式1. 基本知识2. @CrossOrigin3. 全局跨域设置4.

Linux samba共享慢的原因及解决方案

《Linuxsamba共享慢的原因及解决方案》:本文主要介绍Linuxsamba共享慢的原因及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux samba共享慢原因及解决问题表现原因解决办法总结Linandroidux samba共享慢原因及解决

Spring Boot 3.4.3 基于 Spring WebFlux 实现 SSE 功能(代码示例)

《SpringBoot3.4.3基于SpringWebFlux实现SSE功能(代码示例)》SpringBoot3.4.3结合SpringWebFlux实现SSE功能,为实时数据推送提供... 目录1. SSE 简介1.1 什么是 SSE?1.2 SSE 的优点1.3 适用场景2. Spring WebFlu

基于SpringBoot实现文件秒传功能

《基于SpringBoot实现文件秒传功能》在开发Web应用时,文件上传是一个常见需求,然而,当用户需要上传大文件或相同文件多次时,会造成带宽浪费和服务器存储冗余,此时可以使用文件秒传技术通过识别重复... 目录前言文件秒传原理代码实现1. 创建项目基础结构2. 创建上传存储代码3. 创建Result类4.

SpringBoot日志配置SLF4J和Logback的方法实现

《SpringBoot日志配置SLF4J和Logback的方法实现》日志记录是不可或缺的一部分,本文主要介绍了SpringBoot日志配置SLF4J和Logback的方法实现,文中通过示例代码介绍的非... 目录一、前言二、案例一:初识日志三、案例二:使用Lombok输出日志四、案例三:配置Logback一

Python如何使用__slots__实现节省内存和性能优化

《Python如何使用__slots__实现节省内存和性能优化》你有想过,一个小小的__slots__能让你的Python类内存消耗直接减半吗,没错,今天咱们要聊的就是这个让人眼前一亮的技巧,感兴趣的... 目录背景:内存吃得满满的类__slots__:你的内存管理小助手举个大概的例子:看看效果如何?1.

Python+PyQt5实现多屏幕协同播放功能

《Python+PyQt5实现多屏幕协同播放功能》在现代会议展示、数字广告、展览展示等场景中,多屏幕协同播放已成为刚需,下面我们就来看看如何利用Python和PyQt5开发一套功能强大的跨屏播控系统吧... 目录一、项目概述:突破传统播放限制二、核心技术解析2.1 多屏管理机制2.2 播放引擎设计2.3 专

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很