Netty应用之粘包半包问题

2023-10-12 20:20

本文主要是介绍Netty应用之粘包半包问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 1.粘包现象演示
    • 2.半包现象演示
    • 3.粘包半包现象分析
      • 3.1 粘包
      • 3.2 半包
    • 4.粘包半包解决方案
      • 4.1短连接
      • 4.2 定长解码器
      • 4.3 行解码器
      • 4.4 LengthFieldBasedFrameDecoder
      • 4.5 LengthFieldBasedFrameDecoder演示

参考黑马程序员

1.粘包现象演示

我们通过一段代码演示一个这个粘包现象,首先我们来看服务端

    void start() {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}public static void main(String[] args) {new Server1().start();}

服务端这块我们首先创建两个EventLoopGroup,一个是boss就是专门负责建立连接的,另一个是worker专门处理业务逻辑的。然后把这两个EventLoopGroup放到group中,然后再handler中我们就放一个打印日志的handler。然后绑定端口8080,。

客户端

    static final Logger log = LoggerFactory.getLogger(Client1.class);public static void main(String[] args) {send();System.out.println("finish");}private static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {for (int i = 0;i < 10;i++) {ByteBuf buf = ctx.alloc().buffer(16);buf.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buf);}}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}

客户端这边我们就用worker就行了当连接建立的时候客户端会循环发送bytebuf给服务端,发10次,然后我们看一下结果。

在这里插入图片描述

如图所示我这边服务端一次性的接收到了160B的bytebuf,我本来是想让他分着接收,但是他一次性的全给我接收了。这就是粘包现象。

2.半包现象演示

现在我给服务端的接收缓冲区调小,然后我们再看看结果。

serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);

在这里插入图片描述

如图所示很明显的最后一个接收的是4B之前的是40B,这很明显出现了半包。

3.粘包半包现象分析

3.1 粘包

  • 现象,发送 abc def,接收 abcdef
  • 原因
    • 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
    • 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
    • Nagle 算法:即使发送一个字节,也需要加入 tcp 头和 ip 头,也就是总字节数会使用 41 bytes,非常不经济。因此为了提高网络利用率,tcp 希望尽可能发送足够大的数据,这就是 Nagle 算法产生的缘由。该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送
      • 如果 SO_SNDBUF 的数据达到 MSS,则需要发送
      • 如果 SO_SNDBUF 中含有 FIN(表示需要连接关闭)这时将剩余数据发送,再关闭
      • 如果 TCP_NODELAY = true,则需要发送
      • 已发送的数据都收到 ack 时,则需要发送
      • 上述条件不满足,但发生超时(一般为 200ms)则需要发送
      • 除上述情况,延迟发送

3.2 半包

  • 现象,发送 abcdef,接收 abc def
  • 原因
    • 应用层:接收方 ByteBuf 小于实际发送数据量
    • 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
    • MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包,MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数,链路层对一次能够发送的最大数据有限制,不同的链路设备的 MTU 值也有所不同。以太网的 MTU 是 1500,本地回环地址的 MTU 是 65535 - 本地测试不走网卡。

4.粘包半包解决方案

4.1短连接

既然我们消息的发送是流式发送,没有消息边界。但是我们可以人为为他创造边界,即在每次消息发送之后我都断开连接,下次再发送时再重新建立连接,这样的话就不会出现粘包现象了。

解决粘包的客户端代码

    public static void main(String[] args) {for (int i = 0; i < 10; i++) {send();}System.out.println("finish");}private static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer(16);buf.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buf);ctx.channel().close();}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}

然后我们看一下结果

在这里插入图片描述

当我们采用短连接的方式连接服务器时,数据会被明显的分开,这样粘包问题就得到了非常有效的解决。但是这种方式频繁的建立断开连接本身比较奇葩同时它并没有办法去解决半包问题,假如说我把服务端的接收缓冲区设置为16B,我这边客户端发送改成17B,这样肯定会出现半包,我们来演示一下看看结果。

服务端设置缓冲区代码

serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));

运行结果

在这里插入图片描述

如图所示短连接并没有解决半包问题。

4.2 定长解码器

在这里插入图片描述

如图所示这就是定长解码器,他会把接收到的数据,定长的分割。然后看看定长处理器怎么解决粘包问题。

