netty编程之自定义编解码器

2024-08-24 00:04

本文主要是介绍netty编程之自定义编解码器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

写在前面

源码 。
本文看下netty如何自定义编解码器。为此netty专门定义抽象类io.netty.handler.codec.MessageToByteEncoderio.netty.handler.codec.ByteToMessageDecoder,后续我们实现自定义的编解码器就继承这两个类来做。

1:正戏

server 启动类:

package com.dahuyou.netty.customercodec.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) {new NettyServer().bing(7397);}private void bing(int port) {//配置服务端NIO线程组EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));EventLoopGroup childGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)    //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(new MyChannelInitializer());ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {childGroup.shutdownGracefully();parentGroup.shutdownGracefully();}}}

MyChannelInitializer类:

package com.dahuyou.netty.customercodec.server;import com.dahuyou.netty.customercodec.codec.MyDecoder;
import com.dahuyou.netty.customercodec.codec.MyEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) {
/**/
/*System.out.println("远端连接初始化,IP:" + channel.remoteAddress().getHostString() + ", port:" + channel.remoteAddress().getPort());*//**/
/* 解码器 *//*// 基于换行符号,基于换行符来分割字符串???
//        channel.pipeline().addLast(new LineBasedFrameDecoder(1024));// 基于指定字符串【换行符,这样功能等同于LineBasedFrameDecoder】// e.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, false, Delimiters.lineDelimiter()));// 基于最大长度// e.pipeline().addLast(new FixedLengthFrameDecoder(4));// 解码转String,注意调整自己的编码格式GBK、UTF-8 byte->stringchannel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));// 增加内置编码器,这样向对端发送消息就不要转换为二进制数据了channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));//在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyServerHandler());
\*/// 这里我们就不使用string的decoder和encoder了,而是使用自定义的编解码器// 解码器对应的抽象类是:ByteToMessageDecoder// 编码器对应的抽象类是:MessageToByteEncoder//自定义解码器channel.pipeline().addLast(new MyDecoder());//自定义编码器channel.pipeline().addLast(new MyEncoder());//在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyServerHandler());}}

其中的MyDecoder,MyEncoder就是需要我们自己的定义了编解码器了,如下:

package com.dahuyou.netty.customercodec.codec;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.charset.Charset;
import java.util.List;/*** 自定义的解码器*/
//public class MyDecoder extends ByteToMessageDecoder<String> {
public class MyDecoder extends ByteToMessageDecoder {int BASE_LENGTH = 4;/*一个完整包的开始标记*/private final byte START_LABEL = 0x02;/*一个完整包的结束标记*/private final byte END_LABEL = 0x03;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//基础长度不足,我们设定基础长度为4if (in.readableBytes() < BASE_LENGTH) {return;}int beginIdx; //记录包头位置while (true) {// 获取包头开始的indexbeginIdx = in.readerIndex();// 标记包头开始的indexin.markReaderIndex();// 读到了协议的开始标志,结束while循环
//            if (in.readByte() == 0x02) {if (in.readByte() == START_LABEL) {break;}// 未读到包头,略过一个字节// 每次略过,一个字节,去读取,包头信息的开始标记in.resetReaderIndex();in.readByte();// 当略过,一个字节之后,// 数据包的长度,又变得不满足// 此时,应该结束。等待后面的数据到达if (in.readableBytes() < BASE_LENGTH) {return;}}//剩余长度不足可读取数量[没有内容长度位]int readableCount = in.readableBytes();if (readableCount <= 1) {in.readerIndex(beginIdx);return;}//长度域占4字节,读取intByteBuf byteBuf = in.readBytes(1);String msgLengthStr = byteBuf.toString(Charset.forName("GBK"));int msgLength = Integer.parseInt(msgLengthStr);//剩余长度不足可读取数量[没有结尾标识]readableCount = in.readableBytes();if (readableCount < msgLength + 1) {in.readerIndex(beginIdx);return;}ByteBuf msgContent = in.readBytes(msgLength);//如果没有结尾标识,还原指针位置[其他标识结尾]byte end = in.readByte();
//        if (end != 0x03) {if (end != END_LABEL) {in.readerIndex(beginIdx);return;}out.add(msgContent.toString(Charset.forName("GBK")));}
/*@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {}*/
}

解码器定义了这样的规则,ASCII码02作为开始字符,ASCII码03作为结束字符,这两个字符都是不可见的控制字符,分别是STX(start of text),ETX(end of text),如下是对stx比较专业的描述:

STX 是 ASCII(American Standard Code for Information Interchange,美国信息交换标准代码)中定义的一个控制字符,其代表“Start of Text”,即正文开始。在数据传输或文本通信中,STX 字符用于标记一个结构化数据块(如纯文本消息)的起始点。它的十进制 ASCII 值是 2,十六进制值是 0x02。

二者对应的ASCII码为:
在这里插入图片描述
然后第三个字符作为长度,之后就是根据解析到的长度来读取数据了,并且最后验证读取数据后的下一个字符是03。最终将数据转换为人类可读的字符串,如下图就是一个可被该解码器解析的例子:
在这里插入图片描述
开始字符 数据长度 数据 结束字符

package com.dahuyou.netty.customercodec.codec;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;//public class MyEncoder extends MessageToByteEncoder<String> {
public class MyEncoder extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {String msg = in.toString();byte[] bytes = msg.getBytes();byte[] send = new byte[bytes.length + 2];System.arraycopy(bytes, 0, send, 1, bytes.length);send[0] = 0x02;send[send.length - 1] = 0x03;out.writeInt(send.length);out.writeBytes(send);}
}

这里解码器可以暂时不用太关注,只是在数据的开头和结尾增加了STX和ETX,如下:
在这里插入图片描述
最后MyServerHandler的代码就比较简单了,如下:

package com.dahuyou.netty.customercodec.server;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;import java.text.SimpleDateFormat;
import java.util.Date;public class MyServerHandler extends ChannelInboundHandlerAdapter {/*** 通道激活* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("远端连接初始化,IP:" + channel.remoteAddress().getHostString() + ", port:" + channel.remoteAddress().getPort());// 通知客户端链接建立成功String str = "notify channel build success: " + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";ctx.writeAndFlush(str);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 因为在初始化是已经设置了字符串的解码器,所以就不需要上述的手动读取二进制字节码的操作了,这就是框架的好处咯!System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);// 通知客户端链消息发送成功String str = "server received your msg: " + new Date() + " " + msg + "\r\n";ctx.writeAndFlush(str);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());}
}

启动server,简单起见我们直接使用netassit测试:
在这里插入图片描述
以上代码相当于发送了byte数组[stx,4,‘a’.‘1’,‘b’,‘2’,etx]。
当然除了使用工具测试外,我们也可以程序来测试,主要是如下代码(详细看源码):

package com.dahuyou.netty.customercodec.client;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;import java.text.SimpleDateFormat;
import java.util.Date;public class MyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("通道建立成功");// 通知客户端链接建立成功
/*String str = "hello server , i'm client very happy connect to you." + " " + new Date() + " " + ((SocketChannel) ctx.channel()).localAddress().getHostString() + "\r\n";
*/String xxx = "a1b2";System.out.println(xxx.getBytes());byte[] bytes = xxx.getBytes();// 增加stx length etxbyte[] send = new byte[bytes.length + 3];System.arraycopy(bytes, 0, send, 2, bytes.length);send[0] = 0x02; // stxsend[1] = 0X34; // length 4的ASCII码是52 十六进制就是34send[send.length - 1] = 0x03; // etxByteBuf buf = Unpooled.buffer(send.length/* + 3*/); // + 3 stx 长度 etxbuf.writeBytes(send);//       ctx.writeAndFlush(str);ctx.writeAndFlush(buf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " receive msg: " + msg);//通知客户端链消息发送成功
//        String str = "hello server received your msg: " + new Date() + " " + msg + "\r\n";
//        ctx.writeAndFlush(str);}
}

即channelActive中的逻辑,注意这里写入的都是ASCII码值。

写在后面

参考文章列表

Ascii完整码表(256个) 。
工具 › 开发类 › 进制转换 。
win的netassist TCP测试工具和Linux的nc工具使用 。

这篇关于netty编程之自定义编解码器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1100872

相关文章

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

Go Playground 在线编程环境

For all examples in this and the next chapter, we will use Go Playground. Go Playground represents a web service that can run programs written in Go. It can be opened in a web browser using the follow

深入理解RxJava:响应式编程的现代方式

在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式编程(Reactive Programming)的一个流行库,为Java和Android开发者提供了一种强大的方式来处理异步任务和事件流。本文将深入探讨RxJava的核心概念、优势以及如何在实际项目中应用它。 文章目录 💯 什么是RxJava?💯 响应式编程的优势💯 RxJava的核心概念

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

函数式编程思想

我们经常会用到各种各样的编程思想,例如面向过程、面向对象。不过笔者在该博客简单介绍一下函数式编程思想. 如果对函数式编程思想进行概括,就是f(x) = na(x) , y=uf(x)…至于其他的编程思想,可能是y=a(x)+b(x)+c(x)…,也有可能是y=f(x)=f(x)/a + f(x)/b+f(x)/c… 面向过程的指令式编程 面向过程,简单理解就是y=a(x)+b(x)+c(x)

Oracle type (自定义类型的使用)

oracle - type   type定义: oracle中自定义数据类型 oracle中有基本的数据类型,如number,varchar2,date,numeric,float....但有时候我们需要特殊的格式, 如将name定义为(firstname,lastname)的形式,我们想把这个作为一个表的一列看待,这时候就要我们自己定义一个数据类型 格式 :create or repla