使用 Netty 实现 IM 聊天贼简单,看不懂就锤爆艿艿的狗头~

2023-11-09 18:51

本文主要是介绍使用 Netty 实现 IM 聊天贼简单,看不懂就锤爆艿艿的狗头~,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方“芋道源码”,选择“设为星标”

管她前浪,还是后浪?

能浪的浪,才是好浪!

每天 8:55 更新文章,每天掉亿点点头发...

源码精品专栏

 
  • 原创 | Java 2020 超神之路,很肝~

  • 中文详细注释的开源项目

  • RPC 框架 Dubbo 源码解析

  • 网络应用框架 Netty 源码解析

  • 消息中间件 RocketMQ 源码解析

  • 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析

  • 作业调度中间件 Elastic-Job 源码解析

  • 分布式事务中间件 TCC-Transaction 源码解析

  • Eureka 和 Hystrix 源码解析

  • Java 并发源码

  • 1. 概述

  • 2. 构建 Netty 服务端与客户端

  • 3. 通信协议

  • 4. 消息分发

  • 5. 断开重连

  • 6. 心跳机制与空闲检测

  • 7. 认证逻辑

  • 8. 单聊逻辑

  • 9. 群聊逻辑

  • 666. 彩蛋


本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labs 的 lab-67 目录。

原创不易,给点个 Star 嘿,一起冲鸭!

1. 概述

在《芋道 Spring Boot WebSocket 入门》文章中,我们使用 WebSocket 实现了一个简单的 IM 功能,支持身份认证、私聊消息、群聊消息。

然后就有胖友私信艿艿,希望使用纯 Netty 实现一个类似的功能。良心的艿艿,当然不会给她发红人卡,因此就有了本文。可能有胖友不知道 Netty 是什么,这里简单介绍下:

Netty 是一个 Java 开源框架。

Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。

下面,我们来新建三个项目,如下图所示:

三个项目
  • lab-67-netty-demo-server 项目:搭建 Netty 服务端。

  • lab-67-netty-demo-client 项目:搭建 Netty 客户端。

  • lab-67-netty-demo-common 项目:提供 Netty 的基础封装,提供消息的编解码、分发的功能。

另外,我们也会提供 Netty 常用功能的示例:

  • 心跳机制,实现服务端对客户端的存活检测。

  • 断线重连,实现客户端对服务端的重新连接。

不哔哔,直接开干。

友情提示:可能会胖友担心,没有 Netty 基础是不是无法阅读本文?!

艿艿的想法,看!就硬看,按照代码先自己能搭建一下哈~文末,艿艿会提供一波 Netty 基础入门的文章。

2. 构建 Netty 服务端与客户端

本小节,我们先来使用 Netty 构建服务端与客户端的核心代码,让胖友对项目的代码有个初始的认知。

2.1 构建 Netty 服务端

创建 lab-67-netty-demo-server 项目,搭建 Netty 服务端。如下图所示:

项目结构

下面,我们只会暂时看看 server 包下的代码,避免信息量过大,击穿胖友的秃头。

2.1.1 NettyServer

创建 NettyServer 类,Netty 服务端。代码如下:

@Component
public class NettyServer {private Logger logger = LoggerFactory.getLogger(getClass());@Value("${netty.port}")private Integer port;@Autowiredprivate NettyServerHandlerInitializer nettyServerHandlerInitializer;/*** boss 线程组,用于服务端接受客户端的连接*/private EventLoopGroup bossGroup = new NioEventLoopGroup();/*** worker 线程组,用于服务端接受客户端的数据读写*/private EventLoopGroup workerGroup = new NioEventLoopGroup();/*** Netty Server Channel*/private Channel channel;/*** 启动 Netty Server*/@PostConstructpublic void start() throws InterruptedException {// <2.1> 创建 ServerBootstrap 对象,用于 Netty Server 启动ServerBootstrap bootstrap = new ServerBootstrap();// <2.2> 设置 ServerBootstrap 的各种属性bootstrap.group(bossGroup, workerGroup) // <2.2.1> 设置两个 EventLoopGroup 对象.channel(NioServerSocketChannel.class)  // <2.2.2> 指定 Channel 为服务端 NioServerSocketChannel.localAddress(new InetSocketAddress(port)) // <2.2.3> 设置 Netty Server 的端口.option(ChannelOption.SO_BACKLOG, 1024) // <2.2.4> 服务端 accept 队列的大小.childOption(ChannelOption.SO_KEEPALIVE, true) // <2.2.5> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能.childOption(ChannelOption.TCP_NODELAY, true) // <2.2.6> 允许较小的数据包的发送,降低延迟.childHandler(nettyServerHandlerInitializer);// <2> 绑定端口,并同步等待成功,即启动服务端ChannelFuture future = bootstrap.bind().sync();if (future.isSuccess()) {channel = future.channel();logger.info("[start][Netty Server 启动在 {} 端口]", port);}}/*** 关闭 Netty Server*/@PreDestroypublic void shutdown() {// <3.1> 关闭 Netty Serverif (channel != null) {channel.close();}// <3.2> 优雅关闭两个 EventLoopGroup 对象bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}

???? ① 在类上,添加 @Component 注解,把 NettyServer 的创建交给 Spring 管理。

  • port 属性,读取 application.yml 配置文件的 netty.port 配置项。

  • #start() 方法,添加 @PostConstruct 注解,启动 Netty 服务器。

  • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 服务器。

???? ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Server 的启动。

<2.1> 处,创建 ServerBootstrap 类,Netty 提供的服务器的启动类,方便我们初始化 Server。

<2.2> 处,设置 ServerBootstrap 的各种属性。

友情提示:这里涉及较多 Netty 组件的知识,艿艿先以简单的语言描述,后续胖友在文末的 Netty 基础入门的文章,补充学噢。

<2.2.1> 处,调用 #group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 方法,设置使用 bossGroupworkerGroup。其中:

  • bossGroup 属性:Boss 线程组,用于服务端接受客户端的连接

  • workerGroup 属性:Worker 线程组,用于服务端接受客户端的数据读写

Netty 采用的是多 Reactor 多线程的模型,服务端可以接受更多客户端的数据读写的能力。原因是:

  • 创建专门用于接受客户端连接bossGroup 线程组,避免因为已连接的客户端的数据读写频繁,影响新的客户端的连接。

