自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现

2024-04-03 18:48

本文主要是介绍自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、zk的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-zk</artifactId><packaging>jar</packaging><name>rpc-zk Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- ZooKeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></dependency></dependencies><build><finalName>rpc-zk</finalName></build>
</project>

cn.tianjun.zk.Constant

package cn.tianjun.zk;/*** 常量*/
public class Constant {public static final int ZK_SESSION_TIMEOUT = 5000;//zk超时时间public static final String ZK_REGISTRY_PATH = "/registry";//注册节点public static final String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";//节点
}

cn.tianjun.zk.ServiceDiscovery

package cn.tianjun.zk;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 本类用于client发现server节点的变化 ,实现负载均衡**/
public class ServiceDiscovery {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);private CountDownLatch latch = new CountDownLatch(1);private volatile List<String> dataList = new ArrayList<String>();private String registryAddress;/*** zk链接** @param registryAddress*/public ServiceDiscovery(String registryAddress) {this.registryAddress = registryAddress;ZooKeeper zk = connectServer();if (zk != null) {watchNode(zk);}}/*** 发现新节点** @return*/public String discover() {String data = null;int size = dataList.size();// 存在新节点,使用即可if (size > 0) {if (size == 1) {data = dataList.get(0);LOGGER.debug("using only data: {}", data);} else {data = dataList.get(ThreadLocalRandom.current().nextInt(size));LOGGER.debug("using random data: {}", data);}}return data;}/*** 链接** @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 监听** @param zk*/private void watchNode(final ZooKeeper zk) {try {// 获取所有子节点List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH,new Watcher() {public void process(WatchedEvent event) {// 节点改变if (event.getType() == Event.EventType.NodeChildrenChanged) {watchNode(zk);}}});List<String> dataList = new ArrayList<String>();// 循环子节点for (String node : nodeList) {// 获取节点中的服务器地址byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/"+ node, false, null);// 存储到list中dataList.add(new String(bytes));}LOGGER.debug("node data: {}", dataList);// 将节点信息记录在成员变量this.dataList = dataList;} catch (Exception e) {LOGGER.error("", e);}}
}

cn.tianjun.zk.ServiceRegistry

package cn.tianjun.zk;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 服务注册 ,ZK 在该架构中扮演了“服务注册表”的角色,用于注册所有服务器的地�?与端口,并对客户端提供服务发现的功能* */
public class ServiceRegistry {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);private CountDownLatch latch = new CountDownLatch(1);private String registryAddress;public ServiceRegistry(String registryAddress) {//zookeeper的地�?this.registryAddress = registryAddress;}/*** 创建zookeeper链接* * @param data*/public void register(String data) {if (data != null) {ZooKeeper zk = connectServer();if (zk != null) {createNode(zk, data);}}}/*** 创建zookeeper链接,监�?* * @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 创建节点* * @param zk* @param data*/private void createNode(ZooKeeper zk, String data) {try {byte[] bytes = data.getBytes();if (zk.exists(Constant.ZK_REGISTRY_PATH, null) == null) {zk.create(Constant.ZK_REGISTRY_PATH, null, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}String path = zk.create(Constant.ZK_DATA_PATH, bytes,Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOGGER.debug("create zookeeper node ({} => {})", path, data);} catch (Exception e) {LOGGER.error("", e);}}
}

2、utils的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-utils</artifactId><packaging>jar</packaging><name>rpc-utils Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- Netty --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><!-- Protostuff --><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-core</artifactId></dependency><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-runtime</artifactId></dependency><!-- Objenesis --><dependency><groupId>org.objenesis</groupId><artifactId>objenesis</artifactId></dependency></dependencies><build><finalName>rpc-utils</finalName></build>
</project>

cn.tianjun.rpc.utils.RpcDecoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;/*** RPC 解码器**/
public class RpcDecoder extends ByteToMessageDecoder {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcDecoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int dataLength = in.readInt();if (dataLength < 0) {ctx.close();}if (in.readableBytes() < dataLength) {in.resetReaderIndex();}//将ByteBuf转换为byte[]byte[] data = new byte[dataLength];in.readBytes(data);//将data转换成objectObject obj = SerializationUtil.deserialize(data, genericClass);out.add(obj);}
}

