Dubbo 的集群容错模式:Forking Cluster

2024-06-14 06:08

本文主要是介绍Dubbo 的集群容错模式:Forking Cluster,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

集群容错系列文章:
Failover Cluster 失败自动切换
Failfast Cluster 快速失败,抛出异常
Failsafe Cluster 快速失败,不抛出异常
Failback Cluster 失败后定时重试
Forking Cluster 并行调用多个实例,只要一个成功就返回
Broadcast Cluster 广播调用所有实例,有一个报错则抛出异常
Available Cluster 可用的实例
Mergeable Cluster 合并结果

本文简单介绍 Dubbo 中的 Forking Cluster(并行调用多个服务器,只要一个成功就返回)。

简介

并行调用多个实例,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。通过 timeout=”1000” 来设置调用超时时间。

如何使用

<dubbo:service cluster="forking" forks="2" timeout="1000" />

<dubbo:reference cluster="forking" forks="2" timeout="1000" />

实现逻辑

  1. 计算目前需要的并发数,通过负载均衡算法选中被调用实例列表
  2. 并发地调用实例列表,并将处理结果成功的放到阻塞队列中
  3. 获取处理结果队列中的第一个结果,判断是否是异常,是异常则抛出,不是异常则返回结果

源代码

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {/*** Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}* which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.*/// 使用 Cached 线程池private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));public ForkingClusterInvoker(Directory<T> directory) {super(directory);}@Override@SuppressWarnings({"unchecked", "rawtypes"})public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {try {checkInvokers(invokers, invocation);final List<Invoker<T>> selected;// 并行数量,默认是 2final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);// 调用超时, 默认是 1sfinal int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);if (forks <= 0 || forks >= invokers.size()) {// 当并行数配置超过调用实例数量,则默认为调用实例数selected = invokers;} else {selected = new ArrayList<Invoker<T>>();for (int i = 0; i < forks; i++) {// TODO. Add some comment here, refer chinese version for more details.// 通过负载均衡算法选中实例Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);if (!selected.contains(invoker)) {//Avoid add the same invoker several times.// 添加到选中的实例列表中selected.add(invoker);}}}// 将选中的实例添加到上下文中RpcContext.getContext().setInvokers((List) selected);// 记录调用异常的次数final AtomicInteger count = new AtomicInteger();// 使用阻塞队列来存放调用实例的执行结果final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();// 遍历所有被选中的调用者,一个子线程执行一个调用实例for (final Invoker<T> invoker : selected) {executor.execute(new Runnable() {@Overridepublic void run() {try {// 执行调用实例Result result = invoker.invoke(invocation);// 将结果放到阻塞队列中ref.offer(result);} catch (Throwable e) {// 记录调用异常的次数int value = count.incrementAndGet();if (value >= selected.size()) {// 根据队列先入先出原则,因为该算法是只要有一个调用成功就好,// 所以只有当全部的调用实例都失败,才记录到队列中,因为下面取结果会判断是不是异常ref.offer(e);}}}});}try {// 取出结果Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);// 没有一个实例执行成功,抛出异常if (ret instanceof Throwable) {Throwable e = (Throwable) ret;throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);}return (Result) ret;} catch (InterruptedException e) {throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);}} finally {// 清除上下文信息// clear attachments which is binding to current thread.RpcContext.getContext().clearAttachments();}}
}


做个有梦想的程序猿
个人公众号

这篇关于Dubbo 的集群容错模式:Forking Cluster的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1059593

相关文章

Go语言实现桥接模式

《Go语言实现桥接模式》桥接模式是一种结构型设计模式,它将抽象部分与实现部分分离,使它们可以独立地变化,本文就来介绍一下了Go语言实现桥接模式,感兴趣的可以了解一下... 目录简介核心概念为什么使用桥接模式?应用场景案例分析步骤一:定义实现接口步骤二:创建具体实现类步骤三:定义抽象类步骤四:创建扩展抽象类步

MySQL MHA集群详解(数据库高可用)

《MySQLMHA集群详解(数据库高可用)》MHA(MasterHighAvailability)是开源MySQL高可用管理工具,用于自动故障检测与转移,支持异步或半同步复制的MySQL主从架构,本... 目录mysql 高可用方案:MHA 详解与实战1. MHA 简介2. MHA 的组件组成(1)MHA

python调用dubbo接口的实现步骤

《python调用dubbo接口的实现步骤》本文主要介绍了python调用dubbo接口的实现步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编... 目录 ​​其他实现方式与注意事项​​ ​​高级技巧与集成​​用 python 提供 Dubbo 接口

C++中的解释器模式实例详解

《C++中的解释器模式实例详解》这篇文章总结了C++标准库中的算法分类,还介绍了sort和stable_sort的区别,以及remove和erase的结合使用,结合实例代码给大家介绍的非常详细,感兴趣... 目录1、非修改序列算法1.1 find 和 find_if1.2 count 和 count_if1

Redis中群集三种模式的实现

《Redis中群集三种模式的实现》Redis群集有三种模式,分别是主从同步/复制、哨兵模式、Cluster,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录1. Redis三种模式概述2、Redis 主从复制2.1 主从复制的作用2.2 主从复制流程2

golang实现nacos获取配置和服务注册-支持集群详解

《golang实现nacos获取配置和服务注册-支持集群详解》文章介绍了如何在Go语言中使用Nacos获取配置和服务注册,支持集群初始化,客户端结构体中的IpAddresses可以配置多个地址,新客户... 目录golang nacos获取配置和服务注册-支持集群初始化客户端可选参数配置new一个客户端 支

MySQL集群高可用架构的两种使用小结

《MySQL集群高可用架构的两种使用小结》本文介绍了MySQL的两种高可用解决方案:组复制(MGR)和MasterHighAvailability(MHA),文中通过示例代码介绍的非常详细,对大家的学... 目录一、mysql高可用之组复制(MGR)1.1 组复制核心特性与优势1.2 组复制架构原理1.3

深入理解MySQL流模式

《深入理解MySQL流模式》MySQL的Binlog流模式是一种实时读取二进制日志的技术,允许下游系统几乎无延迟地获取数据库变更事件,适用于需要极低延迟复制的场景,感兴趣的可以了解一下... 目录核心概念一句话总结1. 背景知识:什么是 Binlog?2. 传统方式 vs. 流模式传统文件方式 (非流式)流

Docker + Redis 部署集群的实现步骤

《Docker+Redis部署集群的实现步骤》本文详细介绍了在三台服务器上部署高可用Redis集群的完整流程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录一、环境准备1. 服务器规划(3 台服务器)2. 防火墙配置(三台服务器均执行)3. 安装 docke

springBoot (springCloud2025)集成redisCluster 集群的操作方法

《springBoot(springCloud2025)集成redisCluster集群的操作方法》文章介绍了如何使用SpringBoot集成RedisCluster集群,并详细说明了pom.xm... 目录pom.XMLapplication.yamlcluster配置类其他配置类连接池配置类Redis