Netty 物联网自定协议(含粘包、拆包及半包处理)

2023-10-12 20:20

本文主要是介绍Netty 物联网自定协议(含粘包、拆包及半包处理),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Netty 物联网自定协议(含粘包、拆包及半包处理)

文章目录

  • Netty 物联网自定协议(含粘包、拆包及半包处理)
    • 实现:
      • 1、自定义协议:
      • 2.其他模块的介绍 (可以跳过)
          • 1.服务器端的ChannelInitializer 实现
          • 2.自定义的解码器
          • 3.CustomMessageHandler (解耦,只需关注CustomMessage)
      • 2、继承 ByteToMessageDecoder 类重写 decode (代码实现)
        • ByteBuf与CustomMessage转换工具类:
      • 3、粘包、拆包、半包报文处理思路及其解释 (重点)
        • 1.责任链部分的处理
          • 问题解释:
          • 解释 (1)引用计数器异常问题
            • 解释(1.1) 这里为什么要 cumulation.release()
          • 解释 (2)为什么要跳过 读取剩下的buf
        • 2.读取正确报文
          • 问题解释:
            • (1) 为什么要 设置 buf.readerIndex()
        • 3.粘包处理
          • 第一段:
          • 第二段
        • 4.半包处理
        • 总结:
    • 其他
      • 为什么要重写ByteToMessageDecoder
        • Netty 常用的粘包拆包解析器UML图
      • 所需具备的知识点:
          • 1.ByteBuf 的使用
          • 2.了解设计模式 之 《责任链模式》

实现:


1、自定义协议:

/*** 自定义协议*/
@Data
public class CustomMessage {private byte head1 = (byte) 0XAA;private byte head2 = 0X86;private byte len1 = 0x00; // 预留private byte len2; // fun + cmd + dataHexprivate byte[] userId;private byte fun;private byte[] cmd;private byte[] dataHex;private byte CRC;private byte end1 = (byte) 0XDD;private byte end2 = (byte) 0XEE;
}

2.其他模块的介绍 (可以跳过)

