本文主要是介绍【BlossomRPC】服务端与客户端请求Handler,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 客户端Handler
- 服务端Handler
RPC项目
配置中心项目
网关项目
客户端Handler
承接上文,客户端的Handler其实就比较简单了,因为客户端作为接收数据的时候,我们只需要从上文提到的Cache中通过reqId的方式拿到Future/Promise对象,然后设置他们的值,就可以马上进行返回。
客户端代码如下:
import blossom.project.rpc.core.entity.RpcDto;
import blossom.project.rpc.core.entity.RpcCache;
import blossom.project.rpc.core.entity.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;import java.util.Objects;/*** @author: ZhangBlossom* @date: 2023/12/17 02:43* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* NettyRpcServerHandler类** 备忘录* 有点烧脑 分析一下这个类怎么用 先睡了* 1.1:当前类是客户端接收到服务器的response了* 1.2:如果没有报错,那么我就要从我的cache中拿到* 我特定reqId对应的promise* 1.3:设定promise的值* 1.3.1:promise一旦被设定,promise.get()的阻塞马上就会结束* 1.3.2:也就是我成功拿到了Server的响应值* 1.3.3:那么Client的这次调用就是成功的* 1.3.4:否则失败* 1.4:删除promise再缓存中的reqId* 1.5:这里如果对future/promise进行设置值之后,代理应该马上返回* 1.6:用promise的setXxx类型方法比较合适**/
@Slf4j
public class NettyRpcClientHandler extends SimpleChannelInboundHandler<RpcDto<RpcResponse>> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcDto<RpcResponse> msg) throws Exception {if (Objects.isNull(msg)){log.info("the RpcDto<Response> is Null,return...");return;}log.info("receive the Rpc Server Data, msg is: {}",msg);long reqId = msg.getHeader().getReqId();//TODO 得到并且删除 考虑一下DefaultPromise是否需要封装DefaultPromise defaultPromise = RpcCache.RESPONSE_CACHE.remove(reqId);defaultPromise.setSuccess(msg.getData());}
}
这里比较重要的是了解一下Future和Promise的特性。
Netty中的异步模型广泛地使用了Future和Promise来处理异步操作。以下是它们的核心作用:
- Future:
○ Future代表了一个可能还没有完成的异步操作的结果。
○ 在Netty中,当你执行一个异步操作(如发送数据),你会得到一个Future对象。这个对象可以用来在未来某个时刻获取操作的结果。 - Promise:
○ Promise是Future的一个子接口,它不仅代表了异步操作的结果,还可以被操作的执行者显式地标记为成功或失败。
○ 在Netty中,Promise用于在操作完成时设置操作的结果(成功或失败)。这是一个写入结果的Future。
这里使用Promise会更加方便。
因为Promise提供了setXxx类型方法,这个方法确保一旦被设置值,get/exception就会马上进行返回从而结束阻塞。因此Promise类型非常适合我们当前的场景,同时,还有一个点,就是因为,Netty那边的返回值。
我们知道Netty使用的是异步处理。
当我们发送一个请求的时候,我们会拿到一个返回值如下:
ChannelFuture sendFuture = future.channel().writeAndFlush(requestRpcDto);
//继承了Future
public interface ChannelFuture extends Future<Void>
对于ChannelFuture的处理,有非常非常多种的处理方法。
监听器,sync同步处理,await/get异步处理等。
这里由于我们希望是能显式的拿到客户端请求的返回值,同时减少阻塞等待。
我们不使用原生的方法,也就是我们只是用Netty发送完毕请求,而请求返回值最后的处理,我们通过对上面Cache的处理来进行。
我们只要确保,Cache中对于一个reqId,唯一对应一个Future/Promise对象即可。
然后再客户端拿到数据的时候,通过reqId对Promise进行设置值即可。
这样子就能结束Promise的get方法的阻塞等待。
参考思路和Promise的测试代码如下:
package blossom.project.rpc.core.entity;import io.netty.channel.DefaultEventLoop;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.concurrent.ExecutionException;/*** @author: ZhangBlossom* @date: 2023/12/16 23:39* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* RpcPromise类* Promise用来异步处理* 1:Future代表了一个可能还没有完成的异步操作的结果。* 2:在Netty中,当你执行一个异步操作(如发送数据),* 会得到一个Future对象。这个对象可以用来在未来某个时刻获取操作的结果。* 3:Promise是Future的一个子接口,它不仅代表了异步操作的结果,* 还可以被操作的执行者显式地标记为成功或失败。* 4:在Netty中,Promise用于在操作完成时设置操作的结果(成功或失败)。* 这是一个写入结果的Future。***/
@Data
@NoArgsConstructor
public class RpcPromise<T> extends DefaultPromise<T>
{//private Promise<T> promise;////public RpcPromise(Promise<T> promise) {// this.promise = promise;//}/*** 思考一下* 1: 我的代码在这里是异步处理的返回结果* 2: 什么时候这个返回结果可以被设置值?* 3: 应该就是在我client接收到server的返回值的时候* 4: 也就是说我可以再clienthandler里面添加一个对promise的处理* 5: 也就是说我得有一个cache一样的东西能缓存我的promise* 6: 然后再client得到数据的时候去设置promise的值* 7: 不论成功失败都如main函数里面一样操作就行* 8: promise应该也要被server去使用* 9: cache应该是map结构* @param args*/public static void main(String[] args) throws ExecutionException, InterruptedException {//1:使用Promise作为属性//RpcPromise<RpcResponse> promise1=new RpcPromise<>// (new DefaultPromise<RpcResponse>// (new DefaultEventLoop()));//promise1.promise.setSuccess(new RpcResponse());//promise1.setSuccess(new RpcResponse());//第二种方式 直接用原生defaultpromiseRpcPromise promise = new RpcPromise();promise.setSuccess("success");promise.get();}
}
到此为止,我想我们就已经顺利的完成了对于客户端的代理请求的处理。
总结一下完整流程:
NettyRpcClientHandler 设置 Promise 的状态
- 接收响应: 当RPC响应从服务器端返回时,Netty通过我设置的pipeline中的RpcDecoder解码这个响应,并将其传递到RpcClientHandler。
- 处理响应: 在RpcClientHandler的channelRead0方法中,代码处理接收到的响应。这个方法首先从响应消息中提取出请求ID。
- 查找对应的 Promise: 使用这个请求ID,RpcClientHandler从CACHE中查找之前存储的与该请求ID对应的Promise。
- 设置Promise的状态: 一旦找到相应的RpcFuture,RpcClientHandler就会调promise.setSuccess(msg.getData()),将Promise的状态设置为成功,并附上从响应中获取的数据。如果在处理响应的过程中发生了错误,也可能会调用setFailure方法来标记Promise为失败,并传递错误信息(懒得失败了)。
服务端Handler
服务端的Handler要做的事情也很简单,其实就是拿到请求数据之后,通过反射的方式去调用我们本地的方法即可。
这里按照我之前的思路,先编写一个RPC服务方法的缓存,缓存所有的服务信息,然后到时候服务端接收到客户端请求的时候,先从缓存中判断是否存在有方法可以被调用。
package blossom.project.rpc.core.proxy.spring.rpcmethod;import blossom.project.rpc.core.entity.RpcRequest;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;/*** @author: ZhangBlossom* @date: 2023/12/18 22:10* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* RpcServiceMethodCache类*/
public class RpcServiceMethodCache {/*** rpc方法cache* 规则:使用 class.getClass()+"."+methodName的方式保存方法路径*/public static Map<String, RpcServiceMethod> METHOD_CACHE =new ConcurrentHashMap<>();/*** 使用饿汉式单例*/private static RpcServiceMethodCache INSTANCE = new RpcServiceMethodCache();private RpcServiceMethodCache(){}public static RpcServiceMethodCache getInstance(){return INSTANCE;}/*** 当前方法用于调用rpcmethod* 这里的invoke方法最终目的就是真正的去调用client发送过来的rpc请求,* 从cache里面拿到那些有注解的rpc方法即可* @param request* @return*/public Object rpcMethodInvoke(RpcRequest request){String key=request.getClassName()+"."+request.getMethodName();RpcServiceMethod rpcServiceMethod= METHOD_CACHE.get(key);if(Objects.isNull(rpcServiceMethod)){return null;}Object methodPath =rpcServiceMethod.getMethodPath();Method method=rpcServiceMethod.getMethod();try {return method.invoke(methodPath,request.getParams());} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return null;}}
然后接下来我们就要考虑使用哪几种反射方法来进行方法的调用了。
很容易想到的三种方法。
- jdk动态代理
- cglib动态代理
- spring容器获取bean反射代理
再源码中,我对这三种方法都进行了实现。不过最简单的肯定还是用反射了,简单量小。
package blossom.project.rpc.core.netty.handler;import blossom.project.rpc.common.enums.ReqTypeEnum;
import blossom.project.rpc.core.entity.RpcDto;
import blossom.project.rpc.core.entity.RpcHeader;
import blossom.project.rpc.core.entity.RpcRequest;
import blossom.project.rpc.core.entity.RpcResponse;
import blossom.project.rpc.core.proxy.spring.rpcmethod.RpcServiceMethodCache;
import blossom.project.rpc.core.proxy.spring.server.SpringRpcProxy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** @author: ZhangBlossom* @date: 2023/12/16 19:43* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* NettyRpcServerHandler类* 1:服务端接收到请求数据之后,需要进行解析* 2:解析后确定具体要调用的请求服务是哪一个* 2.1:这里应该要用到动态代理了* 2.2:分析使用那种动态代理 JDK/CGLIB/SpringIoC* 2.3:分析这三种方法的代码实现* 1:对于JDK直接用正常的反射* 2:对于CGLIB那么就是走CGLIB的常规写法* 3:对于Spring就要考虑把这些类存到容器中,* 然后要使用的时候从容器中进行获取*/
public class NettyRpcServerHandler extends SimpleChannelInboundHandler<RpcDto<RpcRequest>> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcDto<RpcRequest> msg) throws Exception {RpcHeader header = msg.getHeader();//当前是响应数据header.setReqType(ReqTypeEnum.RESPONSE.getCode());//使用反射的方式在运行时调用对应的类的方法//这里你可以思考一下用什么方式可以最快的找到我想要的类并且调用方法//目前我提供了:JDK CGLIB SpringIOC容器 HashMap自制工厂//Object data = SpringRpcProxy.invoke(msg.getData());//使用JDK动态代理//Object data = RpcInvocationHandler.invoke(msg.getData());//使用CGLIB动态代理//Object data = RpcCglibProxy.invoke(msg.getData());//使用封装好的rpc对象去发送请求Object data = RpcServiceMethodCache.getInstance().rpcMethodInvoke(msg.getData());RpcDto<RpcResponse> dto = new RpcDto();RpcResponse response = new RpcResponse();response.setData(data);response.setMsg("success!!!");dto.setData(response);dto.setHeader(header);//写出数据ctx.writeAndFlush(dto);}}
至此,我们就完成了服务端接收到请求并反射调用本地方法之后得到返回数据,并将返回数据返回给客户端的代码。
到此为止,RPC项目中最重要的几个功能我们其实就都完成了。
接下来的注册中心模块其实就只是一个简单的锦上添花了。
如果不使用注册中心,那么其实直接再项目服务启动的时候,通过application.yml文件的方式对项目的ip/port进行配置即可,然后直接从固定ip/port拿到数据即可。
这篇关于【BlossomRPC】服务端与客户端请求Handler的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!