Netty之Jboss Marshalling编解码

2024-06-23 12:32

本文主要是介绍Netty之Jboss Marshalling编解码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Netty之JbossMarshalling编解码

    JbossMarshalling是一个java对象序列化包,对JDK默认的序列化框架进行了优化,但又保持跟java.io.Serializable接口的兼容,同时增加了一些可调的参数和附加的特性,这些参数和特性可通过工厂类进行配置。

一.服务端开发

1.1 SubReqServer实现

package marshalling;

 

import serializable.SubReqClient;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

importio.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

importio.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.logging.LogLevel;

importio.netty.handler.logging.LoggingHandler;

 

public class SubReqServer {

         publicvoid bind(int port) throws Exception{

                   //配置服务端的NIO线程组

                   EventLoopGroupbossGroup=new NioEventLoopGroup();

                   EventLoopGroupworkerGroup=new NioEventLoopGroup();

                   try{

                            ServerBootstrapb=new ServerBootstrap();

                            b.group(bossGroup,workerGroup)

                            .channel(NioServerSocketChannel.class)

                            .option(ChannelOption.SO_BACKLOG,100)

                            .handler(newLoggingHandler(LogLevel.INFO))

                            .childHandler(newChannelInitializer<SocketChannel>() {

                                     @Override

                                     publicvoid initChannel(SocketChannel ch){

                                               ch.pipeline().addLast(

                                                                 MarshallingCodeCFatory.buildMarshallingDecoder());

                                               ch.pipeline().addLast(

                                                                 MarshallingCodeCFatory.buildMarshallingEncoder());

                                               ch.pipeline().addLast(newSubReqServerHandler());

                                     }

                            });

              // 绑定端口,同步等待成功

                            ChannelFuturef=b.bind(port).sync();

                            //等待服务端监听端口关闭

                            f.channel().closeFuture().sync();

 

                   }catch (Exception e) {

                            //TODO: handle exception

                   }

                   finally{

                            //优雅退出,释放线程池资源

                            bossGroup.shutdownGracefully();

                            workerGroup.shutdownGracefully();

                   }

         }

        

         publicstatic void main(String[] args) throws Exception{

                   intport=8080;

                   if(args!=null&&args.length>0){

                            try{

                                     port=Integer.valueOf(args[0]);

                                    

                            }catch (NumberFormatException e) {

                                     //TODO: handle exception

                                    

                            }

                            newSubReqClient().connect(port,"127.0.0.1");

                   }

         }

 

}    

   通过MarshallingCodeCFactory工厂类创建了MarshallingDecoder解码器,并将其加入到ChannelPipeline中;通过工厂类创建MarshallingEncoder编码器,并添加到ChannelPipeline中。

1.2 MarshallingCodeFactory实现

package marshalling;

 

importio.netty.handler.codec.marshalling.DefaultMarshallerProvider;

importio.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;

importio.netty.handler.codec.marshalling.MarshallerProvider;

importio.netty.handler.codec.marshalling.MarshallingDecoder;

importio.netty.handler.codec.marshalling.MarshallingEncoder;

importio.netty.handler.codec.marshalling.UnmarshallerProvider;

 

importorg.jboss.marshalling.MarshallerFactory;

import org.jboss.marshalling.Marshalling;

importorg.jboss.marshalling.MarshallingConfiguration;

 

public class MarshallingCodeCFactory {

        

         publicstatic MarshallingDecoder buildMarshallingDecoder() {

         //参数serial表示创建的是java序列化工厂对象

                   finalMarshallerFactory marshallerFactory=Marshalling.

                                     getProvidedMarshallerFactory("serial");

                   finalMarshallingConfiguration configuration=new MarshallingConfiguration();

                   //设置版本号为5

                   configuration.setVersion(5);

                   UnmarshallerProviderprovider=new

                                     DefaultUnmarshallerProvider(marshallerFactory,configuration);

                   //最大长度为1M

                   MarshallingDecoderdecoder=new MarshallingDecoder(provider,1024);

                   returndecoder;

         }

        

         /*/

          * 创建jboss marshalling编码器marshallingencoder

          */

   public static MarshallingEncoder buildMarshallingEncoder() {

             final MarshallerFactorymarshallerFactory=Marshalling.

                                     getProvidedMarshallerFactory("serial");

                   finalMarshallingConfiguration configuration=new MarshallingConfiguration();

                   configuration.setVersion(5);

                   MarshallerProviderprovider=new DefaultMarshallerProvider(

                                     marshallerFactory,configuration);

                   MarshallingEncoderencoder=new MarshallingEncoder(provider);

                   returnencoder;

                                    

    }

}

   首先通过Marshalling工具类的getProvidedMarshallerFactory静态方法获取MarshallerFactory实例,参数“serial”表示创建的是java序列化工厂对象。创建MarshallingConfiguration对象,将它的版本号设置为5,然后根据MarshallerFactory和MarshallingConfiguration创建UnmarshallerProvider实例,最后通过构造函数创建netty的marshallingDecoder对象。

二.客户端开发

2.1 SubReqClient实现

package marshalling;

 

import serializable.SubReqClient;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

importio.netty.channel.nio.NioEventLoopGroup;

importio.netty.channel.socket.SocketChannel;

importio.netty.channel.socket.nio.NioSocketChannel;

 

public class SubReqClient {

