Dubbo 的集群容错模式:Forking Cluster

2024-06-14 06:08

本文主要是介绍Dubbo 的集群容错模式:Forking Cluster,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

集群容错系列文章:
Failover Cluster 失败自动切换
Failfast Cluster 快速失败,抛出异常
Failsafe Cluster 快速失败,不抛出异常
Failback Cluster 失败后定时重试
Forking Cluster 并行调用多个实例,只要一个成功就返回
Broadcast Cluster 广播调用所有实例,有一个报错则抛出异常
Available Cluster 可用的实例
Mergeable Cluster 合并结果

本文简单介绍 Dubbo 中的 Forking Cluster(并行调用多个服务器,只要一个成功就返回)。

简介

并行调用多个实例,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。通过 timeout=”1000” 来设置调用超时时间。

如何使用

<dubbo:service cluster="forking" forks="2" timeout="1000" />

<dubbo:reference cluster="forking" forks="2" timeout="1000" />

实现逻辑

  1. 计算目前需要的并发数,通过负载均衡算法选中被调用实例列表
  2. 并发地调用实例列表,并将处理结果成功的放到阻塞队列中
  3. 获取处理结果队列中的第一个结果,判断是否是异常,是异常则抛出,不是异常则返回结果

源代码

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {/*** Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}* which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.*/// 使用 Cached 线程池private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));public ForkingClusterInvoker(Directory<T> directory) {super(directory);}@Override@SuppressWarnings({"unchecked", "rawtypes"})public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {try {checkInvokers(invokers, invocation);final List<Invoker<T>> selected;// 并行数量,默认是 2final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);// 调用超时, 默认是 1sfinal int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);if (forks <= 0 || forks >= invokers.size()) {// 当并行数配置超过调用实例数量,则默认为调用实例数selected = invokers;} else {selected = new ArrayList<Invoker<T>>();for (int i = 0; i < forks; i++) {// TODO. Add some comment here, refer chinese version for more details.// 通过负载均衡算法选中实例Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);if (!selected.contains(invoker)) {//Avoid add the same invoker several times.// 添加到选中的实例列表中selected.add(invoker);}}}// 将选中的实例添加到上下文中RpcContext.getContext().setInvokers((List) selected);// 记录调用异常的次数final AtomicInteger count = new AtomicInteger();// 使用阻塞队列来存放调用实例的执行结果final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();// 遍历所有被选中的调用者,一个子线程执行一个调用实例for (final Invoker<T> invoker : selected) {executor.execute(new Runnable() {@Overridepublic void run() {try {// 执行调用实例Result result = invoker.invoke(invocation);// 将结果放到阻塞队列中ref.offer(result);} catch (Throwable e) {// 记录调用异常的次数int value = count.incrementAndGet();if (value >= selected.size()) {// 根据队列先入先出原则,因为该算法是只要有一个调用成功就好,// 所以只有当全部的调用实例都失败,才记录到队列中,因为下面取结果会判断是不是异常ref.offer(e);}}}});}try {// 取出结果Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);// 没有一个实例执行成功,抛出异常if (ret instanceof Throwable) {Throwable e = (Throwable) ret;throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);}return (Result) ret;} catch (InterruptedException e) {throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);}} finally {// 清除上下文信息// clear attachments which is binding to current thread.RpcContext.getContext().clearAttachments();}}
}


做个有梦想的程序猿
个人公众号

这篇关于Dubbo 的集群容错模式:Forking Cluster的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1059593

相关文章

centos7基于keepalived+nginx部署k8s1.26.0高可用集群

《centos7基于keepalived+nginx部署k8s1.26.0高可用集群》Kubernetes是一个开源的容器编排平台,用于自动化地部署、扩展和管理容器化应用程序,在生产环境中,为了确保集... 目录一、初始化(所有节点都执行)二、安装containerd(所有节点都执行)三、安装docker-

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

Nacos集群数据同步方式

《Nacos集群数据同步方式》文章主要介绍了Nacos集群中服务注册信息的同步机制,涉及到负责节点和非负责节点之间的数据同步过程,以及DistroProtocol协议在同步中的应用... 目录引言负责节点(发起同步)DistroProtocolDistroSyncChangeTask获取同步数据getDis

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

HDFS—集群扩容及缩容

白名单:表示在白名单的主机IP地址可以,用来存储数据。 配置白名单步骤如下: 1)在NameNode节点的/opt/module/hadoop-3.1.4/etc/hadoop目录下分别创建whitelist 和blacklist文件 (1)创建白名单 [lytfly@hadoop102 hadoop]$ vim whitelist 在whitelist中添加如下主机名称,假如集群正常工作的节

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

模版方法模式template method

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/template-method 超类中定义了一个算法的框架, 允许子类在不修改结构的情况下重写算法的特定步骤。 上层接口有默认实现的方法和子类需要自己实现的方法

【iOS】MVC模式

MVC模式 MVC模式MVC模式demo MVC模式 MVC模式全称为model(模型)view(视图)controller(控制器),他分为三个不同的层分别负责不同的职责。 View:该层用于存放视图,该层中我们可以对页面及控件进行布局。Model:模型一般都拥有很好的可复用性,在该层中,我们可以统一管理一些数据。Controlller:该层充当一个CPU的功能,即该应用程序