Dubbo 的集群容错模式:Forking Cluster

2024-06-14 06:08

本文主要是介绍Dubbo 的集群容错模式:Forking Cluster,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

集群容错系列文章:
Failover Cluster 失败自动切换
Failfast Cluster 快速失败,抛出异常
Failsafe Cluster 快速失败,不抛出异常
Failback Cluster 失败后定时重试
Forking Cluster 并行调用多个实例,只要一个成功就返回
Broadcast Cluster 广播调用所有实例,有一个报错则抛出异常
Available Cluster 可用的实例
Mergeable Cluster 合并结果

本文简单介绍 Dubbo 中的 Forking Cluster(并行调用多个服务器,只要一个成功就返回)。

简介

并行调用多个实例,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。通过 timeout=”1000” 来设置调用超时时间。

如何使用

<dubbo:service cluster="forking" forks="2" timeout="1000" />

<dubbo:reference cluster="forking" forks="2" timeout="1000" />

实现逻辑

  1. 计算目前需要的并发数,通过负载均衡算法选中被调用实例列表
  2. 并发地调用实例列表,并将处理结果成功的放到阻塞队列中
  3. 获取处理结果队列中的第一个结果,判断是否是异常,是异常则抛出,不是异常则返回结果

源代码

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {/*** Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}* which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.*/// 使用 Cached 线程池private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));public ForkingClusterInvoker(Directory<T> directory) {super(directory);}@Override@SuppressWarnings({"unchecked", "rawtypes"})public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {try {checkInvokers(invokers, invocation);final List<Invoker<T>> selected;// 并行数量,默认是 2final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);// 调用超时, 默认是 1sfinal int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);if (forks <= 0 || forks >= invokers.size()) {// 当并行数配置超过调用实例数量,则默认为调用实例数selected = invokers;} else {selected = new ArrayList<Invoker<T>>();for (int i = 0; i < forks; i++) {// TODO. Add some comment here, refer chinese version for more details.// 通过负载均衡算法选中实例Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);if (!selected.contains(invoker)) {//Avoid add the same invoker several times.// 添加到选中的实例列表中selected.add(invoker);}}}// 将选中的实例添加到上下文中RpcContext.getContext().setInvokers((List) selected);// 记录调用异常的次数final AtomicInteger count = new AtomicInteger();// 使用阻塞队列来存放调用实例的执行结果final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();// 遍历所有被选中的调用者,一个子线程执行一个调用实例for (final Invoker<T> invoker : selected) {executor.execute(new Runnable() {@Overridepublic void run() {try {// 执行调用实例Result result = invoker.invoke(invocation);// 将结果放到阻塞队列中ref.offer(result);} catch (Throwable e) {// 记录调用异常的次数int value = count.incrementAndGet();if (value >= selected.size()) {// 根据队列先入先出原则,因为该算法是只要有一个调用成功就好,// 所以只有当全部的调用实例都失败,才记录到队列中,因为下面取结果会判断是不是异常ref.offer(e);}}}});}try {// 取出结果Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);// 没有一个实例执行成功,抛出异常if (ret instanceof Throwable) {Throwable e = (Throwable) ret;throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);}return (Result) ret;} catch (InterruptedException e) {throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);}} finally {// 清除上下文信息// clear attachments which is binding to current thread.RpcContext.getContext().clearAttachments();}}
}


做个有梦想的程序猿
个人公众号

这篇关于Dubbo 的集群容错模式:Forking Cluster的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1059593

相关文章

Spring boot整合dubbo+zookeeper的详细过程

《Springboot整合dubbo+zookeeper的详细过程》本文讲解SpringBoot整合Dubbo与Zookeeper实现API、Provider、Consumer模式,包含依赖配置、... 目录Spring boot整合dubbo+zookeeper1.创建父工程2.父工程引入依赖3.创建ap

Jenkins分布式集群配置方式

《Jenkins分布式集群配置方式》:本文主要介绍Jenkins分布式集群配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1.安装jenkins2.配置集群总结Jenkins是一个开源项目,它提供了一个容易使用的持续集成系统,并且提供了大量的plugin满

Java设计模式---迭代器模式(Iterator)解读

《Java设计模式---迭代器模式(Iterator)解读》:本文主要介绍Java设计模式---迭代器模式(Iterator),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录1、迭代器(Iterator)1.1、结构1.2、常用方法1.3、本质1、解耦集合与遍历逻辑2、统一

Java 线程安全与 volatile与单例模式问题及解决方案

《Java线程安全与volatile与单例模式问题及解决方案》文章主要讲解线程安全问题的五个成因(调度随机、变量修改、非原子操作、内存可见性、指令重排序)及解决方案,强调使用volatile关键字... 目录什么是线程安全线程安全问题的产生与解决方案线程的调度是随机的多个线程对同一个变量进行修改线程的修改操

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

使用Python进行GRPC和Dubbo协议的高级测试

《使用Python进行GRPC和Dubbo协议的高级测试》GRPC(GoogleRemoteProcedureCall)是一种高性能、开源的远程过程调用(RPC)框架,Dubbo是一种高性能的分布式服... 目录01 GRPC测试安装gRPC编写.proto文件实现服务02 Dubbo测试1. 安装Dubb

SpringBoot连接Redis集群教程

《SpringBoot连接Redis集群教程》:本文主要介绍SpringBoot连接Redis集群教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 依赖2. 修改配置文件3. 创建RedisClusterConfig4. 测试总结1. 依赖 <de

Redis Cluster模式配置

《RedisCluster模式配置》:本文主要介绍RedisCluster模式配置,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录分片 一、分片的本质与核心价值二、分片实现方案对比 ‌三、分片算法详解1. ‌范围分片(顺序分片)‌2. ‌哈希分片3. ‌虚

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

Dubbo之SPI机制的实现原理和优势分析

《Dubbo之SPI机制的实现原理和优势分析》:本文主要介绍Dubbo之SPI机制的实现原理和优势,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Dubbo中SPI机制的实现原理和优势JDK 中的 SPI 机制解析Dubbo 中的 SPI 机制解析总结Dubbo中