cn.tianjun.rpc.utils.RpcEncoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;/*** RPC 编码器**/
public class RpcEncoder extends MessageToByteEncoder<Object> {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcEncoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic void encode(ChannelHandlerContext ctx, Object inob, ByteBuf out)throws Exception {//序列化if (genericClass.isInstance(inob)) {byte[] data = SerializationUtil.serialize(inob);out.writeInt(data.length);out.writeBytes(data);}}
}

cn.tianjun.rpc.utils.RpcRequest

package cn.tianjun.rpc.utils;/*** 封装 RPC 请求*  封装发送的object的反射属性*/
public class RpcRequest {private String requestId;private String className;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getParameterTypes() {return parameterTypes;}public void setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;}public Object[] getParameters() {return parameters;}public void setParameters(Object[] parameters) {this.parameters = parameters;}
}

cn.tianjun.rpc.utils.RpcResponse

package cn.tianjun.rpc.utils;/*** 封装 RPC 响应* 封装相应object*/
public class RpcResponse {private String requestId;private Throwable error;private Object result;public boolean isError() {return error != null;}public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public Throwable getError() {return error;}public void setError(Throwable error) {this.error = error;}public Object getResult() {return result;}public void setResult(Object result) {this.result = result;}
}

cn.tianjun.rpc.utils.SerializationUtil

package cn.tianjun.rpc.utils;import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;/*** 序列化工具类(基于 Protostuff 实现)**/
public class SerializationUtil {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();private static Objenesis objenesis = new ObjenesisStd(true);private SerializationUtil() {}/*** 获取类的schema* @param cls* @return*/@SuppressWarnings("unchecked")private static <T> Schema<T> getSchema(Class<T> cls) {Schema<T> schema = (Schema<T>) cachedSchema.get(cls);if (schema == null) {schema = RuntimeSchema.createFrom(cls);if (schema != null) {cachedSchema.put(cls, schema);}}return schema;}/*** 序列化(对象 -> 字节数组)*/@SuppressWarnings("unchecked")public static <T> byte[] serialize(T obj) {Class<T> cls = (Class<T>) obj.getClass();LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {Schema<T> schema = getSchema(cls);return ProtostuffIOUtil.toByteArray(obj, schema, buffer);//序列�?} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}}/*** 反序列化(字节数组 -> 对象)*/public static <T> T deserialize(byte[] data, Class<T> cls) {try {/** 如果一个类没有参数为空的构造方法时候,那么你直接调用newInstance方法试图得到一个实例对象的时候是会抛出异常的* 通过ObjenesisStd可以完美的避开这个问题* */T message = (T) objenesis.newInstance(cls);//实例化Schema<T> schema = getSchema(cls);//获取类的schemaProtostuffIOUtil.mergeFrom(data, message, schema);return message;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}
}

3、protocol的实现

这个只是接口的定义,pom.xml里面为空。

cn.tianjun.rpc.protocol.HelloService

package cn.tianjun.rpc.protocol;public interface HelloService {String hello(String name);String hello(Person person);
}

cn.tianjun.rpc.protocol.Person

package cn.tianjun.rpc.protocol;public class Person {private String firstName;private String lastName;public Person() {}public Person(String firstName, String lastName) {this.firstName = firstName;this.lastName = lastName;}public String getFirstName() {return firstName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}
}

这篇关于自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/873702

相关文章

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

Python脚本实现自动删除C盘临时文件夹

《Python脚本实现自动删除C盘临时文件夹》在日常使用电脑的过程中,临时文件夹往往会积累大量的无用数据,占用宝贵的磁盘空间,下面我们就来看看Python如何通过脚本实现自动删除C盘临时文件夹吧... 目录一、准备工作二、python脚本编写三、脚本解析四、运行脚本五、案例演示六、注意事项七、总结在日常使用

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

使用Python实现在Word中添加或删除超链接

《使用Python实现在Word中添加或删除超链接》在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能,本文将为大家介绍一下Python如何实现在Word中添加或... 在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能。通过添加超

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

Python实现高效地读写大型文件

《Python实现高效地读写大型文件》Python如何读写的是大型文件,有没有什么方法来提高效率呢,这篇文章就来和大家聊聊如何在Python中高效地读写大型文件,需要的可以了解下... 目录一、逐行读取大型文件二、分块读取大型文件三、使用 mmap 模块进行内存映射文件操作(适用于大文件)四、使用 pand

python实现pdf转word和excel的示例代码

《python实现pdf转word和excel的示例代码》本文主要介绍了python实现pdf转word和excel的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一、引言二、python编程1,PDF转Word2,PDF转Excel三、前端页面效果展示总结一