  • 创建专门用于接收客户端读写workerGroup 线程组,多个线程进行客户端的数据读写,可以支持更多客户端。

课后习题:感兴趣的胖友,后续可以看看《【NIO 系列】——之 Reactor 模型》文章。

<2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioServerSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Socket 实现类。

<2.2.3> 处,调用 #localAddress(SocketAddress localAddress) 方法,设置服务端的端口

<2.2.4> 处,调用 option#(ChannelOption<T> option, T value) 方法,设置服务端接受客户端的连接队列大小。因为 TCP 建立连接是三次握手,所以第一次握手完成后,会添加到服务端的连接队列中。

课后习题:更多相关内容,后续可以看看《浅谈 TCP Socket 的 backlog 参数》文章。

<2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的心跳保活功能。

课后习题:更多相关内容,后续可以看看《TCP Keepalive 机制刨根问底》文章。

<2.2.6> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许较小的数据包的发送,降低延迟。

课后习题:更多相关内容,后续可以看看《详解 Socket 编程 --- TCP_NODELAY 选项》文章。

<2.2.7> 处,调用 #childHandler(ChannelHandler childHandler) 方法,设置客户端连接上来的 Channel 的处理器为 NettyServerHandlerInitializer。稍后我们在「2.1.2 NettyServerHandlerInitializer」小节来看看。

<2.3> 处,调用 #bind() + #sync() 方法,绑定端口,并同步等待成功,即启动服务端。

???? ③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Server 的关闭。

<3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Server,这样客户端就不再能连接了。

<3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。

2.1.2 NettyServerHandlerInitializer

在看 NettyServerHandlerInitializer 的代码之前,我们需要先了解下 Netty 的 ChannelHandler 组件,用来处理 Channel 的各种事件。这里的事件很广泛,比如可以是连接、数据读写、异常、数据转换等等。

ChannelHandler 有非常多的子类,其中有个非常特殊的 ChannelInitializer,它用于 Channel 创建时,实现自定义的初始化逻辑。这里我们创建的 NettyServerHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,代码如下:

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {/*** 心跳超时时间*/private static final Integer READ_TIMEOUT_SECONDS = 3 * 60;@Autowiredprivate MessageDispatcher messageDispatcher;@Autowiredprivate NettyServerHandler nettyServerHandler;@Overrideprotected void initChannel(Channel ch) {// <1> 获得 Channel 对应的 ChannelPipelineChannelPipeline channelPipeline = ch.pipeline();// <2> 添加一堆 NettyServerHandler 到 ChannelPipeline 中channelPipeline// 空闲检测.addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS))// 编码器.addLast(new InvocationEncoder())// 解码器.addLast(new InvocationDecoder())// 消息分发器.addLast(messageDispatcher)// 服务端处理器.addLast(nettyServerHandler);}}

在每一个客户端与服务端建立完成连接时,服务端会创建一个 Channel 与之对应。此时,NettyServerHandlerInitializer 会进行执行 #initChannel(Channel c) 方法,进行自定义的初始化。

友情提示:创建的客户端的 Channel,不要和「2.1.1 NettyServer」小节的 NioServerSocketChannel 混淆,不是同一个哈。

#initChannel(Channel ch) 方法的 ch 参数,就是此时创建的客户端 Channel。

<1> 处,调用 Channel 的 #pipeline() 方法,获得客户端 Channel 对应的 ChannelPipeline。ChannelPipeline 由一系列的 ChannelHandler 组成,又或者说是 ChannelHandler 。这样, Channel 所有上所有的事件都会经过 ChannelPipeline,被其上的 ChannelHandler 所处理。

<2> 处,添加五个 ChannelHandler 到 ChannelPipeline 中,每一个的作用看其上的注释。具体的,我们会在后续的小节详细解释。

2.1.3 NettyServerHandler

创建 NettyServerHandler 类,继承 ChannelInboundHandlerAdapter 类,实现客户端 Channel 建立连接、断开连接、异常时的处理。代码如下:

@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate NettyChannelManager channelManager;@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 从管理器中添加channelManager.add(ctx.channel());}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) {// 从管理器中移除channelManager.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);// 断开连接ctx.channel().close();}}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

channelManager 属性,是我们实现的客户端 Channel 的管理器。

  • #channelActive(ChannelHandlerContext ctx) 方法,在客户端和服务端建立连接完成时,调用 NettyChannelManager 的 #add(Channel channel) 方法,添加到其中

  • #channelUnregistered(ChannelHandlerContext ctx) 方法,在客户端和服务端断开连接时,调用 NettyChannelManager 的 #add(Channel channel) 方法,从其中移除

具体的 NettyChannelManager 的源码,我们在「2.1.4 NettyChannelManager」 小节中来瞅瞅~

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法,断开和客户端的连接。

2.1.4 NettyChannelManager

创建 NettyChannelManager 类,提供两种功能。

???? ① 客户端 Channel 的管理。代码如下:

@Component
public class NettyChannelManager {/*** {@link Channel#attr(AttributeKey)} 属性中,表示 Channel 对应的用户*/private static final AttributeKey<String> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("user");private Logger logger = LoggerFactory.getLogger(getClass());/*** Channel 映射*/private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();/*** 用户与 Channel 的映射。** 通过它,可以获取用户对应的 Channel。这样,我们可以向指定用户发送消息。*/private ConcurrentMap<String, Channel> userChannels = new ConcurrentHashMap<>();/*** 添加 Channel 到 {@link #channels} 中** @param channel Channel*/public void add(Channel channel) {channels.put(channel.id(), channel);logger.info("[add][一个连接({})加入]", channel.id());}/*** 添加指定用户到 {@link #userChannels} 中** @param channel Channel* @param user 用户*/public void addUser(Channel channel, String user) {Channel existChannel = channels.get(channel.id());if (existChannel == null) {logger.error("[addUser][连接({}) 不存在]", channel.id());return;}// 设置属性channel.attr(CHANNEL_ATTR_KEY_USER).set(user);// 添加到 userChannelsuserChannels.put(user, channel);}/*** 将 Channel 从 {@link #channels} 和 {@link #userChannels} 中移除** @param channel Channel*/public void remove(Channel channel) {// 移除 channelschannels.remove(channel.id());// 移除 userChannelsif (channel.hasAttr(CHANNEL_ATTR_KEY_USER)) {userChannels.remove(channel.attr(CHANNEL_ATTR_KEY_USER).get());}logger.info("[remove][一个连接({})离开]", channel.id());}
}

???? ② 向客户端 Channel 发送消息。代码如下:

@Component
public class NettyChannelManager {/*** 向指定用户发送消息** @param user 用户* @param invocation 消息体*/public void send(String user, Invocation invocation) {// 获得用户对应的 ChannelChannel channel = userChannels.get(user);if (channel == null) {logger.error("[send][连接不存在]");return;}if (!channel.isActive()) {logger.error("[send][连接({})未激活]", channel.id());return;}// 发送消息channel.writeAndFlush(invocation);}/*** 向所有用户发送消息** @param invocation 消息体*/public void sendAll(Invocation invocation) {for (Channel channel : channels.values()) {if (!channel.isActive()) {logger.error("[send][连接({})未激活]", channel.id());return;}// 发送消息channel.writeAndFlush(invocation);}}}

2.1.5 引入依赖

创建 pom.xml 文件,引入 Netty 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>lab-67-netty-demo</artifactId><groupId>cn.iocoder.springboot.labs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>lab-67-netty-demo-server</artifactId><properties><!-- 依赖相关配置 --><spring.boot.version>2.2.4.RELEASE</spring.boot.version><!-- 插件相关配置 --><maven.compiler.target>1.8</maven.compiler.target><maven.compiler.source>1.8</maven.compiler.source></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!-- Spring Boot 基础依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Netty 依赖 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.50.Final</version></dependency><!-- 引入 netty-demo-common 封装 --><dependency><groupId>cn.iocoder.springboot.labs</groupId><artifactId>lab-67-netty-demo-common</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies></project>

2.1.6 NettyServerApplication

创建 NettyServerApplication 类,Netty Server 启动类。代码如下:

@SpringBootApplication
public class NettyServerApplication {public static void main(String[] args) {SpringApplication.run(NettyServerApplication.class, args);}}

2.1.7 简单测试

执行 NettyServerApplication 类,启动 Netty Server 服务器。日志如下:

... // 省略其他日志2020-06-21 00:16:38.801  INFO 41948 --- [           main] c.i.s.l.n.server.NettyServer             : [start][Netty Server 启动在 8888 端口]
2020-06-21 00:16:38.893  INFO 41948 --- [           main] c.i.s.l.n.NettyServerApplication         : Started NettyServerApplication in 0.96 seconds (JVM running for 1.4)

Netty Server 启动在 8888 端口。

2.2 构建 Netty 客户端

创建 lab-67-netty-demo-client 项目,搭建 Netty 客户端。如下图所示:

项目结构

下面,我们只会暂时看看 client 包下的代码,避免信息量过大,击穿胖友的秃头。

2.2.1 NettyClient

创建 NettyClient 类,Netty 客户端。代码如下:

@Component
public class NettyClient {/*** 重连频率,单位:秒*/private static final Integer RECONNECT_SECONDS = 20;private Logger logger = LoggerFactory.getLogger(getClass());@Value("${netty.server.host}")private String serverHost;@Value("${netty.server.port}")private Integer serverPort;@Autowiredprivate NettyClientHandlerInitializer nettyClientHandlerInitializer;/*** 线程组,用于客户端对服务端的连接、数据读写*/private EventLoopGroup eventGroup = new NioEventLoopGroup();/*** Netty Client Channel*/private volatile Channel channel;/*** 启动 Netty Server*/@PostConstructpublic void start() throws InterruptedException {// <2.1> 创建 Bootstrap 对象,用于 Netty Client 启动Bootstrap bootstrap = new Bootstrap();// <2.2>bootstrap.group(eventGroup) // <2.2.1> 设置一个 EventLoopGroup 对象.channel(NioSocketChannel.class)  // <2.2.2> 指定 Channel 为客户端 NioSocketChannel.remoteAddress(serverHost, serverPort) // <2.2.3> 指定连接服务器的地址.option(ChannelOption.SO_KEEPALIVE, true) // <2.2.4> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能.option(ChannelOption.TCP_NODELAY, true) //<2.2.5>  允许较小的数据包的发送,降低延迟.handler(nettyClientHandlerInitializer);// <2.3> 连接服务器,并异步等待成功,即启动客户端bootstrap.connect().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// 连接失败if (!future.isSuccess()) {logger.error("[start][Netty Client 连接服务器({}:{}) 失败]", serverHost, serverPort);reconnect();return;}// 连接成功channel = future.channel();logger.info("[start][Netty Client 连接服务器({}:{}) 成功]", serverHost, serverPort);}});}public void reconnect() {// ... 暂时省略代码。}/*** 关闭 Netty Server*/@PreDestroypublic void shutdown() {// <3.1> 关闭 Netty Clientif (channel != null) {channel.close();}// <3.2> 优雅关闭一个 EventLoopGroup 对象eventGroup.shutdownGracefully();}/*** 发送消息** @param invocation 消息体*/public void send(Invocation invocation) {if (channel == null) {logger.error("[send][连接不存在]");return;}if (!channel.isActive()) {logger.error("[send][连接({})未激活]", channel.id());return;}// 发送消息channel.writeAndFlush(invocation);}}

友情提示:整体代码,是和「2.1.1 NettyServer」对等,且基本是一致的。

???? ① 在类上,添加 @Component 注解,把 NettyClient 的创建交给 Spring 管理。

  • serverHostserverPort 属性,读取 application.yml 配置文件的 netty.server.hostnetty.server.port 配置项。

  • #start() 方法,添加 @PostConstruct 注解,启动 Netty 客户端。

  • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 客户端。

???? ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Client 的启动,建立和服务器的连接。

<2.1> 处,创建 Bootstrap 类,Netty 提供的客户端的启动类,方便我们初始化 Client。

<2.2> 处,设置 Bootstrap 的各种属性。

<2.2.1> 处,调用 #group(EventLoopGroup group) 方法,设置使用 eventGroup 线程组,实现客户端对服务端的连接、数据读写。

<2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Client 实现类。

<2.2.3> 处,调用 #remoteAddress(SocketAddress localAddress) 方法,设置连接服务端的地址

<2.2.4> 处,调用 #option(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的心跳保活功能。

<2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许较小的数据包的发送,降低延迟。

<2.2.7> 处,调用 #handler(ChannelHandler childHandler) 方法,设置自己 Channel 的处理器为 NettyClientHandlerInitializer。稍后我们在「2.2.2 NettyClientHandlerInitializer」小节来看看。

<2.3> 处,调用 #connect() 方法,连接服务器,并异步等待成功,即启动客户端。同时,添加回调监听器 ChannelFutureListener,在连接服务端失败的时候,调用 #reconnect() 方法,实现定时重连。???? 具体 #reconnect() 方法的代码,我们稍后在瞅瞅哈。

③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Client 的关闭。

<3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Client,这样客户端就断开和服务端的连接。

<3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。

#send(Invocation invocation) 方法,实现向服务端发送消息。

因为 NettyClient 是客户端,所以无需像 NettyServer 一样使用「2.1.4 NettyChannelManager」维护 Channel 的集合。

2.2.2 NettyClientHandlerInitializer

创建的 NettyClientHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,实现和服务端建立连接后,添加相应的 ChannelHandler 处理器。代码如下:

@Component
public class NettyClientHandlerInitializer extends ChannelInitializer<Channel> {/*** 心跳超时时间*/private static final Integer READ_TIMEOUT_SECONDS = 60;@Autowiredprivate MessageDispatcher messageDispatcher;@Autowiredprivate NettyClientHandler nettyClientHandler;@Overrideprotected void initChannel(Channel ch) {ch.pipeline()// 空闲检测.addLast(new IdleStateHandler(READ_TIMEOUT_SECONDS, 0, 0)).addLast(new ReadTimeoutHandler(3 * READ_TIMEOUT_SECONDS))// 编码器.addLast(new InvocationEncoder())// 解码器.addLast(new InvocationDecoder())// 消息分发器.addLast(messageDispatcher)// 客户端处理器.addLast(nettyClientHandler);}}

和「2.1.2 NettyServerHandlerInitializer」的代码基本一样,差别在于空闲检测额外增加 IdleStateHandler,客户端处理器换成了 NettyClientHandler

2.2.3 NettyClientHandler

创建 NettyClientHandler 类,实现客户端 Channel 断开连接、异常时的处理。代码如下:

@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate NettyClient nettyClient;@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 发起重连nettyClient.reconnect();// 继续触发事件super.channelInactive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);// 断开连接ctx.channel().close();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {// 空闲时,向服务端发起一次心跳if (event instanceof IdleStateEvent) {logger.info("[userEventTriggered][发起一次心跳]");HeartbeatRequest heartbeatRequest = new HeartbeatRequest();ctx.writeAndFlush(new Invocation(HeartbeatRequest.TYPE, heartbeatRequest)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {super.userEventTriggered(ctx, event);}}}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

#channelInactive(ChannelHandlerContext ctx) 方法,实现在和服务端断开连接时,调用 NettyClient 的 #reconnect() 方法,实现客户端定时和服务端重连

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法,断开和客户端的连接。

#userEventTriggered(ChannelHandlerContext ctx, Object event) 方法,在客户端在空闲时,向服务端发送一次心跳,即心跳机制。这块的内容,我们稍后详细讲讲。

2.2.4 引入依赖

创建 pom.xml 文件,引入 Netty 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>lab-67-netty-demo</artifactId><groupId>cn.iocoder.springboot.labs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>lab-67-netty-demo-client</artifactId><properties><!-- 依赖相关配置 --><spring.boot.version>2.2.4.RELEASE</spring.boot.version><!-- 插件相关配置 --><maven.compiler.target>1.8</maven.compiler.target><maven.compiler.source>1.8</maven.compiler.source></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!-- 实现对 Spring MVC 的自动化配置 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Netty 依赖 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.50.Final</version></dependency><!-- 引入 netty-demo-common 封装 --><dependency><groupId>cn.iocoder.springboot.labs</groupId><artifactId>lab-67-netty-demo-common</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies></project>

2.2.5 NettyClientApplication

创建 NettyClientApplication 类,Netty Client 启动类。代码如下:

@SpringBootApplication
public class NettyClientApplication {public static void main(String[] args) {SpringApplication.run(NettyClientApplication.class, args);}}

2.2.6 简单测试

执行 NettyClientApplication 类,启动 Netty Client 客户端。日志如下:

... // 省略其他日志2020-06-21 09:06:12.205  INFO 44029 --- [ntLoopGroup-2-1] c.i.s.l.n.client.NettyClient             : [start][Netty Client 连接服务器(127.0.0.1:8888) 成功]

同时 Netty Server 服务端发现有一个客户端接入,打印如下日志:

2020-06-21 09:06:12.268  INFO 41948 --- [ntLoopGroup-3-1] c.i.s.l.n.server.NettyChannelManager     : [add][一个连接(db652822)加入]

2.3 小结

至此,我们已经构建 Netty 服务端和客户端完成。因为 Netty 提供的 API 非常便利,所以我们不会像直接使用 NIO 时,需要处理大量底层且细节的代码。

不过,如上的内容仅仅是本文的开胃菜,正片即将开始!美滋滋,继续往下看,奥利给!

3. 通信协议

在「2. 构建 Netty 服务端与客户端」小节中,我们实现了客户端和服务端的连接功能。而本小节,我们要让它们两能够说上话,即进行数据的读写

在日常项目的开发中,前端和后端之间采用 HTTP 作为通信协议,使用文本内容进行交互,数据格式一般是 JSON。但是在 TCP 的世界里,我们需要自己基于二进制构建,构建客户端和服务端的通信协议。

我们以客户端向服务端发送消息来举个例子,假设客户端要发送一个登录请求,对应的类如下:

public class AuthRequest {/** 用户名 **/private String username;/** 密码 **/private String password;}
  • 显然,我们无法将一个 Java 对象直接丢到 TCP Socket 当中,而是需要将其转换成 byte 字节数组,才能写入到 TCP Socket 中去。即,需要将消息对象通过序列化,转换成 byte 字节数组。

  • 同时,在服务端收到 byte 字节数组时,需要将其又转换成 Java 对象,即反序列化。不然,服务端对着一串 byte 字节处理个毛线?!

友情提示:服务端向客户端发消息,也是一样的过程哈!

序列化的工具非常多,例如说 Google 提供的 Protobuf,性能高效,且序列化出来的二进制数据较小。Netty 对 Protobuf 进行集成,提供了相应的编解码器。如下图所示:

Netty protobuf

但是考虑到很多胖友对 Protobuf 并不了解,因为它实现序列化又增加胖友的额外学习成本。因此,艿艿仔细一个捉摸,还是采用 JSON 方式进行序列化。可能胖友会疑惑,JSON 不是将对象转换成字符串吗?嘿嘿,我们再把字符串转换成 byte 字节数组就可以啦~

下面,我们新建 lab-67-netty-demo-common 项目,并在 codec 包下,实现我们自定义的通信协议。如下图所示:

项目结构

3.1 Invocation

创建 Invocation 类,通信协议的消息体。代码如下:

/*** 通信协议的消息体*/
public class Invocation {/*** 类型*/private String type;/*** 消息,JSON 格式*/private String message;// 空构造方法public Invocation() {}public Invocation(String type, String message) {this.type = type;this.message = message;}public Invocation(String type, Message message) {this.type = type;this.message = JSON.toJSONString(message);}// ... 省略 setter、getter、toString 方法
}

①  type 属性,类型,用于匹配对应的消息处理器。如果类比 HTTP 协议,type 属性相当于请求地址。

message 属性,消息内容,使用 JSON 格式。

另外,Message 是我们定义的消息接口。代码如下:

public interface Message {// ... 空,作为标记接口}

3.2 粘包与拆包

在开始看 Invocation 的编解码处理器之前,我们先了解下粘包拆包的概念。

如果的内容,引用《Netty 解决粘包和拆包问题的四种方案》文章的内容,进行二次编辑。

3.2.1 产生原因

产生粘包和拆包问题的主要原因是,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小。

  • 如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。

    例如说,在《详解 Socket 编程 --- TCP_NODELAY 选项》文章中我们可以看到,在关闭 Nagle 算法时,请求不会等待满足缓冲区大小,而是尽快发出,降低延迟。

  • 如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。

如下图展示了粘包和拆包的一个示意图,演示了粘包和拆包的三种情况:

示例图
  • A 和 B 两个包都刚好满足 TCP 缓冲区的大小,或者说其等待时间已经达到 TCP 等待时长,从而还是使用两个独立的包进行发送。

  • A 和 B 两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端。

  • B 包比较大,因而将其拆分为两个包 B_1 和 B_2 进行发送,而这里由于拆分后的 B_2 比较小,其又与 A 包合并在一起发送。

3.2.2 解决方案

对于粘包和拆包问题,常见的解决方案有三种:

???? ① 客户端在发送数据包的时候,每个包都固定长度。比如 1024 个字节大小,如果客户端发送的数据长度不足 1024 个字节,则通过补充空格的方式补全到指定长度。

这种方式,艿艿暂时没有找到采用这种方式的案例。

???? ② 客户端在每个包的末尾使用固定的分隔符。例如 \r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的 \r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包。

具体的案例,有 HTTP、WebSocket、Redis。

???? ③ 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。

友情提示:方案 ③ 是 ① 的升级版,动态长度

本文,艿艿将采用这种方式,在每次 Invocation 序列化成字节数组写入 TCP Socket 之前,先将字节数组的长度写到其中。如下图所示:

Invocation 序列化

3.3 InvocationEncoder

创建 InvocationEncoder 类,实现将 Invocation 序列化,并写入到 TCP Socket 中。代码如下:

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {private Logger logger = LoggerFactory.getLogger(getClass());@Overrideprotected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {// <2.1> 将 Invocation 转换成 byte[] 数组byte[] content = JSON.toJSONBytes(invocation);// <2.2> 写入 lengthout.writeInt(content.length);// <2.3> 写入内容out.writeBytes(content);logger.info("[encode][连接({}) 编码了一条消息({})]", ctx.channel().id(), invocation.toString());}}

① MessageToByteEncoder 是 Netty 定义的编码 ChannelHandler 抽象类,将泛型 <I> 消息转换成字节数组。

#encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) 方法,进行编码的逻辑。

<2.1> 处,调用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,将 Invocation 转换成 字节数组。

<2.2> 处,将字节数组的长度,写入到 TCP Socket 当中。这样,后续「3.4 InvocationDecoder」可以根据该长度,解析到消息,解决粘包和拆包的问题

友情提示:MessageToByteEncoder 会最终将 ByteBuf out 写到 TCP Socket 中。

<2.3> 处,将字节数组,写入到 TCP Socket 当中。

3.4 InvocationDecoder

创建 InvocationDecoder 类,实现从 TCP Socket 读取字节数组,反序列化成 Invocation。代码如下:

public class InvocationDecoder extends ByteToMessageDecoder {private Logger logger = LoggerFactory.getLogger(getClass());@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {// <2.1> 标记当前读取位置in.markReaderIndex();// <2.2> 判断是否能够读取 length 长度if (in.readableBytes() <= 4) {return;}// <2.3> 读取长度int length = in.readInt();if (length < 0) {throw new CorruptedFrameException("negative length: " + length);}// <3.1> 如果 message 不够可读,则退回到原读取位置if (in.readableBytes() < length) {in.resetReaderIndex();return;}// <3.2> 读取内容byte[] content = new byte[length];in.readBytes(content);// <3.3> 解析成 InvocationInvocation invocation = JSON.parseObject(content, Invocation.class);out.add(invocation);logger.info("[decode][连接({}) 解析到一条消息({})]", ctx.channel().id(), invocation.toString());}}

① ByteToMessageDecoder 是 Netty 定义的解码 ChannelHandler 抽象类,在 TCP Socket 读取到新数据时,触发进行解码。

② 在 <2.1><2.2><2.3> 处,从 TCP Socket 中读取长度

③ 在 <3.1><3.2><3.3> 处,从 TCP Socket 中读取字节数组,并反序列化成 Invocation 对象。

最终,添加 List<Object> out 中,交给后续的 ChannelHandler 进行处理。稍后,我们将在「4. 消息分发」小结中,会看到 MessageDispatcher 将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

3.5 引入依赖

创建 pom.xml 文件,引入 Netty、FastJSON 等等依赖。

  
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>lab-67-netty-demo</artifactId><groupId>cn.iocoder.springboot.labs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>lab-67-netty-demo-common</artifactId><properties><!-- 插件相关配置 --><maven.compiler.target>1.8</maven.compiler.target><maven.compiler.source>1.8</maven.compiler.source></properties><dependencies><!-- Netty 依赖 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.50.Final</version></dependency><!-- FastJSON 依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.71</version></dependency><!-- 引入 Spring 相关依赖 --><dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>5.2.5.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.5.RELEASE</version></dependency><!-- 引入 SLF4J 依赖 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version></dependency></dependencies></project>

3.6 小结

至此,我们已经完成通信协议的定义、编解码的逻辑,是不是蛮有趣的?!

另外,我们在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代码中,将编解码器添加到其中。如下图所示:

编解码器的初始化

4. 消息分发

在 SpringMVC 中,DispatcherServlet 会根据请求地址、方法等,将请求分发到匹配的 Controller 的 Method 方法上。

lab-67-netty-demo-client 项目的 dispatcher 包中,我们创建了 MessageDispatcher 类,实现和 DispatcherServlet 类似的功能,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

dispatcher

下面,我们来看看具体的代码实现。

4.1 Message

创建 Message 接口,定义消息的标记接口。代码如下:

public interface Message {
}

下图,是我们涉及到的 Message 实现类。如下图所示:

Message 实现类

4.2 MessageHandler

创建 MessageHandler 接口,消息处理器接口。代码如下:

public interface MessageHandler<T extends Message> {/*** 执行处理消息** @param channel 通道* @param message 消息*/void execute(Channel channel, T message);/*** @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段*/String getType();}
  • 定义了泛型 <T> ,需要是 Message 的实现类。

  • 定义的两个接口方法,胖友自己看下注释哈。

下图,是我们涉及到的 MessageHandler 实现类。如下图所示:

MessageHandler 实现类

4.3 MessageHandlerContainer

创建 MessageHandlerContainer 类,作为 MessageHandler 的容器。代码如下:

public class MessageHandlerContainer implements InitializingBean {private Logger logger = LoggerFactory.getLogger(getClass());/*** 消息类型与 MessageHandler 的映射*/private final Map<String, MessageHandler> handlers = new HashMap<>();@Autowiredprivate ApplicationContext applicationContext;@Overridepublic void afterPropertiesSet() throws Exception {// 通过 ApplicationContext 获得所有 MessageHandler BeanapplicationContext.getBeansOfType(MessageHandler.class).values() // 获得所有 MessageHandler Bean.forEach(messageHandler -> handlers.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中logger.info("[afterPropertiesSet][消息处理器数量:{}]", handlers.size());}/*** 获得类型对应的 MessageHandler** @param type 类型* @return MessageHandler*/MessageHandler getMessageHandler(String type) {MessageHandler handler = handlers.get(type);if (handler == null) {throw new IllegalArgumentException(String.format("类型(%s) 找不到匹配的 MessageHandler 处理器", type));}return handler;}/*** 获得 MessageHandler 处理的消息类** @param handler 处理器* @return 消息类*/static Class<? extends Message> getMessageClass(MessageHandler handler) {// 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);// 获得接口的 Type 数组Type[] interfaces = targetClass.getGenericInterfaces();Class<?> superclass = targetClass.getSuperclass();while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准interfaces = superclass.getGenericInterfaces();superclass = targetClass.getSuperclass();}if (Objects.nonNull(interfaces)) {// 遍历 interfaces 数组for (Type type : interfaces) {// 要求 type 是泛型参数if (type instanceof ParameterizedType) {ParameterizedType parameterizedType = (ParameterizedType) type;// 要求是 MessageHandler 接口if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();// 取首个元素if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {return (Class<Message>) actualTypeArguments[0];} else {throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));}}}}}throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));}}

① 实现 InitializingBean 接口,在 #afterPropertiesSet() 方法中,扫描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。

② 在 #getMessageHandler(String type) 方法中,获得类型对应的 MessageHandler 对象。稍后,我们会在 MessageDispatcher 调用该方法。

③ 在 #getMessageClass(MessageHandler handler) 方法中,通过 MessageHandler 中,通过解析其类上的泛型,获得消息类型对应的 Class 类。这是参考 rocketmq-spring 项目的 DefaultRocketMQListenerContainer#getMessageType() 方法,进行略微修改。

友情提示:如果胖友对 Java 的泛型机制没有做过一点了解,可能略微有点硬核。可以先暂时跳过,知道意图即可。

4.4 MessageDispatcher

创建 MessageDispatcher 类,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。代码如下:

@ChannelHandler.Sharable
public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {@Autowiredprivate MessageHandlerContainer messageHandlerContainer;private final ExecutorService executor =  Executors.newFixedThreadPool(200);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {// <3.1> 获得 type 对应的 MessageHandler 处理器MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());// 获得  MessageHandler 处理器的消息类Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);// <3.2> 解析消息Message message = JSON.parseObject(invocation.getMessage(), messageClass);// <3.3> 执行逻辑executor.submit(new Runnable() {@Overridepublic void run() {// noinspection uncheckedmessageHandler.execute(ctx.channel(), message);}});}}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

② SimpleChannelInboundHandler 是 Netty 定义的消息处理 ChannelHandler 抽象类,处理消息的类型是 <I> 泛型时。

#channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,处理消息,进行分发。

消息分发

<3.1> 处,调用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,获得 Invocation 的 type 对应的 MessageHandler 处理器

然后,调用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,获得  MessageHandler 处理器的消息类

<3.2> 处,调用 JSON 的 # parseObject(String text, Class<T> clazz) 方法,将 Invocation 的 message 解析成 MessageHandler 对应的消息对象

<3.3> 处,丢到线程池中,然后调用 MessageHandler 的 #execute(Channel channel, T message) 方法,执行业务逻辑

注意,为什么要丢到 executor 线程池中呢?我们先来了解下 EventGroup 的线程模型。

友情提示:在我们启动 Netty 服务端或者客户端时,都会设置其 EventGroup。

EventGroup 我们可以先简单理解成一个线程池,并且线程池的大小仅仅是 CPU 数量 * 2。每个 Channel 仅仅会被分配到其中的一个线程上,进行数据的读写。并且,多个 Channel 会共享一个线程,即使用同一个线程进行数据的读写。

那么胖友试着思考下,MessageHandler 的具体逻辑视线中,往往会涉及到 IO 处理,例如说进行数据库的读取。这样,就会导致一个 Channel 在执行 MessageHandler 的过程中,阻塞了共享当前线程的其它 Channel 的数据读取。

因此,我们在这里创建了 executor 线程池,进行 MessageHandler 的逻辑执行,避免阻塞 Channel 的数据读取。

可能会有胖友说,我们是不是能够把 EventGroup 的线程池设置大一点,例如说 200 呢?对于长连接的 Netty 服务端,往往会有 1000 ~ 100000 的 Netty 客户端连接上来,这样无论设置多大的线程池,都会出现阻塞数据读取的情况。

友情提示:executor 线程池,我们一般称之为业务线程池或者逻辑线程池,顾名思义,就是执行业务逻辑的。

这样的设计方式,目前 Dubbo 等等 RPC 框架,都采用这种方式。

后续,胖友可以认真阅读下《【NIO 系列】——之 Reactor 模型》文章,进一步理解。

4.5 NettyServerConfig

创建 NettyServerConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。代码如下:

@Configuration
public class NettyServerConfig {@Beanpublic MessageDispatcher messageDispatcher() {return new MessageDispatcher();}@Beanpublic MessageHandlerContainer messageHandlerContainer() {return new MessageHandlerContainer();}}

4.6 NettyClientConfig

友情提示:和「4.5 NettyServerConfig」小结一致。

创建 NettyClientConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。代码如下:

@Configuration
public class NettyClientConfig {@Beanpublic MessageDispatcher messageDispatcher() {return new MessageDispatcher();}@Beanpublic MessageHandlerContainer messageHandlerContainer() {return new MessageHandlerContainer();}}

4.7 小结

后续,我们将在如下小节,具体演示消息分发的使用:

  • 「6. 心跳机制与空闲检测」

  • 「7. 认证逻辑」

  • 「7. 单聊逻辑」

  • 「8. 群聊逻辑」

5. 断开重连

Netty 客户端需要实现断开重连机制,解决各种情况下的断开情况。例如说:

  • Netty 客户端启动时,Netty 服务端处于挂掉,导致无法连接上。

  • 在运行过程中,Netty 服务端挂掉,导致连接被断开。

  • 任一一端网络抖动,导致连接异常断开。

具体的代码实现比较简单,只需要在两个地方增加重连机制。

  • Netty 客户端启动时,无法连接 Netty 服务端时,发起重连。

  • Netty 客户端运行时,和 Netty 断开连接时,发起重连。

考虑到重连会存在失败的情况,我们采用定时重连的方式,避免占用过多资源。

5.1 具体代码

① 在 NettyClient 中,提供 #reconnect() 方法,实现定时重连的逻辑。代码如下:

// NettyClient.javapublic void reconnect() {eventGroup.schedule(new Runnable() {@Overridepublic void run() {logger.info("[reconnect][开始重连]");try {start();} catch (InterruptedException e) {logger.error("[reconnect][重连失败]", e);}}}, RECONNECT_SECONDS, TimeUnit.SECONDS);logger.info("[reconnect][{} 秒后将发起重连]", RECONNECT_SECONDS);
}

通过调用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit) 方法,实现定时逻辑。而在内部的具体逻辑,调用 NettyClient 的 #start() 方法,发起连接 Netty 服务端。

又因为 NettyClient 在 #start() 方法在连接 Netty 服务端失败时,又会调用 #reconnect() 方法,从而再次发起定时重连。如此循环反复,知道 Netty 客户端连接上 Netty 服务端。如下图所示:

NettyClient 重连

② 在 NettyClientHandler 中,实现 #channelInactive(ChannelHandlerContext ctx) 方法,在发现和 Netty 服务端断开时,调用 Netty Client 的 #reconnect() 方法,发起重连。代码如下:

// NettyClientHandler.java@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {// 发起重连nettyClient.reconnect();// 继续触发事件super.channelInactive(ctx);
}

5.2 简单测试

① 启动 Netty Client,不要启动 Netty Server,控制台打印日志如下图:

重连失败

可以看到 Netty Client 在连接失败时,不断发起定时重连。

② 启动 Netty Server,控制台打印如下图:

重连成功

可以看到 Netty Client 成功重连上 Netty Server。

6. 心跳机制与空闲检测

在上文中,艿艿推荐胖友阅读《TCP Keepalive 机制刨根问底》文章,我们可以了解到 TCP 自带的空闲检测机制,默认是 2 小时。这样的检测机制,从系统资源层面上来说是可以接受的。

但是在业务层面,如果 2 小时才发现客户端与服务端的连接实际已经断开,会导致中间非常多的消息丢失,影响客户的使用体验。

因此,我们需要在业务层面,自己实现空闲检测,保证尽快发现客户端与服务端实际已经断开的情况。实现逻辑如下:

  • 服务端发现 180 秒未从客户端读取到消息,主动断开连接。

  • 客户端发现 180 秒未从服务端读取到消息,主动断开连接。

考虑到客户端和服务端之间并不是一直有消息的交互,所以我们需要增加心跳机制

  • 客户端每 60 秒向服务端发起一次心跳消息,保证服务端可以读取到消息。

  • 服务端在收到心跳消息时,回复客户端一条确认消息,保证客户端可以读取到消息。

友情提示:

  • 为什么是 180 秒?可以加大或者减小,看自己希望多快检测到连接异常。过短的时间,会导致心跳过于频繁,占用过多资源。

  • 为什么是 60 秒?三次机会,确认是否心跳超时。

虽然听起来有点复杂,但是实现起来并不复杂哈。

6.1 服务端的空闲检测

在 NettyServerHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。如下图所示:

ReadTimeoutHandler

通过这样的方式,实现服务端发现 180 秒未从客户端读取到消息,主动断开连接。

6.2 客户端的空闲检测

友情提示:和「6.1 服务端的空闲检测」一致。

在 NettyClientHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。如下图所示:

ReadTimeoutHandler

通过这样的方式,实现客户端发现 180 秒未从服务端读取到消息,主动断开连接。

6.3 心跳机制

Netty 提供了 IdleStateHandler 处理器,提供空闲检测的功能,在 Channel 的读或者写空闲时间太长时,将会触发一个 IdleStateEvent 事件。

这样,我们只需要在 NettyClientHandler 处理器中,在接收到 IdleStateEvent 事件时,客户端向客户端发送一次心跳消息。如下图所示:

客户端心跳
  • 其中,HeartbeatRequest 是心跳请求。

同时,我们在服务端项目中,创建了一个 HeartbeatRequestHandler 消息处理器,在收到客户端的心跳请求时,回复客户端一条确认消息。代码如下:

@Component
public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {private Logger logger = LoggerFactory.getLogger(getClass());@Overridepublic void execute(Channel channel, HeartbeatRequest message) {logger.info("[execute][收到连接({}) 的心跳请求]", channel.id());// 响应心跳HeartbeatResponse response = new HeartbeatResponse();channel.writeAndFlush(new Invocation(HeartbeatResponse.TYPE, response));}@Overridepublic String getType() {return HeartbeatRequest.TYPE;}}
  • 其中,HeartbeatResponse 是心跳确认响应

6.4 简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,耐心等待 60 秒后,可以看到心跳日志如下:

// ... 客户端
2020-06-22 08:24:47.275  INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler   : [userEventTriggered][发起一次心跳]
2020-06-22 08:24:47.335  INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(44223e18) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:24:47.408  INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(44223e18) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
2020-06-22 08:24:47.409  INFO 57005 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatResponseHandler   : [execute][收到连接(44223e18) 的心跳响应]// ... 服务端
2020-06-22 08:24:47.388  INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(34778465) 解析到一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:24:47.390  INFO 56998 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatRequestHandler    : [execute][收到连接(34778465) 的心跳请求]
2020-06-22 08:24:47.399  INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(34778465) 编码了一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]

7. 认证逻辑

友情提示:从本小节开始,我们就具体看看业务逻辑的处理示例。

认证的过程,如下图所示:

认证流程

7.1 AuthRequest

创建 AuthRequest 类,定义用户认证请求。代码如下:

public class AuthRequest implements Message {public static final String TYPE = "AUTH_REQUEST";/*** 认证 Token*/private String accessToken;// ... 省略 setter、getter、toString 方法
}

这里我们使用 accessToken 认证令牌进行认证。

因为一般情况下,我们使用 HTTP 进行登录系统,然后使用登录后的身份标识(例如说 accessToken 认证令牌),将客户端和当前用户进行认证绑定。

7.2 AuthResponse

创建 AuthResponse 类,定义用户认证响应。代码如下:

public class AuthResponse implements Message {public static final String TYPE = "AUTH_RESPONSE";/*** 响应状态码*/private Integer code;/*** 响应提示*/private String message;// ... 省略 setter、getter、toString 方法
}

7.3 AuthRequestHandler

服务端...

创建 AuthRequestHandler 类,为服务端处理客户端的认证请求。代码如下:

@Component
public class AuthRequestHandler implements MessageHandler<AuthRequest> {@Autowiredprivate NettyChannelManager nettyChannelManager;@Overridepublic void execute(Channel channel, AuthRequest authRequest) {// <1> 如果未传递 accessTokenif (StringUtils.isEmpty(authRequest.getAccessToken())) {AuthResponse authResponse = new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入");channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));return;}// <2> ... 此处应有一段// <3> 将用户和 Channel 绑定// 考虑到代码简化,我们先直接使用 accessToken 作为 UsernettyChannelManager.addUser(channel, authRequest.getAccessToken());// <4> 响应认证成功AuthResponse authResponse = new AuthResponse().setCode(0);channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));}@Overridepublic String getType() {return AuthRequest.TYPE;}}

代码比较简单,胖友看看 <1><2><3><4> 上的注释。

7.4 AuthResponseHandler

客户端...

创建 AuthResponseHandler 类,为客户端处理服务端的认证响应。代码如下:

@Component
public class AuthResponseHandler implements MessageHandler<AuthResponse> {private Logger logger = LoggerFactory.getLogger(getClass());@Overridepublic void execute(Channel channel, AuthResponse message) {logger.info("[execute][认证结果:{}]", message);}@Overridepublic String getType() {return AuthResponse.TYPE;}}

打印个认证结果,方便调试。

7.5 TestController

客户端...

创建 TestController 类,提供 /test/mock 接口,模拟客户端向服务端发送请求。代码如下:

@RestController
@RequestMapping("/test")
public class TestController {@Autowiredprivate NettyClient nettyClient;@PostMapping("/mock")public String mock(String type, String message) {// 创建 Invocation 对象Invocation invocation = new Invocation(type, message);// 发送消息nettyClient.send(invocation);return "success";}}

7.6 简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,然后使用 Postman 模拟一次认证请求。如下图所示:

Postman 模拟认证请求

同时,可以看到认证成功的日志如下:

// 客户端...
2020-06-22 08:41:12.364  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(9e086597) 编码了一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})]
2020-06-22 08:41:12.390  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]
2020-06-22 08:41:12.392  INFO 57583 --- [pool-1-thread-1] c.i.s.l.n.m.auth.AuthResponseHandler     : [execute][认证结果:AuthResponse{code=0, message='null'}]// 服务端...
2020-06-22 08:41:12.374  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(791f122b) 解析到一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})]
2020-06-22 08:41:12.379  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(791f122b) 编码了一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]

8. 单聊逻辑

私聊的过程,如下图所示:

私聊流程

服务端负责将客户端 A 发送的私聊消息,转发给客户端 B。

8.1 ChatSendToOneRequest

创建 ChatSendToOneRequest 类,发送给指定人的私聊消息的请求。代码如下:

public class ChatSendToOneRequest implements Message {public static final String TYPE = "CHAT_SEND_TO_ONE_REQUEST";/*** 发送给的用户*/private String toUser;/*** 消息编号*/private String msgId;/*** 内容*/private String content;// ... 省略 setter、getter、toString 方法
}

8.2 ChatSendResponse

创建 ChatSendResponse 类,聊天发送消息结果的响应。代码如下:

public class ChatSendResponse implements Message {public static final String TYPE = "CHAT_SEND_RESPONSE";/*** 消息编号*/private String msgId;/*** 响应状态码*/private Integer code;/*** 响应提示*/private String message;// ... 省略 setter、getter、toString 方法
}

8.3 ChatRedirectToUserRequest

创建 ChatRedirectToUserRequest 类, 转发消息给一个用户的请求。代码如下:

public class ChatRedirectToUserRequest implements Message {public static final String TYPE = "CHAT_REDIRECT_TO_USER_REQUEST";/*** 消息编号*/private String msgId;/*** 内容*/private String content;// ... 省略 setter、getter、toString 方法
}

友情提示:写完之后,艿艿突然发现少了一个 fromUser 字段,表示来自谁的消息。

8.4 ChatSendToOneHandler

服务端...

创建 ChatSendToOneHandler 类,为服务端处理客户端的私聊请求。代码如下:

@Component
public class ChatSendToOneHandler implements MessageHandler<ChatSendToOneRequest> {@Autowiredprivate NettyChannelManager nettyChannelManager;@Overridepublic void execute(Channel channel, ChatSendToOneRequest message) {// <1> 这里,假装直接成功ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));// <2> 创建转发的消息,发送给指定用户ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId()).setContent(message.getContent());nettyChannelManager.send(message.getToUser(), new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));}@Overridepublic String getType() {return ChatSendToOneRequest.TYPE;}}

代码比较简单,胖友看看 <1><2> 上的注释。

8.5 ChatSendResponseHandler

客户端...

创建 ChatSendResponseHandler 类,为客户端处理服务端的聊天响应。代码如下:

@Component
public class ChatSendResponseHandler implements MessageHandler<ChatSendResponse> {private Logger logger = LoggerFactory.getLogger(getClass());@Overridepublic void execute(Channel channel, ChatSendResponse message) {logger.info("[execute][发送结果:{}]", message);}@Overridepublic String getType() {return ChatSendResponse.TYPE;}}

打印个聊天发送结果,方便调试。

8.6 ChatRedirectToUserRequestHandler

客户端

创建 ChatRedirectToUserRequestHandler 类,为客户端处理服务端的转发消息的请求。代码如下:

@Component
public class ChatRedirectToUserRequestHandler implements MessageHandler<ChatRedirectToUserRequest> {private Logger logger = LoggerFactory.getLogger(getClass());@Overridepublic void execute(Channel channel, ChatRedirectToUserRequest message) {logger.info("[execute][收到消息:{}]", message);}@Overridepublic String getType() {return ChatRedirectToUserRequest.TYPE;}}

打印个聊天接收消息,方便调试。

8.7 简单测试

① 启动 Netty Server 服务端。

② 启动 Netty Client 客户端 A。然后使用 Postman 模拟一次认证请求(用户为 yunai)。如下图所示:

Postman 模拟认证请求

③ 启动 Netty Client 客户端 B。注意,需要设置 --server.port 端口为 8081,避免冲突。如下图所示:

IDEA 设置

然后使用 Postman 模拟一次认证请求(用户为 tutou)。如下图所示:

Postman 模拟认证请求

④ 最后使用 Postman 模拟一次 yunai 芋艿给 tutou 土豆发送一次私聊消息。如下图所示:

Postman 模拟私聊请求

同时,可以看到客户端 A 向客户端 B 发送私聊消息的日志如下:

// 客户端 A...(芋艿)
2020-06-22 08:48:09.505  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tudou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:09.510  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:09.511  INFO 57583 --- [ool-1-thread-69] c.i.s.l.n.m.c.ChatSendResponseHandler    : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}]
2020-06-22 08:48:35.148  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:35.150  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:35.150  INFO 57583 --- [ool-1-thread-70] c.i.s.l.n.m.c.ChatSendResponseHandler    : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}]// 服务端 ...
2020-06-22 08:48:35.149  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:35.149  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:35.149  INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})]// 客户端 B...(秃头)
2020-06-22 08:48:18.277  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler   : [userEventTriggered][发起一次心跳]
2020-06-22 08:48:18.278  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(24fbc3e8) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:48:18.280  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
2020-06-22 08:48:18.281  INFO 59613 --- [pool-1-thread-4] c.i.s.l.n.m.h.HeartbeatResponseHandler   : [execute][收到连接(24fbc3e8) 的心跳响应]
2020-06-22 08:48:35.150  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})]
2020-06-22 08:48:35.151  INFO 59613 --- [pool-1-thread-5] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='1', content='你猜'}]

9. 群聊逻辑

群聊的过程,如下图所示:

群聊流程

服务端负责将客户端 A 发送的群聊消息,转发给客户端 A、B、C。

友情提示:考虑到逻辑简洁,艿艿提供的本小节的示例,并不是一个一个群,而是所有人在一个大的群聊中哈~

9.1 ChatSendToAllRequest

创建 ChatSendToOneRequest 类,发送给所有人的群聊消息的请求。代码如下:

public class ChatSendToAllRequest implements Message {public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";/*** 消息编号*/private String msgId;/*** 内容*/private String content;// ... 省略 setter、getter、toString 方法
}

友情提示:如果是正经的群聊,会有一个 groupId 字段,表示群编号。

9.2 ChatSendResponse

和「8.2 ChatSendResponse」小节一致。

9.3 ChatRedirectToUserRequest

和「8.3 ChatRedirectToUserRequest」小节一致。

9.4 ChatSendToAllHandler

服务端...

创建 ChatSendToAllHandler 类,为服务端处理客户端的群聊请求。代码如下:

@Component
public class ChatSendToAllHandler implements MessageHandler<ChatSendToAllRequest> {@Autowiredprivate NettyChannelManager nettyChannelManager;@Overridepublic void execute(Channel channel, ChatSendToAllRequest message) {// <1> 这里,假装直接成功ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));// <2> 创建转发的消息,并广播发送ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId()).setContent(message.getContent());nettyChannelManager.sendAll(new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));}@Overridepublic String getType() {return ChatSendToAllRequest.TYPE;}}

代码比较简单,胖友看看 <1><2> 上的注释。

9.5 ChatSendResponseHandler

和「8.5 ChatSendResponseHandler」小节一致。

9.6 ChatRedirectToUserRequestHandler

和「8.6 ChatRedirectToUserRequestHandler」小节一致。

9.7 简单测试

① 启动 Netty Server 服务端。

② 启动 Netty Client 客户端 A。然后使用 Postman 模拟一次认证请求(用户为 yunai)。如下图所示:

Postman 模拟认证请求

③ 启动 Netty Client 客户端 B。注意,需要设置 --server.port 端口为 8081,避免冲突。

IDEA 设置

④ 启动 Netty Client 客户端 C。注意,需要设置 --server.port 端口为 8082,避免冲突。

IDEA 设置

⑤ 最后使用 Postman 模拟一次发送群聊消息。如下图所示:

Postman 模拟群聊请求

同时,可以看到客户端 A 群发给所有客户端的日志如下:

// 客户端 A...
2020-06-22 08:55:44.898  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})]
2020-06-22 08:55:44.901  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 57583 --- [ol-1-thread-148] c.i.s.l.n.m.c.ChatSendResponseHandler    : [execute][发送结果:ChatSendResponse{msgId='2', code=0, message='null'}]
2020-06-22 08:55:44.901  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.903  INFO 57583 --- [ol-1-thread-149] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]// 服务端...
2020-06-22 08:55:44.898  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-4] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9dc03826) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]// 客户端 B...
2020-06-22 08:55:44.902  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.902  INFO 59613 --- [ool-1-thread-83] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]// 客户端 C...
2020-06-22 08:55:44.901  INFO 61597 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9128c71c) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.903  INFO 61597 --- [ool-1-thread-16] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

666. 彩蛋

至此,我们已经通过 Netty 实现了一个简单的 IM 功能,是不是收获蛮大的,嘿嘿。

下面,良心的艿艿,再来推荐一波文章,嘿嘿。

  • 想要了解 Netty 源码的,可以阅读《Netty 实现原理与源码解析系统 —— 精品合集》文章。

  • 想要入门 Netty 基础的,可以阅读《Netty Bootstrap(图解)》文章。

等后续,艿艿会在 https://github.com/YunaiV/onemall 开源项目中,实现一个相对完整的客服功能,哈哈哈~



欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 20 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

兄弟,一口,点个!????

这篇关于使用 Netty 实现 IM 聊天贼简单,看不懂就锤爆艿艿的狗头~的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/377966

相关文章

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu2289(简单二分)

虽说是简单二分,但是我还是wa死了  题意:已知圆台的体积,求高度 首先要知道圆台体积怎么求:设上下底的半径分别为r1,r2,高为h,V = PI*(r1*r1+r1*r2+r2*r2)*h/3 然后以h进行二分 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#includ

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi