netty编程之使用protostuff作为数据传输载体

2024-08-29 19:12

本文主要是介绍netty编程之使用protostuff作为数据传输载体,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

写在前面

源码 。
本文看下使用protostuff作为数据传输的载体。

1:正戏

1.1:server

server main:

package com.dahuyou.netty.protostuff.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) {new NettyServer().bing(7397);}private void bing(int port) {//配置服务端NIO线程组EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));EventLoopGroup childGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)    //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(new MyChannelInitializer());ChannelFuture f = b.bind(port).sync();System.out.println("netty server start done. {}");f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {childGroup.shutdownGracefully();parentGroup.shutdownGracefully();}}}

MyChannelInitializer:

package com.dahuyou.netty.protostuff.server;import com.dahuyou.netty.protostuff.codec.ObjDecoder;
import com.dahuyou.netty.protostuff.codec.ObjEncoder;
import com.dahuyou.netty.protostuff.domain.MsgInfo;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) {//对象传输处理channel.pipeline().addLast(new ObjDecoder(MsgInfo.class));channel.pipeline().addLast(new ObjEncoder(MsgInfo.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyServerHandler());}}

这里设置了自定义的protostuff的编解码器,如下:

package com.dahuyou.netty.protostuff.codec;import com.dahuyou.netty.protostuff.util.SerializationUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class ObjEncoder extends MessageToByteEncoder {private Class<?> genericClass;public ObjEncoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overrideprotected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out)  {if (genericClass.isInstance(in)) {byte[] data = SerializationUtil.serialize(in);out.writeInt(data.length);out.writeBytes(data);}}}
package com.dahuyou.netty.protostuff.codec;import com.dahuyou.netty.protostuff.util.SerializationUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;public class ObjDecoder extends ByteToMessageDecoder {private Class<?> genericClass;public ObjDecoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int dataLength = in.readInt();if (in.readableBytes() < dataLength) {in.resetReaderIndex();return;}byte[] data = new byte[dataLength];in.readBytes(data);out.add(SerializationUtil.deserialize(data, genericClass));}}

消息处理类MyServerHandler:

package com.dahuyou.netty.protostuff.server;import com.alibaba.fastjson.JSON;
import com.dahuyou.netty.protostuff.util.MsgUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyServerHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*///通知客户端链接建立成功String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";ctx.writeAndFlush(MsgUtil.buildMsg(channel.id().toString(), str));}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息类型:" + msg.getClass());System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息内容:" + JSON.toJSONString(msg));}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

1.2:client

client main:

package com.dahuyou.netty.protostuff.client;import com.dahuyou.netty.protostuff.util.MsgUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) {new NettyClient().connect("127.0.0.1", 7397);}private void connect(String inetHost, int inetPort) {EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new MyChannelInitializer());ChannelFuture f = b.connect(inetHost, inetPort).sync();System.out.println("netty client start done. {}");f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服务端,我是大忽悠。"));f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服务端,我是大忽悠。"));f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();}}}

MyChannelInitializer:

package com.dahuyou.netty.protostuff.client;import com.dahuyou.netty.protostuff.codec.ObjDecoder;
import com.dahuyou.netty.protostuff.codec.ObjEncoder;
import com.dahuyou.netty.protostuff.domain.MsgInfo;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//对象传输处理channel.pipeline().addLast(new ObjDecoder(MsgInfo.class));channel.pipeline().addLast(new ObjEncoder(MsgInfo.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyClientHandler());}}

protostuff的编解码器同server端的。
消息处理类MyClientHandler:

package com.dahuyou.netty.protostuff.client;import com.alibaba.fastjson.JSON;
import com.dahuyou.netty.protostuff.util.MsgUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyClientHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*///通知客户端链接建立成功String str = "通知服务端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString();ctx.writeAndFlush(MsgUtil.buildMsg(channel.id().toString(), str));}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("断开链接" + ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息类型:" + msg.getClass());System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息内容:" + JSON.toJSONString(msg));}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

1.3:测试

在这里插入图片描述
在这里插入图片描述
酱!!!

写在后面

参考文章列表

protostuff序列化方式学习 。

这篇关于netty编程之使用protostuff作为数据传输载体的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1118682

相关文章

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

C++ Primer 多维数组的使用

《C++Primer多维数组的使用》本文主要介绍了多维数组在C++语言中的定义、初始化、下标引用以及使用范围for语句处理多维数组的方法,具有一定的参考价值,感兴趣的可以了解一下... 目录多维数组多维数组的初始化多维数组的下标引用使用范围for语句处理多维数组指针和多维数组多维数组严格来说,C++语言没

在 Spring Boot 中使用 @Autowired和 @Bean注解的示例详解

《在SpringBoot中使用@Autowired和@Bean注解的示例详解》本文通过一个示例演示了如何在SpringBoot中使用@Autowired和@Bean注解进行依赖注入和Bean... 目录在 Spring Boot 中使用 @Autowired 和 @Bean 注解示例背景1. 定义 Stud

使用 sql-research-assistant进行 SQL 数据库研究的实战指南(代码实现演示)

《使用sql-research-assistant进行SQL数据库研究的实战指南(代码实现演示)》本文介绍了sql-research-assistant工具,该工具基于LangChain框架,集... 目录技术背景介绍核心原理解析代码实现演示安装和配置项目集成LangSmith 配置(可选)启动服务应用场景

使用Python快速实现链接转word文档

《使用Python快速实现链接转word文档》这篇文章主要为大家详细介绍了如何使用Python快速实现链接转word文档功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 演示代码展示from newspaper import Articlefrom docx import

oracle DBMS_SQL.PARSE的使用方法和示例

《oracleDBMS_SQL.PARSE的使用方法和示例》DBMS_SQL是Oracle数据库中的一个强大包,用于动态构建和执行SQL语句,DBMS_SQL.PARSE过程解析SQL语句或PL/S... 目录语法示例注意事项DBMS_SQL 是 oracle 数据库中的一个强大包,它允许动态地构建和执行

SpringBoot中使用 ThreadLocal 进行多线程上下文管理及注意事项小结

《SpringBoot中使用ThreadLocal进行多线程上下文管理及注意事项小结》本文详细介绍了ThreadLocal的原理、使用场景和示例代码,并在SpringBoot中使用ThreadLo... 目录前言技术积累1.什么是 ThreadLocal2. ThreadLocal 的原理2.1 线程隔离2

Python itertools中accumulate函数用法及使用运用详细讲解

《Pythonitertools中accumulate函数用法及使用运用详细讲解》:本文主要介绍Python的itertools库中的accumulate函数,该函数可以计算累积和或通过指定函数... 目录1.1前言:1.2定义:1.3衍生用法:1.3Leetcode的实际运用:总结 1.1前言:本文将详