【BlossomRPC】服务端与客户端请求Handler

2024-03-31 02:20

本文主要是介绍【BlossomRPC】服务端与客户端请求Handler,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 客户端Handler
  • 服务端Handler

RPC项目

配置中心项目

网关项目

客户端Handler

承接上文,客户端的Handler其实就比较简单了,因为客户端作为接收数据的时候,我们只需要从上文提到的Cache中通过reqId的方式拿到Future/Promise对象,然后设置他们的值,就可以马上进行返回。
客户端代码如下:

import blossom.project.rpc.core.entity.RpcDto;
import blossom.project.rpc.core.entity.RpcCache;
import blossom.project.rpc.core.entity.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;import java.util.Objects;/*** @author: ZhangBlossom* @date: 2023/12/17 02:43* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* NettyRpcServerHandler类** 备忘录* 有点烧脑 分析一下这个类怎么用 先睡了* 1.1:当前类是客户端接收到服务器的response了* 1.2:如果没有报错,那么我就要从我的cache中拿到* 我特定reqId对应的promise* 1.3:设定promise的值* 1.3.1:promise一旦被设定,promise.get()的阻塞马上就会结束* 1.3.2:也就是我成功拿到了Server的响应值* 1.3.3:那么Client的这次调用就是成功的* 1.3.4:否则失败* 1.4:删除promise再缓存中的reqId* 1.5:这里如果对future/promise进行设置值之后,代理应该马上返回* 1.6:用promise的setXxx类型方法比较合适**/
@Slf4j
public class NettyRpcClientHandler extends SimpleChannelInboundHandler<RpcDto<RpcResponse>> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcDto<RpcResponse> msg) throws Exception {if (Objects.isNull(msg)){log.info("the RpcDto<Response> is Null,return...");return;}log.info("receive the Rpc Server Data, msg is: {}",msg);long reqId = msg.getHeader().getReqId();//TODO 得到并且删除 考虑一下DefaultPromise是否需要封装DefaultPromise defaultPromise = RpcCache.RESPONSE_CACHE.remove(reqId);defaultPromise.setSuccess(msg.getData());}
}

这里比较重要的是了解一下Future和Promise的特性。
Netty中的异步模型广泛地使用了Future和Promise来处理异步操作。以下是它们的核心作用:

  1. Future:
    ○ Future代表了一个可能还没有完成的异步操作的结果。
    ○ 在Netty中,当你执行一个异步操作(如发送数据),你会得到一个Future对象。这个对象可以用来在未来某个时刻获取操作的结果。
  2. Promise:
    ○ Promise是Future的一个子接口,它不仅代表了异步操作的结果,还可以被操作的执行者显式地标记为成功或失败。
    ○ 在Netty中,Promise用于在操作完成时设置操作的结果(成功或失败)。这是一个写入结果的Future。
    这里使用Promise会更加方便。
    因为Promise提供了setXxx类型方法,这个方法确保一旦被设置值,get/exception就会马上进行返回从而结束阻塞。因此Promise类型非常适合我们当前的场景,同时,还有一个点,就是因为,Netty那边的返回值。
    我们知道Netty使用的是异步处理。
    当我们发送一个请求的时候,我们会拿到一个返回值如下:
ChannelFuture sendFuture = future.channel().writeAndFlush(requestRpcDto);
//继承了Future
public interface ChannelFuture extends Future<Void> 

对于ChannelFuture的处理,有非常非常多种的处理方法。
监听器,sync同步处理,await/get异步处理等。
这里由于我们希望是能显式的拿到客户端请求的返回值,同时减少阻塞等待。
我们不使用原生的方法,也就是我们只是用Netty发送完毕请求,而请求返回值最后的处理,我们通过对上面Cache的处理来进行。
我们只要确保,Cache中对于一个reqId,唯一对应一个Future/Promise对象即可。
然后再客户端拿到数据的时候,通过reqId对Promise进行设置值即可。
这样子就能结束Promise的get方法的阻塞等待。
参考思路和Promise的测试代码如下:

package blossom.project.rpc.core.entity;import io.netty.channel.DefaultEventLoop;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.concurrent.ExecutionException;/*** @author: ZhangBlossom* @date: 2023/12/16 23:39* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* RpcPromise类* Promise用来异步处理* 1:Future代表了一个可能还没有完成的异步操作的结果。* 2:在Netty中,当你执行一个异步操作(如发送数据),* 会得到一个Future对象。这个对象可以用来在未来某个时刻获取操作的结果。* 3:Promise是Future的一个子接口,它不仅代表了异步操作的结果,* 还可以被操作的执行者显式地标记为成功或失败。* 4:在Netty中,Promise用于在操作完成时设置操作的结果(成功或失败)。* 这是一个写入结果的Future。***/
@Data
@NoArgsConstructor
public class RpcPromise<T>  extends DefaultPromise<T>
{//private Promise<T> promise;////public RpcPromise(Promise<T> promise) {//    this.promise = promise;//}/*** 思考一下* 1: 我的代码在这里是异步处理的返回结果* 2: 什么时候这个返回结果可以被设置值?* 3: 应该就是在我client接收到server的返回值的时候* 4: 也就是说我可以再clienthandler里面添加一个对promise的处理* 5: 也就是说我得有一个cache一样的东西能缓存我的promise* 6: 然后再client得到数据的时候去设置promise的值* 7: 不论成功失败都如main函数里面一样操作就行* 8: promise应该也要被server去使用* 9: cache应该是map结构* @param args*/public static void main(String[] args) throws ExecutionException, InterruptedException {//1:使用Promise作为属性//RpcPromise<RpcResponse> promise1=new RpcPromise<>//        (new DefaultPromise<RpcResponse>//                (new DefaultEventLoop()));//promise1.promise.setSuccess(new RpcResponse());//promise1.setSuccess(new RpcResponse());//第二种方式 直接用原生defaultpromiseRpcPromise promise = new RpcPromise();promise.setSuccess("success");promise.get();}
}