         publicvoid connect(int port,String host) throws Exception{

                   //配置客户端NIO线程组

                   EventLoopGroupgroup=new NioEventLoopGroup();

                   try{

                            Bootstrapb=new Bootstrap();

                            b.group(group).channel(NioSocketChannel.class)

                            .option(ChannelOption.TCP_NODELAY,true)

                            .handler(newChannelInitializer<SocketChannel>() {

                                     @Override

                                     publicvoid initChannel(SocketChannel ch) throws Exception{

                                               ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());

                                               ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());

                                               ch.pipeline().addLast(newSubReqClientHandler());

                                     }

                            });

                            //发起异步连接操作

                            ChannelFuturef=b.connect(host,port).sync();

                            //等待客户端链路关闭

                            f.channel().closeFuture().sync();

                   }catch (Exception e) {

                            //TODO: handle exception

                   }finally{

                            //优雅退出,释放NIO线程组

                            group.shutdownGracefully();

                   }

         }

  

        

         publicstatic void main(String[] args) throws Exception{

                   intport=8080;

                   if(args!=null&&args.length>0){

                            try{

                                     port=Integer.valueOf(args[0]);

                                    

                            }catch (NumberFormatException e) {

                                     //TODO: handle exception

                                    

                            }

                            newSubReqClient().connect(port,"127.0.0.1");

                   }

         }

}

    客户端成功接收到了服务端返回的10条应答消息,subReqID为从0到9,与服务端发送的应答消息完全一致。

    利用netty的marshalling编解码器,可以轻松地开发出与jboss内部模块进行远程通信的程序,而且支持异步非阻塞,这无疑降低了基于netty开发应用程序与jboss内部模块对接的难度。

 

这篇关于Netty之Jboss Marshalling编解码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1087167

相关文章

【Netty】netty中都是用了哪些设计模式

对于工程师来说,掌握并理解运用设计模式,是非常重要的,但是除了学习基本的概念之外,需要结合优秀的中间件、框架源码学习其中的优秀软件设计,这样才能以不变应万变。 单例模式 单例模式解决的对象的唯一性,一般来说就是构造方法私有化、然后提供一个静态的方法获取实例。 在netty中,select用于处理CONTINUE、SELECT、BUSY_WAIT 三种策略,通过DefaultSelectStra

Java语言的Netty框架+云快充协议1.5+充电桩系统+新能源汽车充电桩系统源码

介绍 云快充协议+云快充1.5协议+云快充1.6+云快充协议开源代码+云快充底层协议+云快充桩直连+桩直连协议+充电桩协议+云快充源码 软件架构 1、提供云快充底层桩直连协议,版本为云快充1.5,对于没有对接过充电桩系统的开发者尤为合适; 2、包含:启动充电、结束充电、充电中实时数据获取、报文解析、Netty通讯框架、包解析工具、调试器模拟器软件等; 源码合作 提供完整云快充协议源代码

Linux speex音频库-音频数据编解码

speex音频数据编解码 speex简述speex encoder(编码器)speex decoder(解码器)denoise vad (降噪,语音活性检测) speex简述 speex官网 Speex: A Free Codec For Free Speech Overview Speex is an Open Source/Free Software patent-fre

Netty源码解析9-ChannelHandler实例之MessageToByteEncoder

MessageToByteEncoder框架可见用户使用POJO对象编码为字节数据存储到ByteBuf。用户只需定义自己的编码方法encode()即可。 首先看类签名: public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter 可知该类只处理出站事件,切确的说是write事件

Netty源码解析8-ChannelHandler实例之CodecHandler

编解码处理器作为Netty编程时必备的ChannelHandler,每个应用都必不可少。Netty作为网络应用框架,在网络上的各个应用之间不断进行数据交互。而网络数据交换的基本单位是字节,所以需要将本应用的POJO对象编码为字节数据发送到其他应用,或者将收到的其他应用的字节数据解码为本应用可使用的POJO对象。这一部分,又和JAVA中的序列化和反序列化对应。幸运的是,有很多其他的开源工具(prot

Netty源码解析7-ChannelHandler实例之TimeoutHandler

请戳GitHub原文: https://github.com/wangzhiwubigdata/God-Of-BigData TimeoutHandler 在开发TCP服务时,一个常见的需求便是使用心跳保活客户端。而Netty自带的三个超时处理器IdleStateHandler,ReadTimeoutHandler和WriteTimeoutHandler可完美满足此需求。其中IdleSt

Netty源码解析6-ChannelHandler实例之LoggingHandler

LoggingHandler 日志处理器LoggingHandler是使用Netty进行开发时的好帮手,它可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。首先看类签名: @Sharablepublic class LoggingHandler extends ChannelDuplexHandler 注解Sharable说明LoggingHandler没有状态相关变量,

Netty源码解析5-ChannelHandler

ChannelHandler并不处理事件,而由其子类代为处理:ChannelInboundHandler拦截和处理入站事件,ChannelOutboundHandler拦截和处理出站事件。ChannelHandler和ChannelHandlerContext通过组合或继承的方式关联到一起成对使用。事件通过ChannelHandlerContext主动调用如fireXXX()和write(msg)

Netty源码解析4-Handler综述

Netty中的Handler简介 Handler在Netty中,占据着非常重要的地位。Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、 统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否。一句话,没有它做不到的只有你想不到的 Netty中的所有handler都实现自ChannelHandler接口。按照输入

Netty源码解析3-Pipeline

请戳GitHub原文: https://github.com/wangzhiwubigdata/God-Of-BigData Channel实现概览 在Netty里,Channel是通讯的载体,而ChannelHandler负责Channel中的逻辑处理。 那么ChannelPipeline是什么呢?我觉得可以理解为ChannelHandler的容器:一个Channel包含一个Chan