1.服务器端的ChannelInitializer 实现
@Slf4j
@ChannelHandler.Sharable
public class LotNettyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {HeartbeatThreshold threshold = new HeartbeatThreshold(); // 心跳阈值相关处理// 不同的channel 处理方式不一样,责任链共用一个 threshold 没有问题ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(LotNettyConfig.USE_CODEC_STRATEGY.CodecStrategy().codecs()); // 自定义的解码器加载策略  见 3.自定义的解码器pipeline.addLast(new IdleStateHandler(LotNettyConfig.IDLE_CHECK_SECONDS, 0, 0));pipeline.addLast(new HeartbeatHandler(threshold));pipeline.addLast(new MiniMsgHandler(new MiniMsgProcessMessage(threshold))); // 其他自定义协议业务处理1pipeline.addLast(new CustomMessageHandler(new CustomMessageProcessMessage(threshold))); // 自定义协议业务处理log.info("初始化 pipeline ...");}
}
2.自定义的解码器
public class CustomCodecStrategy implements CodecStrategy {@Overridepublic ChannelHandler[] codecs() {// 大多数物联网设备都是二进制传输,编码器直接采用 ByteArrayEncoder()return new ChannelHandler[]{new ByteArrayEncoder(), new CustomMessageDecoder(), new MiniMsgDecoder()};}}
3.CustomMessageHandler (解耦,只需关注CustomMessage)
@Slf4j
@ChannelHandler.Sharable
public class CustomMessageHandler extends SimpleChannelInboundHandler<CustomMessage> {private final AbstractProcessMessage<CustomMessage> processMessage;public CustomMessageHandler(AbstractProcessMessage<CustomMessage> processMessage) {this.processMessage = processMessage;}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.warn("channel 关闭,清除相关的信息");LotNettyContext context = LotNettyContextUtil.getContext();context.clearChannel(ctx);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, CustomMessage msg) throws Exception {processMessage.process(ctx, msg); // 具体的业务处理,比如 设备状态上报处理等等}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("异常 ... ", cause);}
}

4.AbstractProcessMessage 主要针对不同的协议,使用不同的通讯策略进行通讯

/*** 协议处理抽象类* —— 每一个管道,都需要维护不同的ProcessMessage,心跳也是一样的,因此不能使用单例* @param <T>*/
@Slf4j
public abstract class AbstractProcessMessage<T> {protected final HeartbeatThreshold threshold;protected final AtomicBoolean useStrategy = new AtomicBoolean(false);public AbstractProcessMessage(HeartbeatThreshold threshold) {this.threshold = threshold;}public abstract void process(ChannelHandlerContext ctx, T msg);// BaseCommand 基础的心跳检测方法  比如: ping 、pong     protected void useHeartbeatStrategy(ChannelHandlerContext ctx, BaseCommand baseCommand) {if (!useStrategy.get()) {// 加入 心跳检测策略LotNettyContextUtil.getContext().putOnceBaseCommand(ctx.channel().id(), baseCommand);useStrategy.compareAndSet(false, true);}}/*** 允许channel重新加载心跳策略,*/public void upUseStrategy() {useStrategy.compareAndSet(true, false);}}

2、继承 ByteToMessageDecoder 类重写 decode (代码实现)

@Slf4j
public class CustomMessageDecoder extends ByteToMessageDecoder {private final static byte FIXED_LEN = 10; // 固定长度,(报文文件长度=fun + cmd + dataHex),整条报文长度=FIXED_LEN+文件头长度 (len2)private final static byte HEAD1 = (byte) 0xAA;private final static byte HEAD2 = 0x86;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int start = buf.readerIndex() == 0 ? 0 : buf.readerIndex(); // 解决粘包 且存在多余的报文问题int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) {String hexDump = ByteBufUtil.hexDump(buf);// 粘包 (1) aa860004ef10ac0100031272ddeeaa860004ef10ac0100031472ddeeaa860004ef10ac0100031572ddee// 半包+粘包 (2) aa860004ef10 ac0100031272ddeeaa860004ef10ac0100031472ddeeaa860004ef10ac0100031572ddee// 粘包+半包 (3) aa860004ef10ac0100031272ddeeaa860004ef10ac01000314 72ddee// 粘包+半包 (4) aa860004ef10ac0100031272dd eeaa860004ef10ac0100031472ddee// 粘包+半包 + 文件头不完整 aa860004ef10ac0100031472ddeeaa8600 04ef10ac0100031472ddeelog.debug("16 进制内容:{}", hexDump);try {int begin; // 为了更直观体现,读取ByteBuf,采用显示指定readerIndex位置,ByteBuf有提供markReaderIndex(),其作用一样。while (buf.isReadable()) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();if (readableBytes < 4) {// 4 是文件头部分所占的字节数// 半包,且文件头不完整,没有读 ByteBUf的readIndex 指针没有发生偏移,直接返回,由 父类 Cumulator (ByteBuf累加器)累加等						   待完整的报文return;}// 协议文件头长度byte head = buf.readByte();byte head2 = buf.readByte();buf.readByte();// 预留长度byte len2 = (byte) (buf.readByte() & 0xFF); // &0xFF 转16进制if ((head == HEAD1 && head2 == HEAD2)) {begin = readerIndex;// 半包问题// 协议文件头数据还未到齐,回到协议开始的位置,等待数据到齐if (readableBytes < FIXED_LEN + len2) {if (begin == 0) {// 重置 针对 半包buf.resetReaderIndex();return;}buf.readerIndex(begin); // 处理粘包+半包return;}// 正确报文if (readableBytes == FIXED_LEN + len2) {CustomMessage msg = CustomMessageUtil.decode(buf.readerIndex(begin)); out.add(msg);break;}int length = FIXED_LEN + len2;// 粘包问题int lastIndex = buf.forEachByte(begin, Math.min(readableBytes, length), b -> b == (byte) 0xDD || b == (byte) 0xEE);if (lastIndex > -1) {// 存在包尾CustomMessage msg = CustomMessageUtil.decode(buf.readerIndex(begin));out.add(msg);break;}}byte b = buf.readByte(); // 通过读取,来丢弃无效报文if (b == (byte) 0xDD) {byte end2 = buf.readByte();if (end2 == (byte) 0xEE) {break;}}}} catch (ProtocolException e) { // ProtocolException  为自定义异常log.error("协议解析失败,释放掉buf", e);buf.skipBytes(buf.readableBytes());} catch (Exception e) {log.error("解析失败:", e);}return;}if (start > 0) {log.warn("丢弃无效报文:{}", ByteBufUtil.hexDump(buf));buf.skipBytes(buf.readableBytes());return;}// 复制一份,防止skipBytes跳过,导致传递的消息引用计数器被释放掉,下游handler再次释放会抛异常// 同时还解决引用计数器为0的异常:refCnt: 0, decrement: 1。ByteBuf buffer = buf.retainedDuplicate();// 解决 decode() did not read anything but decoded a message的异常// 原因是netty不允许有字节内容不读的情况发生,所以跳过已读buf.skipBytes(buf.readableBytes());out.add(buffer); // 交给下一个管道处理}
}
ByteBuf与CustomMessage转换工具类:
public class CustomMessageUtil {/*** 解码* @param buf* @return*/public static CustomMessage decode(ByteBuf buf) {CustomMessage msg = new CustomMessage();msg.setHead1(buf.readByte());msg.setHead2(buf.readByte());msg.setLen1(buf.readByte());byte len = buf.readByte();msg.setLen2(len);byte[] userId = new byte[3];buf.readBytes(userId);msg.setUserId(userId);msg.setFun(buf.readByte());byte readCount = (byte) (len - 1);decode_0x01(buf, msg, readCount); // 对 cmd + dataHex 功能模块的解析msg.setCRC(buf.readByte());msg.setEnd1(buf.readByte());msg.setEnd2(buf.readByte());return msg;}/*** 编码* @param msg* @return*/public static ByteBuf encode(CustomMessage msg) {ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.heapBuffer();buf.writeByte(msg.getHead1());buf.writeByte(msg.getHead2());buf.writeByte(msg.getLen1());int lengthIndex = buf.writerIndex();buf.writeByte(0x00); // 占位 (1)byte[] userId = msg.getUserId();if (userId != null) {for (byte c : userId) {buf.writeByte(c);}}buf.writeByte(msg.getFun());byte[] cmd = msg.getCmd();if (cmd != null) {for (byte c : cmd) {buf.writeByte(c);}}byte[] dataHex = msg.getDataHex();if (dataHex != null) {for (byte c : dataHex) {buf.writeByte(c);}}buf.writeByte(msg.getCRC());buf.writeByte(msg.getEnd1());buf.writeByte(msg.getEnd2());int i = buf.readableBytes() - 10;// 固定长度  结果等于 fun + cmd + dataHex 的长度buf.setByte(lengthIndex, i); // 塞入len  见 上(1)return buf;}}

3、粘包、拆包、半包报文处理思路及其解释 (重点)

1.责任链部分的处理

这里的目的是:识别出要处理的报文协议,否则直接交给pipeline中的下一个Handler处理

​ 伪代码:

public class CustomMessageDecoder extends ByteToMessageDecoder {private final static byte FIXED_LEN = 10;private final static byte BEGIN1 = (byte) 0xAA;private final static byte BEGIN2 = 0x86;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) {// 识别出CustomMessage 协议头,进行解析return;}// 其他自定义的协议,交由下一个管道进行处理// 同时还解决引用计数器为0的异常:refCnt: 0, decrement: 1。   (1)ByteBuf buffer = buf.retainedDuplicate();// 解决 decode() did not read anything but decoded a message的异常   (2)// 原因是netty不允许有字节内容不读的情况发生,所以采用下边的方法解决。buf.skipBytes(buf.readableBytes());out.add(buffer);}
}
问题解释:
解释 (1)引用计数器异常问题

​ (1)同时还解决引用计数器为0的异常:refCnt: 0, decrement: 1

原因: ByteToMessageDecoder

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {CodecOutputList out = CodecOutputList.newInstance();try {first = cumulation == null;cumulation = cumulator.cumulate(ctx.alloc(),first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {if (cumulation != null && !cumulation.isReadable()) {numReads = 0;cumulation.release(); // 由于(2)buf.skipBytes(buf.readableBytes()); 跳过了读取ByteBuf,所以ByteBuf 会被释放掉// 这里为什么要 release() ? (1.1) 下面解释cumulation = null;} else if (++numReads >= discardAfterReads) {numReads = 0;discardSomeReadBytes();}int size = out.size();firedChannelRead |= out.insertSinceRecycled();fireChannelRead(ctx, out, size);} finally {out.recycle();}}} else {ctx.fireChannelRead(msg);}}

抛出异常的部分代码:

public abstract class  ReferenceCountUpdater<T extends ReferenceCounted> {private boolean retryRelease0(T instance, int decrement) {for (;;) {int rawCnt = updater().get(instance), realCnt = toLiveRealRefCnt(rawCnt, decrement);if (decrement == realCnt) {if (tryFinalRelease0(instance, rawCnt)) {return true;}} else if (decrement < realCnt) {// all changes to the raw count are 2x the "real" changeif (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {return false;}} else { // 引用计数器 ,已经释放了,其他处理器,再次释放会抛出此异常throw new IllegalReferenceCountException(realCnt, -decrement);}Thread.yield(); // this benefits throughput under high contention}}}
解释(1.1) 这里为什么要 cumulation.release()

​ Netty 读取报文消息采用的是DirectByteBuf (堆外内存)来存储,从而减少复制过程(零拷贝相关的知识),而堆外内存的垃圾回收不像堆内存那样方便,因此采用引用计数器的方式显示的来帮助回收垃圾 (ReferenceCounted)。

​ 前面 CustomMessageDecoder #decode()方法中,由于设置了跳过读取buf【 buf.skipBytes()】,根据 ByteToMessageDecoder # channelRead() 中的

// 部分代码if (cumulation != null && !cumulation.isReadable()) {// 省略cumulation.release(); // 省略} 

这段源码的语义,认为ByteBuf已经读完,随后就释放掉。为了下游Handler能够处理消息,需要通过冗余把即将释放掉的ByteBuf复制一份出来,然后交给下游的Handler进行处理。

解释 (2)为什么要跳过 读取剩下的buf

buf.skipBytes();

// ByteToMessageDecoder 部分代码 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {int oldInputLength = in.readableBytes();decodeRemovalReentryProtection(ctx, in, out); // 重新调用 decode 解码// 【原因在这里】// ByteToMessageDecoder 不允许,不读Buf,如果 重新调用decode 没有发生读取,则会抛异常if (oldInputLength == in.readableBytes()) {throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");}}
// 部分代码
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {decode(ctx, in, out); // 重新调用 decode解码   CustomMessageDecoder#decode()} finally {boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;decodeState = STATE_INIT;if (removePending) {fireChannelRead(ctx, out, out.size());out.clear();handlerRemoved(ctx);}}}
2.读取正确报文

报文格式:

Head1Head2Len1Len2userIdFunCmddataHexCrcEnd1End2
0xAA0x860x00ByteByte N0x010x01Byte NByte0xDD0xEE

伪代码

	int begin;while(buf.isReadable()) 	{int readerIndex = buf.readerIndex(); // 记录readerindex 位置int readableBytes = buf.readableBytes(); // 可读报文长度// 尝试解析出正确的协议文件头byte head = buf.readByte();byte head2 = buf.readByte();buf.readByte();// 预留长度byte len2 = (byte) (buf.readByte() & 0xFF); // &0xFF 转16进制if ((head == HEAD1 && head2 == HEAD2)) {begin = readerIndex; // 指定到报文协议头位置准备开始解析// 报文协议头和长度正确,开始解析报文if (readableBytes == FIXED_LEN + len2) {CustomMessage msg = CustomMessageUtil.decode(buf.readerIndex(begin)); // 为什么要设置 buf.readerIndex() (1)out.add(msg);break;}}}
问题解释:
(1) 为什么要 设置 buf.readerIndex()

​ 1.正常情况下,一条完整的报文,只需要从0 开始读取就可以,由于ByteBuf 是共用的,如果出现粘包,,则读指针 readerIndex 会发生偏移,再次读取的时候,得从(粘包中)第一段报文中的包尾开始读,从0开始就不对。 (见3.粘包的处理)

​ 2.由于前面已经从ByteBuf中读取了文件头信息,此时readerIndex已经发生了偏移,CustomMessageUtil.decode()对报文解析的时候,也需要从协议文件的头部开始读。因此重置readerIndex到正确的位置 (协议文件头),再进行解析。

在这里插入图片描述

3.粘包处理

粘包处理分为两段:

第一段:

第一段是先找到正确的报文协议文件头和报文协议文件尾部,然后把这段报文解析出来,塞入out列表中,再由ByteToMessageDecoder将解析出来的Message对象传递给下游的Handler,而此时readerIndex 已经发生了偏移(刚好是一段正确报文的长度),当ByteToMessageDecoder#callDecode() 发现还有可读字节,则会再次调用decode进行解析。

第二段

当ByteToMessageDecoder#callDecode()再次调用decode时,需要记录上一段报文解析的结束位置来作为下一段报文的开始读取位置,然后将报文解析出来。(如下图 readerIndex=14 ,就是上一段报文的结束位置)

ByteToMessageDecoder 源码如下:

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {decode(ctx, in, out); // buf 没有读完,再次调用 子类的decode} finally {boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;decodeState = STATE_INIT;if (removePending) {fireChannelRead(ctx, out, out.size());out.clear();handlerRemoved(ctx);}}}

粘包处理伪代码:

@Slf4j
public class CustomMessageDecoder extends ByteToMessageDecoder {private final static byte FIXED_LEN = 10; private final static byte HEAD1 = (byte) 0xAA;private final static byte HEAD2 = 0x86;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int start = buf.readerIndex() == 0 ? 0 : buf.readerIndex(); // 解决粘包 且存在多余的报文问题int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) { // 判断是否存在可以解析的文件头文协议String hexDump = ByteBufUtil.hexDump(buf);// 粘包 (1) aa860004ef10ac0100031272ddeeaa860004ef10ac0100031472ddeeaa860004ef10ac0100031572ddeelog.debug("16 进制内容:{}", hexDump);try {int begin;while (buf.isReadable()) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes()// 协议文件头长度byte head = buf.readByte();byte head2 = buf.readByte();buf.readByte();// 预留长度byte len2 = (byte) (buf.readByte() & 0xFF); // &0xFF 转16进制if ((head == HEAD1 && head2 == HEAD2)) {begin = readerIndex;int length = FIXED_LEN + len2;// 粘包问题int lastIndex = buf.forEachByte(begin, length, b -> b == (byte) 0xDD || b == (byte) 0xEE); // 判断报文是否完整if (lastIndex > -1) {// 存在包尾CustomMessage msg = CustomMessageUtil.decode(buf.readerIndex(begin));out.add(msg);break;}}byte b = buf.readByte(); // 通过读取,来丢弃无效报文if (b == (byte) 0xDD) {byte end2 = buf.readByte();if (end2 == (byte) 0xEE) {break;}}}} catch (ProtocolException e) { // ProtocolException  为自定义异常log.error("协议解析失败,释放掉buf", e);buf.skipBytes(buf.readableBytes());} catch (Exception e) {log.error("解析失败:", e);}return;}if (start > 0) {log.warn("丢弃无效报文:{}", ByteBufUtil.hexDump(buf));buf.skipBytes(buf.readableBytes());return;}}
}

解析示意图如下:

在这里插入图片描述

在这里插入图片描述

4.半包处理

​ 一,半包的情况比较特殊,如果文件头不完整,则直接丢弃(根本无法识别)。二,如果只是将文件头解析出来,且剩下的报文不完整,则直接return 出去,等待剩下的报文,并再次读取。

ByteToMessageDecoder#Cumulator 会将剩下的半包报文,缓存(累加)起来,等待读取下一次发送过来的报文,然后进行拼接 。注意再次调用 decode 读取ByteBuf的时候,缓存的报文readerIndex指针的值是会记录下来的(即readerIndex 不一定 等于 0 ),解析的时候,需要从正确的文件头部分开始解析 见(2);

(1)半包情况1

// 伪代码	
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int start = buf.readerIndex() == 0 ? 0 : buf.readerIndex(); // 解决粘包 且存在多余的报文问题int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) {// 头协议文件正确while (buf.isReadable()) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();if (readableBytes < 4) {// 4 是文件头部分// 半包,且报文不完整return;}}}// 文件头完整,直接丢弃if (start > 0) {log.warn("丢弃无效报文:{}", ByteBufUtil.hexDump(buf));buf.skipBytes(buf.readableBytes());return;}}

(2)半包情况2

// 伪代码	
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int start = buf.readerIndex() == 0 ? 0 : buf.readerIndex(); // 解决粘包 且存在多余的报文问题 。【readerIndex !=0 的情况】int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) {// 头协议文件正确while (buf.isReadable()) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();if (readableBytes < 4) {// 4 是文件头部分// 半包,且报文不完整return;}// 文件头if ((head == HEAD1 && head2 == HEAD2)) {begin = readerIndex;// 半包问题// 协议文件头数据还未到齐,回到协议开始的位置,等待数据到齐if (readableBytes < FIXED_LEN + len2) {if (begin == 0) {// 重置 针对 半包buf.resetReaderIndex();return;}buf.readerIndex(begin); // 处理粘包+半包   【readerIndex需要再次重置为文件头的位置】return;}}}}}

解析示意图:

在这里插入图片描述

总结:

处理粘包,拆包,半包报文问题,步骤如下:

1.先按正确报文来解析出文件头协议及长度,再根据长度判断整条报文是否完整,(需要注意的是,有的报文长度不包含文件头及文件尾的),报文完整则正常解析即可。

2.粘包情况,先获取协议文件长度,再通过 buf.forEachByte(),指定起始位和终止位区间是否是一条完整的报文。(由于协议文件头已经校验,只需校验协议文件包尾就可以了),是则指定 readerIndex位置进行解析即可。

3.半包情况,同样的先获取协议文件长度,在判断ByteBuf的可读字节数是否大于等于一条完整报文的长度,否则记录readerIndex位置,然后return 出去,交给ByteToMessageDecoder累加ByteBuf中的报文数据,当下次报文请求过来拼接出完整的报文,解析即可。


其他

为什么要重写ByteToMessageDecoder

  • Netty 提供的粘包拆包解析器,不灵活,扩展不方便,特别是在物联网项目中,不同的设备采用的通讯协议可能不一样,报文消息长度也不是固定的,因此需要针对不同协议解析出不同的消息对象来进行具体的业务处理。
  • 重写ByteToMessageDecoder,可以灵活扩展,其他协议,同时很好的利用了Netty 责任链模式,把协议解析部分和业务处理部分分离解耦,开发人员只需要关注Message对象处理即可
Netty 常用的粘包拆包解析器UML图

在这里插入图片描述

所需具备的知识点:

1.ByteBuf 的使用

​ ByteToMessageDecoder是依赖于ByteBuf的相关方法 (readableBytes),来判断解码器是否完整的读取了报文

而要实现具体的自定义协议解析,则需要了解ByteBuf相关的API方法的使用。

2.了解设计模式 之 《责任链模式》

Neyyt 的出站和入站 采用了,责任链模式,方便我们扩展不同Handler来处理不同的业务。每个Handler只需要关注和处理自己实际业务即可,反之,则交给Pipeline中的下一个Handler 处理。

​ 有了这方面的知识点,我们的handler 分为两个部分,一、 继承 ByteToMessageDecoder 将二进制报文,解析成 我们定义的 Message对象,然后丢给下游Handler处理。二、该部分的Handler 继承 SimpleChannelInboundHandler<这里指定自定义的Message对象> 负责处理,读取到的Message对象。

流程如下:

在这里插入图片描述

这篇关于Netty 物联网自定协议(含粘包、拆包及半包处理)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/198216

相关文章

如何使用celery进行异步处理和定时任务(django)

《如何使用celery进行异步处理和定时任务(django)》文章介绍了Celery的基本概念、安装方法、如何使用Celery进行异步任务处理以及如何设置定时任务,通过Celery,可以在Web应用中... 目录一、celery的作用二、安装celery三、使用celery 异步执行任务四、使用celery

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

MyBatis延迟加载的处理方案

《MyBatis延迟加载的处理方案》MyBatis支持延迟加载(LazyLoading),允许在需要数据时才从数据库加载,而不是在查询结果第一次返回时就立即加载所有数据,延迟加载的核心思想是,将关联对... 目录MyBATis如何处理延迟加载?延迟加载的原理1. 开启延迟加载2. 延迟加载的配置2.1 使用

Android WebView的加载超时处理方案

《AndroidWebView的加载超时处理方案》在Android开发中,WebView是一个常用的组件,用于在应用中嵌入网页,然而,当网络状况不佳或页面加载过慢时,用户可能会遇到加载超时的问题,本... 目录引言一、WebView加载超时的原因二、加载超时处理方案1. 使用Handler和Timer进行超

Python中处理NaN值的技巧分享

《Python中处理NaN值的技巧分享》在数据科学和数据分析领域,NaN(NotaNumber)是一个常见的概念,它表示一个缺失或未定义的数值,在Python中,尤其是在使用pandas库处理数据时,... 目录NaN 值的来源和影响使用 pandas 的 isna()和 isnull()函数直接比较 Na

详解Python中通用工具类与异常处理

《详解Python中通用工具类与异常处理》在Python开发中,编写可重用的工具类和通用的异常处理机制是提高代码质量和开发效率的关键,本文将介绍如何将特定的异常类改写为更通用的ValidationEx... 目录1. 通用异常类:ValidationException2. 通用工具类:Utils3. 示例文

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

【Linux】应用层http协议

一、HTTP协议 1.1 简要介绍一下HTTP        我们在网络的应用层中可以自己定义协议,但是,已经有大佬定义了一些现成的,非常好用的应用层协议,供我们直接使用,HTTP(超文本传输协议)就是其中之一。        在互联网世界中,HTTP(超文本传输协议)是一个至关重要的协议,他定义了客户端(如浏览器)与服务器之间如何进行通信,以交换或者传输超文本(比如HTML文档)。