到此为止,我想我们就已经顺利的完成了对于客户端的代理请求的处理。
总结一下完整流程:
NettyRpcClientHandler 设置 Promise 的状态

  1. 接收响应: 当RPC响应从服务器端返回时,Netty通过我设置的pipeline中的RpcDecoder解码这个响应,并将其传递到RpcClientHandler。
  2. 处理响应: 在RpcClientHandler的channelRead0方法中,代码处理接收到的响应。这个方法首先从响应消息中提取出请求ID。
  3. 查找对应的 Promise: 使用这个请求ID,RpcClientHandler从CACHE中查找之前存储的与该请求ID对应的Promise。
  4. 设置Promise的状态: 一旦找到相应的RpcFuture,RpcClientHandler就会调promise.setSuccess(msg.getData()),将Promise的状态设置为成功,并附上从响应中获取的数据。如果在处理响应的过程中发生了错误,也可能会调用setFailure方法来标记Promise为失败,并传递错误信息(懒得失败了)。

服务端Handler

服务端的Handler要做的事情也很简单,其实就是拿到请求数据之后,通过反射的方式去调用我们本地的方法即可。
这里按照我之前的思路,先编写一个RPC服务方法的缓存,缓存所有的服务信息,然后到时候服务端接收到客户端请求的时候,先从缓存中判断是否存在有方法可以被调用。

package blossom.project.rpc.core.proxy.spring.rpcmethod;import blossom.project.rpc.core.entity.RpcRequest;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;/*** @author: ZhangBlossom* @date: 2023/12/18 22:10* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* RpcServiceMethodCache类*/
public class RpcServiceMethodCache {/*** rpc方法cache* 规则:使用 class.getClass()+"."+methodName的方式保存方法路径*/public static Map<String, RpcServiceMethod> METHOD_CACHE =new ConcurrentHashMap<>();/*** 使用饿汉式单例*/private static RpcServiceMethodCache INSTANCE = new RpcServiceMethodCache();private RpcServiceMethodCache(){}public static RpcServiceMethodCache getInstance(){return INSTANCE;}/*** 当前方法用于调用rpcmethod* 这里的invoke方法最终目的就是真正的去调用client发送过来的rpc请求,* 从cache里面拿到那些有注解的rpc方法即可* @param request* @return*/public Object rpcMethodInvoke(RpcRequest request){String key=request.getClassName()+"."+request.getMethodName();RpcServiceMethod rpcServiceMethod= METHOD_CACHE.get(key);if(Objects.isNull(rpcServiceMethod)){return null;}Object methodPath =rpcServiceMethod.getMethodPath();Method method=rpcServiceMethod.getMethod();try {return method.invoke(methodPath,request.getParams());} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return null;}}

然后接下来我们就要考虑使用哪几种反射方法来进行方法的调用了。
很容易想到的三种方法。

  1. jdk动态代理
  2. cglib动态代理
  3. spring容器获取bean反射代理
    再源码中,我对这三种方法都进行了实现。不过最简单的肯定还是用反射了,简单量小。