客户端

    static final Logger log = LoggerFactory.getLogger(Client1.class);public static void main(String[] args) {send();System.out.println("finish");}public static byte[] fill10Bytes(char c, int len) {byte[] bytes = new byte[10];Arrays.fill(bytes, (byte) '_');for (int i = 0; i < len; i++) {bytes[i] = (byte) c;}System.out.println(new String(bytes));return bytes;}private static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();char c = '0';Random r = new Random();for (int i = 0; i < 10; i++) {byte[] bytes = fill10Bytes(c, r.nextInt(10) + 1);c++;buf.writeBytes(bytes);}ctx.writeAndFlush(buf);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}

假如说我规定每10B是一个有效的数据,所以我客户端都会发送10B的数据,通过fill10Bytes来创建一个定长的10B的数据大小,然后填入1~10随机长度的字符,如果小于10用’_'占位。发送10次。由于我要观察在客户端这块是否发生了粘包,所以我得添加一个LoggingHandler来打印日志。

服务端

    void start() {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);// 调整系统的接收缓冲区(滑动窗口)
//            serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);// 调整 netty 的接收缓冲区(byteBuf)serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new FixedLengthFrameDecoder(10));ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}public static void main(String[] args) {new Server2().start();}

在这里插入图片描述

运行一下看看客户端在发送的过程中已经粘包了。然后我们看看服务端接收的情况如何。

在这里插入图片描述

客户端虽然发送来了粘包的数据,但是通过定长解码器服务端正确的接收到了消息。虽然这种方式能正确解决粘包问题,但是我们收到了很多无用的’_'数据实际上也是一种浪费。

4.3 行解码器

我们可以通过节点分隔符来解决粘包半包问题。Netty为我们提供了两种行解码器,如下图所示。

在这里插入图片描述

第一种是LinedBasedFrameDecoder,这个就是指当遇到换行符的时候就分割。

在这里插入图片描述

第二种就是这个DelimiterBasedFrameDecoder,这个你可以自己指定分隔符。下面我们可以演示一下这个LinedBasedFrameDecoder。

客户端

    static final Logger log = LoggerFactory.getLogger(Client1.class);public static void main(String[] args) {send();System.out.println("finish");}public static StringBuilder makeString(char c, int len) {StringBuilder sb = new StringBuilder(len + 2);for (int i = 0; i < len; i++) {sb.append(c);}sb.append("\n");return sb;}private static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();char c = '0';Random r = new Random();for (int i = 0; i < 10; i++) {StringBuilder sb = makeString(c, r.nextInt(256) + 1);c++;buf.writeBytes(sb.toString().getBytes());}ctx.writeAndFlush(buf);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}

makeString方法就是将传进来的字符转化为字符串,然后再返回之前向字符串末尾添加"\n"。然后我们在handler中随机生成1~256长度的字符串发送给服务端。

服务端

    void start() {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}public static void main(String[] args) {new Server3().start();}

在handler当中我们添加LinedBasedFrameDecoder,然后设置最大长度为1024,如果超过1024个长度还没有遇到换行符的话,就说明你这个数据传过来有问题,会抛一个异常。

4.4 LengthFieldBasedFrameDecoder

    public LengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip) {this(maxFrameLength,lengthFieldOffset, lengthFieldLength, lengthAdjustment,initialBytesToStrip, true);}

这个解码器一共有5个参数我们通过一个例子来看看这5个参数都是啥意思。

  • maxFrameLength:最大帧长,如果超过了这个最大帧长报错。
  • lengthFieldOffset:长度偏移量,这个是数据发送的时候从第几个字节开始是数据标识长度的字段。
  • lengthFieldLength:数据中标识真正数据长度的字节数。
  • lengthAdjustment:数据的length部分和真正的数据部分之间的数据长度。
  • initialBytesToStrip:对接收的数据切掉最前面的字节数。

由于本人不善言辞,性格内向,表达能力欠缺,所以上述参数的描述可能不太准确所以我们接下来通过几个例子来更加深刻的理解这些参数。下面例子只针对最后四个参数,第一个参数嘴再笨也能讲明白了。

在这里插入图片描述

如图所示lengthFieldLength为2,说明在解码之前接收到的数据中由两部分组成第一部分是Length代表了真实数据的长度,第二部分就是真实数据,我们这个例子当中真实数据一共是12bytes,所以Length也是0x000C,也是12。接收到之后他一看真实数据是12bytes,所以他就会接收后12bytes的真实数据。

