本文主要是介绍Netty 物联网自定协议(含粘包、拆包及半包处理),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Netty 物联网自定协议(含粘包、拆包及半包处理)
文章目录
- Netty 物联网自定协议(含粘包、拆包及半包处理)
- 实现:
- 1、自定义协议:
- 2.其他模块的介绍 (可以跳过)
- 1.服务器端的ChannelInitializer 实现
- 2.自定义的解码器
- 3.CustomMessageHandler (解耦,只需关注CustomMessage)
- 2、继承 ByteToMessageDecoder 类重写 decode (代码实现)
- ByteBuf与CustomMessage转换工具类:
- 3、粘包、拆包、半包报文处理思路及其解释 (重点)
- 1.责任链部分的处理
- 问题解释:
- 解释 (1)引用计数器异常问题
- 解释(1.1) 这里为什么要 cumulation.release()
- 解释 (2)为什么要跳过 读取剩下的buf
- 2.读取正确报文
- 问题解释:
- (1) 为什么要 设置 buf.readerIndex()
- 3.粘包处理
- 第一段:
- 第二段
- 4.半包处理
- 总结:
- 其他
- 为什么要重写ByteToMessageDecoder
- Netty 常用的粘包拆包解析器UML图
- 所需具备的知识点:
- 1.ByteBuf 的使用
- 2.了解设计模式 之 《责任链模式》
实现:
1、自定义协议:
/*** 自定义协议*/
@Data
public class CustomMessage {private byte head1 = (byte) 0XAA;private byte head2 = 0X86;private byte len1 = 0x00; // 预留private byte len2; // fun + cmd + dataHexprivate byte[] userId;private byte fun;private byte[] cmd;private byte[] dataHex;private byte CRC;private byte end1 = (byte) 0XDD;private byte end2 = (byte) 0XEE;
}
2.其他模块的介绍 (可以跳过)
1.服务器端的ChannelInitializer 实现
@Slf4j
@ChannelHandler.Sharable
public class LotNettyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {HeartbeatThreshold threshold = new HeartbeatThreshold(); // 心跳阈值相关处理// 不同的channel 处理方式不一样,责任链共用一个 threshold 没有问题ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(LotNettyConfig.USE_CODEC_STRATEGY.CodecStrategy().codecs()); // 自定义的解码器加载策略 见 3.自定义的解码器pipeline.addLast(new IdleStateHandler(LotNettyConfig.IDLE_CHECK_SECONDS, 0, 0));pipeline.addLast(new HeartbeatHandler(threshold));pipeline.addLast(new MiniMsgHandler(new MiniMsgProcessMessage(threshold))); // 其他自定义协议业务处理1pipeline.addLast(new CustomMessageHandler(new CustomMessageProcessMessage(threshold))); // 自定义协议业务处理log.info("初始化 pipeline ...");}
}
2.自定义的解码器
public class CustomCodecStrategy implements CodecStrategy {@Overridepublic ChannelHandler[] codecs() {// 大多数物联网设备都是二进制传输,编码器直接采用 ByteArrayEncoder()return new ChannelHandler[]{new ByteArrayEncoder(), new CustomMessageDecoder(), new MiniMsgDecoder()};}}
3.CustomMessageHandler (解耦,只需关注CustomMessage)
@Slf4j
@ChannelHandler.Sharable
public class CustomMessageHandler extends SimpleChannelInboundHandler<CustomMessage> {private final AbstractProcessMessage<CustomMessage> processMessage;public CustomMessageHandler(AbstractProcessMessage<CustomMessage> processMessage) {this.processMessage = processMessage;}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.warn("channel 关闭,清除相关的信息");LotNettyContext context = LotNettyContextUtil.getContext();context.clearChannel(ctx);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, CustomMessage msg) throws Exception {processMessage.process(ctx, msg); // 具体的业务处理,比如 设备状态上报处理等等}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("异常 ... ", cause);}
}
4.AbstractProcessMessage 主要针对不同的协议,使用不同的通讯策略进行通讯
/*** 协议处理抽象类* —— 每一个管道,都需要维护不同的ProcessMessage,心跳也是一样的,因此不能使用单例* @param <T>*/
@Slf4j
public abstract class AbstractProcessMessage<T> {protected final HeartbeatThreshold threshold;protected final AtomicBoolean useStrategy = new AtomicBoolean(false);public AbstractProcessMessage(HeartbeatThreshold threshold) {this.threshold = threshold;}public abstract void process(ChannelHandlerContext ctx, T msg);// BaseCommand 基础的心跳检测方法 比如: ping 、pong protected void useHeartbeatStrategy(ChannelHandlerContext ctx, BaseCommand baseCommand) {if (!useStrategy.get()) {// 加入 心跳检测策略LotNettyContextUtil.getContext().putOnceBaseCommand(ctx.channel().id(), baseCommand);useStrategy.compareAndSet(false, true);}}/*** 允许channel重新加载心跳策略,*/public void upUseStrategy() {useStrategy.compareAndSet(true, false);}}
2、继承 ByteToMessageDecoder 类重写 decode (代码实现)
@Slf4j
public class CustomMessageDecoder extends ByteToMessageDecoder {private final static byte FIXED_LEN = 10; // 固定长度,(报文文件长度=fun + cmd + dataHex),整条报文长度=FIXED_LEN+文件头长度 (len2)private final static byte HEAD1 = (byte) 0xAA;private final static byte HEAD2 = 0x86;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int start = buf.readerIndex() == 0 ? 0 : buf.readerIndex(); // 解决粘包 且存在多余的报文问题int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) {String hexDump = ByteBufUtil.hexDump(buf);// 粘包 (1) aa860004ef10ac0100031272ddeeaa860004ef10ac0100031472ddeeaa860004ef10ac0100031572ddee// 半包+粘包 (2) aa860004ef10 ac0100031272ddeeaa860004ef10ac0100031472ddeeaa860004ef10ac0100031572ddee// 粘包+半包 (3) aa860004ef10ac0100031272ddeeaa860004ef10ac01000314 72ddee// 粘包+半包 (4) aa860004ef10ac0100031272dd eeaa860004ef10ac0100031472ddee// 粘包+半包 + 文件头不完整 aa860004ef10ac0100031472ddeeaa8600 04ef10ac0100031472ddeelog.debug("16 进制内容:{}", hexDump);try {int begin; // 为了更直观体现,读取ByteBuf,采用显示指定readerIndex位置,ByteBuf有提供markReaderIndex(),其作用一样。while (buf.isReadable()) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();if (readableBytes < 4) {// 4 是文件头部分所占的字节数// 半包,且文件头不完整,没有读 ByteBUf的readIndex 指针没有发生偏移,直接返回,由 父类 Cumulator (ByteBuf累加器)累加等 待完整的报文return;}// 协议文件头长度byte head = buf.readByte();byte head2 = buf.readByte();buf.readByte();// 预留长度byte len2 = (byte) (buf.readByte() & 0xFF); // &0xFF 转16进制if ((head == HEAD1 && head2 == HEAD2)) {begin = readerIndex;// 半包问题// 协议文件头数据还未到齐,回到协议开始的位置,等待数据到齐if (readableBytes < FIXED_LEN + len2) {if (begin == 0) {// 重置 针对 半包buf.resetReaderIndex();return;}buf.readerIndex(begin); // 处理粘包+半包return;}// 正确报文if (readableBytes == FIXED_LEN + len2) {CustomMessage msg = CustomMessageUtil.decode(buf.readerIndex(begin)); out.add(msg);break;}int length = FIXED_LEN + len2;// 粘包问题int lastIndex = buf.forEachByte(begin, Math.min(readableBytes, length), b -> b == (byte) 0xDD || b == (byte) 0xEE);if (lastIndex > -1) {// 存在包尾CustomMessage msg = CustomMessageUtil.decode(buf.readerIndex(begin));out.add(msg);break;}}byte b = buf.readByte(); // 通过读取,来丢弃无效报文if (b == (byte) 0xDD) {byte end2 = buf.readByte();if (end2 == (byte) 0xEE) {break;}}}} catch (ProtocolException e) { // ProtocolException 为自定义异常log.error("协议解析失败,释放掉buf", e);buf.skipBytes(buf.readableBytes());} catch (Exception e) {log.error("解析失败:", e);}return;}if (start > 0) {log.warn("丢弃无效报文:{}", ByteBufUtil.hexDump(buf));buf.skipBytes(buf.readableBytes());return;}// 复制一份,防止skipBytes跳过,导致传递的消息引用计数器被释放掉,下游handler再次释放会抛异常// 同时还解决引用计数器为0的异常:refCnt: 0, decrement: 1。ByteBuf buffer = buf.retainedDuplicate();// 解决 decode() did not read anything but decoded a message的异常// 原因是netty不允许有字节内容不读的情况发生,所以跳过已读buf.skipBytes(buf.readableBytes());out.add(buffer); // 交给下一个管道处理}
}
ByteBuf与CustomMessage转换工具类:
public class CustomMessageUtil {/*** 解码* @param buf* @return*/public static CustomMessage decode(ByteBuf buf) {CustomMessage msg = new CustomMessage();msg.setHead1(buf.readByte());msg.setHead2(buf.readByte());msg.setLen1(buf.readByte());byte len = buf.readByte();msg.setLen2(len);byte[] userId = new byte[3];buf.readBytes(userId);msg.setUserId(userId);msg.setFun(buf.readByte());byte readCount = (byte) (len - 1);decode_0x01(buf, msg, readCount); // 对 cmd + dataHex 功能模块的解析msg.setCRC(buf.readByte());msg.setEnd1(buf.readByte());msg.setEnd2(buf.readByte());return msg;}/*** 编码* @param msg* @return*/public static ByteBuf encode(CustomMessage msg) {ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.heapBuffer();buf.writeByte(msg.getHead1());buf.writeByte(msg.getHead2());buf.writeByte(msg.getLen1());int lengthIndex = buf.writerIndex();buf.writeByte(0x00); // 占位 (1)byte[] userId = msg.getUserId();if (userId != null) {for (byte c : userId) {buf.writeByte(c);}}buf.writeByte(msg.getFun());byte[] cmd = msg.getCmd();if (cmd != null) {for (byte c : cmd) {buf.writeByte(c);}}byte[] dataHex = msg.getDataHex();if (dataHex != null) {for (byte c : dataHex) {buf.writeByte(c);}}buf.writeByte(msg.getCRC());buf.writeByte(msg.getEnd1());buf.writeByte(msg.getEnd2());int i = buf.readableBytes() - 10;// 固定长度 结果等于 fun + cmd + dataHex 的长度buf.setByte(lengthIndex, i); // 塞入len 见 上(1)return buf;}}
3、粘包、拆包、半包报文处理思路及其解释 (重点)
1.责任链部分的处理
这里的目的是:识别出要处理的报文协议,否则直接交给pipeline中的下一个Handler处理
伪代码:
public class CustomMessageDecoder extends ByteToMessageDecoder {private final static byte FIXED_LEN = 10;private final static byte BEGIN1 = (byte) 0xAA;private final static byte BEGIN2 = 0x86;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) {// 识别出CustomMessage 协议头,进行解析return;}// 其他自定义的协议,交由下一个管道进行处理// 同时还解决引用计数器为0的异常:refCnt: 0, decrement: 1。 (1)ByteBuf buffer = buf.retainedDuplicate();// 解决 decode() did not read anything but decoded a message的异常 (2)// 原因是netty不允许有字节内容不读的情况发生,所以采用下边的方法解决。buf.skipBytes(buf.readableBytes());out.add(buffer);}
}
问题解释:
解释 (1)引用计数器异常问题
(1)同时还解决引用计数器为0的异常:refCnt: 0, decrement: 1
原因: ByteToMessageDecoder
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {CodecOutputList out = CodecOutputList.newInstance();try {first = cumulation == null;cumulation = cumulator.cumulate(ctx.alloc(),first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {if (cumulation != null && !cumulation.isReadable()) {numReads = 0;cumulation.release(); // 由于(2)buf.skipBytes(buf.readableBytes()); 跳过了读取ByteBuf,所以ByteBuf 会被释放掉// 这里为什么要 release() ? (1.1) 下面解释cumulation = null;} else if (++numReads >= discardAfterReads) {numReads = 0;discardSomeReadBytes();}int size = out.size();firedChannelRead |= out.insertSinceRecycled();fireChannelRead(ctx, out, size);} finally {out.recycle();}}} else {ctx.fireChannelRead(msg);}}
抛出异常的部分代码:
public abstract class ReferenceCountUpdater<T extends ReferenceCounted> {private boolean retryRelease0(T instance, int decrement) {for (;;) {int rawCnt = updater().get(instance), realCnt = toLiveRealRefCnt(rawCnt, decrement);if (decrement == realCnt) {if (tryFinalRelease0(instance, rawCnt)) {return true;}} else if (decrement < realCnt) {// all changes to the raw count are 2x the "real" changeif (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {return false;}} else { // 引用计数器 ,已经释放了,其他处理器,再次释放会抛出此异常throw new IllegalReferenceCountException(realCnt, -decrement);}Thread.yield(); // this benefits throughput under high contention}}}
解释(1.1) 这里为什么要 cumulation.release()
Netty 读取报文消息采用的是DirectByteBuf (堆外内存)来存储,从而减少复制过程(零拷贝相关的知识),而堆外内存的垃圾回收不像堆内存那样方便,因此采用引用计数器的方式显示的来帮助回收垃圾 (ReferenceCounted)。
前面 CustomMessageDecoder #decode()方法中,由于设置了跳过读取buf【 buf.skipBytes()】,根据 ByteToMessageDecoder # channelRead() 中的
// 部分代码if (cumulation != null && !cumulation.isReadable()) {// 省略cumulation.release(); // 省略}
这段源码的语义,认为ByteBuf已经读完,随后就释放掉。为了下游Handler能够处理消息,需要通过冗余把即将释放掉的ByteBuf复制一份出来,然后交给下游的Handler进行处理。
解释 (2)为什么要跳过 读取剩下的buf
buf.skipBytes();
// ByteToMessageDecoder 部分代码 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {int oldInputLength = in.readableBytes();decodeRemovalReentryProtection(ctx, in, out); // 重新调用 decode 解码// 【原因在这里】// ByteToMessageDecoder 不允许,不读Buf,如果 重新调用decode 没有发生读取,则会抛异常if (oldInputLength == in.readableBytes()) {throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");}}
// 部分代码
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {decode(ctx, in, out); // 重新调用 decode解码 CustomMessageDecoder#decode()} finally {boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;decodeState = STATE_INIT;if (removePending) {fireChannelRead(ctx, out, out.size());out.clear();handlerRemoved(ctx);}}}
2.读取正确报文
报文格式:
Head1 | Head2 | Len1 | Len2 | userId | Fun | Cmd | dataHex | Crc | End1 | End2 |
---|---|---|---|---|---|---|---|---|---|---|
0xAA | 0x86 | 0x00 | Byte | Byte N | 0x01 | 0x01 | Byte N | Byte | 0xDD | 0xEE |
伪代码
int begin;while(buf.isReadable()) {int readerIndex = buf.readerIndex(); // 记录readerindex 位置int readableBytes = buf.readableBytes(); // 可读报文长度// 尝试解析出正确的协议文件头byte head = buf.readByte();byte head2 = buf.readByte();buf.readByte();// 预留长度byte len2 = (byte) (buf.readByte() & 0xFF); // &0xFF 转16进制if ((head == HEAD1 && head2 == HEAD2)) {begin = readerIndex; // 指定到报文协议头位置准备开始解析// 报文协议头和长度正确,开始解析报文if (readableBytes == FIXED_LEN + len2) {CustomMessage msg = CustomMessageUtil.decode(buf.readerIndex(begin)); // 为什么要设置 buf.readerIndex() (1)out.add(msg);break;}}}
问题解释:
(1) 为什么要 设置 buf.readerIndex()
1.正常情况下,一条完整的报文,只需要从0 开始读取就可以,由于ByteBuf 是共用的,如果出现粘包,,则读指针 readerIndex 会发生偏移,再次读取的时候,得从(粘包中)第一段报文中的包尾开始读,从0开始就不对。 (见3.粘包的处理)
2.由于前面已经从ByteBuf中读取了文件头信息,此时readerIndex已经发生了偏移,CustomMessageUtil.decode()对报文解析的时候,也需要从协议文件的头部开始读。因此重置readerIndex到正确的位置 (协议文件头),再进行解析。
3.粘包处理
粘包处理分为两段:
第一段:
第一段是先找到正确的报文协议文件头和报文协议文件尾部,然后把这段报文解析出来,塞入out列表中,再由ByteToMessageDecoder将解析出来的Message对象传递给下游的Handler,而此时readerIndex 已经发生了偏移(刚好是一段正确报文的长度),当ByteToMessageDecoder#callDecode() 发现还有可读字节,则会再次调用decode进行解析。
第二段
当ByteToMessageDecoder#callDecode()再次调用decode时,需要记录上一段报文解析的结束位置来作为下一段报文的开始读取位置,然后将报文解析出来。(如下图 readerIndex=14 ,就是上一段报文的结束位置)
ByteToMessageDecoder 源码如下:
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {decode(ctx, in, out); // buf 没有读完,再次调用 子类的decode} finally {boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;decodeState = STATE_INIT;if (removePending) {fireChannelRead(ctx, out, out.size());out.clear();handlerRemoved(ctx);}}}
粘包处理伪代码:
@Slf4j
public class CustomMessageDecoder extends ByteToMessageDecoder {private final static byte FIXED_LEN = 10; private final static byte HEAD1 = (byte) 0xAA;private final static byte HEAD2 = 0x86;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int start = buf.readerIndex() == 0 ? 0 : buf.readerIndex(); // 解决粘包 且存在多余的报文问题int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) { // 判断是否存在可以解析的文件头文协议String hexDump = ByteBufUtil.hexDump(buf);// 粘包 (1) aa860004ef10ac0100031272ddeeaa860004ef10ac0100031472ddeeaa860004ef10ac0100031572ddeelog.debug("16 进制内容:{}", hexDump);try {int begin;while (buf.isReadable()) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes()// 协议文件头长度byte head = buf.readByte();byte head2 = buf.readByte();buf.readByte();// 预留长度byte len2 = (byte) (buf.readByte() & 0xFF); // &0xFF 转16进制if ((head == HEAD1 && head2 == HEAD2)) {begin = readerIndex;int length = FIXED_LEN + len2;// 粘包问题int lastIndex = buf.forEachByte(begin, length, b -> b == (byte) 0xDD || b == (byte) 0xEE); // 判断报文是否完整if (lastIndex > -1) {// 存在包尾CustomMessage msg = CustomMessageUtil.decode(buf.readerIndex(begin));out.add(msg);break;}}byte b = buf.readByte(); // 通过读取,来丢弃无效报文if (b == (byte) 0xDD) {byte end2 = buf.readByte();if (end2 == (byte) 0xEE) {break;}}}} catch (ProtocolException e) { // ProtocolException 为自定义异常log.error("协议解析失败,释放掉buf", e);buf.skipBytes(buf.readableBytes());} catch (Exception e) {log.error("解析失败:", e);}return;}if (start > 0) {log.warn("丢弃无效报文:{}", ByteBufUtil.hexDump(buf));buf.skipBytes(buf.readableBytes());return;}}
}
解析示意图如下:
4.半包处理
一,半包的情况比较特殊,如果文件头不完整,则直接丢弃(根本无法识别)。二,如果只是将文件头解析出来,且剩下的报文不完整,则直接return 出去,等待剩下的报文,并再次读取。
ByteToMessageDecoder#Cumulator 会将剩下的半包报文,缓存(累加)起来,等待读取下一次发送过来的报文,然后进行拼接 。注意再次调用 decode 读取ByteBuf的时候,缓存的报文readerIndex指针的值是会记录下来的(即readerIndex 不一定 等于 0 ),解析的时候,需要从正确的文件头部分开始解析 见(2);
(1)半包情况1
// 伪代码
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int start = buf.readerIndex() == 0 ? 0 : buf.readerIndex(); // 解决粘包 且存在多余的报文问题int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) {// 头协议文件正确while (buf.isReadable()) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();if (readableBytes < 4) {// 4 是文件头部分// 半包,且报文不完整return;}}}// 文件头完整,直接丢弃if (start > 0) {log.warn("丢弃无效报文:{}", ByteBufUtil.hexDump(buf));buf.skipBytes(buf.readableBytes());return;}}
(2)半包情况2
// 伪代码
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {int start = buf.readerIndex() == 0 ? 0 : buf.readerIndex(); // 解决粘包 且存在多余的报文问题 。【readerIndex !=0 的情况】int index = buf.forEachByte(start, 1, new ByteProcessor.IndexOfProcessor(HEAD1));int index2 = buf.forEachByte(start, 2, new ByteProcessor.IndexOfProcessor(HEAD2));if (index > -1 && index2 > -1) {// 头协议文件正确while (buf.isReadable()) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();if (readableBytes < 4) {// 4 是文件头部分// 半包,且报文不完整return;}// 文件头if ((head == HEAD1 && head2 == HEAD2)) {begin = readerIndex;// 半包问题// 协议文件头数据还未到齐,回到协议开始的位置,等待数据到齐if (readableBytes < FIXED_LEN + len2) {if (begin == 0) {// 重置 针对 半包buf.resetReaderIndex();return;}buf.readerIndex(begin); // 处理粘包+半包 【readerIndex需要再次重置为文件头的位置】return;}}}}}
解析示意图:
总结:
处理粘包,拆包,半包报文问题,步骤如下:
1.先按正确报文来解析出文件头协议及长度,再根据长度判断整条报文是否完整,(需要注意的是,有的报文长度不包含文件头及文件尾的),报文完整则正常解析即可。
2.粘包情况,先获取协议文件长度,再通过 buf.forEachByte(),指定起始位和终止位区间是否是一条完整的报文。(由于协议文件头已经校验,只需校验协议文件包尾就可以了),是则指定 readerIndex位置进行解析即可。
3.半包情况,同样的先获取协议文件长度,在判断ByteBuf的可读字节数是否大于等于一条完整报文的长度,否则记录readerIndex位置,然后return 出去,交给ByteToMessageDecoder累加ByteBuf中的报文数据,当下次报文请求过来拼接出完整的报文,解析即可。
其他
为什么要重写ByteToMessageDecoder
- Netty 提供的粘包拆包解析器,不灵活,扩展不方便,特别是在物联网项目中,不同的设备采用的通讯协议可能不一样,报文消息长度也不是固定的,因此需要针对不同协议解析出不同的消息对象来进行具体的业务处理。
- 重写ByteToMessageDecoder,可以灵活扩展,其他协议,同时很好的利用了Netty 责任链模式,把协议解析部分和业务处理部分分离解耦,开发人员只需要关注Message对象处理即可
Netty 常用的粘包拆包解析器UML图
所需具备的知识点:
1.ByteBuf 的使用
ByteToMessageDecoder是依赖于ByteBuf的相关方法 (readableBytes),来判断解码器是否完整的读取了报文
而要实现具体的自定义协议解析,则需要了解ByteBuf相关的API方法的使用。
2.了解设计模式 之 《责任链模式》
Neyyt 的出站和入站 采用了,责任链模式,方便我们扩展不同Handler来处理不同的业务。每个Handler只需要关注和处理自己实际业务即可,反之,则交给Pipeline中的下一个Handler 处理。
有了这方面的知识点,我们的handler 分为两个部分,一、 继承 ByteToMessageDecoder 将二进制报文,解析成 我们定义的 Message对象,然后丢给下游Handler处理。二、该部分的Handler 继承 SimpleChannelInboundHandler<这里指定自定义的Message对象> 负责处理,读取到的Message对象。
流程如下:
这篇关于Netty 物联网自定协议(含粘包、拆包及半包处理)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!