package blossom.project.rpc.core.netty.handler;import blossom.project.rpc.common.enums.ReqTypeEnum;
import blossom.project.rpc.core.entity.RpcDto;
import blossom.project.rpc.core.entity.RpcHeader;
import blossom.project.rpc.core.entity.RpcRequest;
import blossom.project.rpc.core.entity.RpcResponse;
import blossom.project.rpc.core.proxy.spring.rpcmethod.RpcServiceMethodCache;
import blossom.project.rpc.core.proxy.spring.server.SpringRpcProxy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** @author: ZhangBlossom* @date: 2023/12/16 19:43* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* NettyRpcServerHandler类* 1:服务端接收到请求数据之后,需要进行解析* 2:解析后确定具体要调用的请求服务是哪一个* 2.1:这里应该要用到动态代理了* 2.2:分析使用那种动态代理 JDK/CGLIB/SpringIoC* 2.3:分析这三种方法的代码实现* 1:对于JDK直接用正常的反射* 2:对于CGLIB那么就是走CGLIB的常规写法* 3:对于Spring就要考虑把这些类存到容器中,* 然后要使用的时候从容器中进行获取*/
public class NettyRpcServerHandler extends SimpleChannelInboundHandler<RpcDto<RpcRequest>> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcDto<RpcRequest> msg) throws Exception {RpcHeader header = msg.getHeader();//当前是响应数据header.setReqType(ReqTypeEnum.RESPONSE.getCode());//使用反射的方式在运行时调用对应的类的方法//这里你可以思考一下用什么方式可以最快的找到我想要的类并且调用方法//目前我提供了:JDK CGLIB SpringIOC容器 HashMap自制工厂//Object data = SpringRpcProxy.invoke(msg.getData());//使用JDK动态代理//Object data = RpcInvocationHandler.invoke(msg.getData());//使用CGLIB动态代理//Object data = RpcCglibProxy.invoke(msg.getData());//使用封装好的rpc对象去发送请求Object data = RpcServiceMethodCache.getInstance().rpcMethodInvoke(msg.getData());RpcDto<RpcResponse> dto = new RpcDto();RpcResponse response = new RpcResponse();response.setData(data);response.setMsg("success!!!");dto.setData(response);dto.setHeader(header);//写出数据ctx.writeAndFlush(dto);}}

至此,我们就完成了服务端接收到请求并反射调用本地方法之后得到返回数据,并将返回数据返回给客户端的代码。

到此为止,RPC项目中最重要的几个功能我们其实就都完成了。
接下来的注册中心模块其实就只是一个简单的锦上添花了。
如果不使用注册中心,那么其实直接再项目服务启动的时候,通过application.yml文件的方式对项目的ip/port进行配置即可,然后直接从固定ip/port拿到数据即可。

这篇关于【BlossomRPC】服务端与客户端请求Handler的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/863175

相关文章

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

Python手搓邮件发送客户端

《Python手搓邮件发送客户端》这篇文章主要为大家详细介绍了如何使用Python手搓邮件发送客户端,支持发送邮件,附件,定时发送以及个性化邮件正文,感兴趣的可以了解下... 目录1. 简介2.主要功能2.1.邮件发送功能2.2.个性签名功能2.3.定时发送功能2. 4.附件管理2.5.配置加载功能2.6.

Java后端接口中提取请求头中的Cookie和Token的方法

《Java后端接口中提取请求头中的Cookie和Token的方法》在现代Web开发中,HTTP请求头(Header)是客户端与服务器之间传递信息的重要方式之一,本文将详细介绍如何在Java后端(以Sp... 目录引言1. 背景1.1 什么是 HTTP 请求头?1.2 为什么需要提取请求头?2. 使用 Spr

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

SpringBoot实现websocket服务端及客户端的详细过程

《SpringBoot实现websocket服务端及客户端的详细过程》文章介绍了WebSocket通信过程、服务端和客户端的实现,以及可能遇到的问题及解决方案,感兴趣的朋友一起看看吧... 目录一、WebSocket通信过程二、服务端实现1.pom文件添加依赖2.启用Springboot对WebSocket

SpringBoot中Get请求和POST请求接收参数示例详解

《SpringBoot中Get请求和POST请求接收参数示例详解》文章详细介绍了SpringBoot中Get请求和POST请求的参数接收方式,包括方法形参接收参数、实体类接收参数、HttpServle... 目录1、Get请求1.1 方法形参接收参数 这种方式一般适用参数比较少的情况,并且前后端参数名称必须

QT实现TCP客户端自动连接

《QT实现TCP客户端自动连接》这篇文章主要为大家详细介绍了QT中一个TCP客户端自动连接的测试模型,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录版本 1:没有取消按钮 测试效果测试代码版本 2:有取消按钮测试效果测试代码版本 1:没有取消按钮 测试效果缺陷:无法手动停

Nacos客户端本地缓存和故障转移方式

《Nacos客户端本地缓存和故障转移方式》Nacos客户端在从Server获得服务时,若出现故障,会通过ServiceInfoHolder和FailoverReactor进行故障转移,ServiceI... 目录1. ServiceInfoHolder本地缓存目录2. FailoverReactorinit

Java Websocket实例【服务端与客户端实现全双工通讯】

Java Websocket实例【服务端与客户端实现全双工通讯】 现很多网站为了实现即时通讯,所用的技术都是轮询(polling)。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发 出HTTP request,然后由服务器返回最新的数据给客服端的浏览器。这种传统的HTTP request 的模式带来很明显的缺点 – 浏 览器需要不断的向服务器发出请求,然而HTTP

JAVA用最简单的方法来构建一个高可用的服务端,提升系统可用性

一、什么是提升系统的高可用性 JAVA服务端,顾名思义就是23体验网为用户提供服务的。停工时间,就是不能向用户提供服务的时间。高可用,就是系统具有高度可用性,尽量减少停工时间。如何用最简单的方法来搭建一个高效率可用的服务端JAVA呢? 停工的原因一般有: 服务器故障。例如服务器宕机,服务器网络出现问题,机房或者机架出现问题等;访问量急剧上升,导致服务器压力过大导致访问量急剧上升的原因;时间和