在这里插入图片描述

lengthFieldOffset为2,也就是说从数据开始的地方向后移动两个字节才能到Length部分,lengthFieldLength为3说明Length为2字节0x00000C。

在这里插入图片描述

lengthAdjustment就是以长度字段为基准还有几个字节是真实数据。如图所示,该参数为2,然后Head1就是夹在Length和Actual之间的,正好是两个字节。

在这里插入图片描述

如图所示,0xCA1字节在1字节之后是Length部分所以lengthFieldOffset为1,然后Length为0x000C是2字节所以lengthFieldLength为2,0xFE在Length和Actual之间1字节,所以lengthAdjustment为1。由于解码之后前面3字节的数据都被截取掉了所以initialBytesToStrip为3字节,因此这个initalBytesToStrip是解码之后要从前面切掉的字节数。

4.5 LengthFieldBasedFrameDecoder演示

这次我们使用EmbeddedChannel来进行演示好处就是不用写服务端和客户端了。

    public static void main(String[] args) {EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),new LoggingHandler(LogLevel.DEBUG));//  4 个字节的内容长度, 实际内容ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();send(buffer, "Hello, world");send(buffer, "Hi!");channel.writeInbound(buffer);}private static void send(ByteBuf buffer, String content) {byte[] bytes = content.getBytes(); // 实际内容int length = bytes.length; // 实际内容长度buffer.writeInt(length);buffer.writeBytes(bytes);}

如图所示我们客户端也就是send这个函数,负责准备发送数据,将要发送的数据写入buffer当中,然后通过channel.writeInbound(buffer)。发送给服务端。服务端handler就是LengthFieldBasedFrameDecoder和LoggingHandler,然后LengthFieldBasedFrameDecoder的参数分别是最大数据长度1024超过就抛异常,由于我的buffer上来就先写长度所以长度之前没有东西,所以lengthFieldOffset为0,然后Length为int型4字节,所以lengthFieldLength为4,length与实际内容之间不需要填充字节,所以lengthAdjustment为0,然后就是实际内容了,由于我们要对解码后数据的前4字节的长度单位做切割,所以initialBytesToStrip为4。

在这里插入图片描述

最后结果如图所示不多不少,正正好好。

这篇关于Netty应用之粘包半包问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/198215

相关文章

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

好题——hdu2522(小数问题:求1/n的第一个循环节)

好喜欢这题,第一次做小数问题,一开始真心没思路,然后参考了网上的一些资料。 知识点***********************************无限不循环小数即无理数,不能写作两整数之比*****************************(一开始没想到,小学没学好) 此题1/n肯定是一个有限循环小数,了解这些后就能做此题了。 按照除法的机制,用一个函数表示出来就可以了,代码如下

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

zoj3820(树的直径的应用)

题意:在一颗树上找两个点,使得所有点到选择与其更近的一个点的距离的最大值最小。 思路:如果是选择一个点的话,那么点就是直径的中点。现在考虑两个点的情况,先求树的直径,再把直径最中间的边去掉,再求剩下的两个子树中直径的中点。 代码如下: #include <stdio.h>#include <string.h>#include <algorithm>#include <map>#

购买磨轮平衡机时应该注意什么问题和技巧

在购买磨轮平衡机时,您应该注意以下几个关键点: 平衡精度 平衡精度是衡量平衡机性能的核心指标,直接影响到不平衡量的检测与校准的准确性,从而决定磨轮的振动和噪声水平。高精度的平衡机能显著减少振动和噪声,提高磨削加工的精度。 转速范围 宽广的转速范围意味着平衡机能够处理更多种类的磨轮,适应不同的工作条件和规格要求。 振动监测能力 振动监测能力是评估平衡机性能的重要因素。通过传感器实时监

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

缓存雪崩问题

缓存雪崩是缓存中大量key失效后当高并发到来时导致大量请求到数据库,瞬间耗尽数据库资源,导致数据库无法使用。 解决方案: 1、使用锁进行控制 2、对同一类型信息的key设置不同的过期时间 3、缓存预热 1. 什么是缓存雪崩 缓存雪崩是指在短时间内,大量缓存数据同时失效,导致所有请求直接涌向数据库,瞬间增加数据库的负载压力,可能导致数据库性能下降甚至崩溃。这种情况往往发生在缓存中大量 k