dubbo 服务消费原理分析之引用服务配置

2024-09-07 06:28

本文主要是介绍dubbo 服务消费原理分析之引用服务配置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、服务监听ContextRefreshedEvent
    • 1、AbstractApplicationContext.refresh
    • 2、AbstractApplicationContext.finishRefresh
    • 3、DubboDeployApplicationListener.onApplicationEvent
    • 4、DefaultModuleDeployer .referServices
    • 5、SimpleReferenceCache.get
  • 二、引用服务 ReferenceConfig
    • 1、时序图
    • 2、ReferenceConfig.get
    • 3、ReferenceConfig.init
    • 4、ReferenceConfig.createProxy
    • 5、ReferenceConfig.createInvokerForRemote
  • 三、注册协议 RegistryProtocol
    • 1、RegistryProtocol.refer
    • 2、RegistryProtocol.doRefer
    • 3、RegistryProtocol.interceptInvoker


前言

文章基于3.1.0版本进行分析

		<dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId><version>3.1.0</version></dependency>

一、服务监听ContextRefreshedEvent

在springboot 中,refresh 是 AbstractApplicationContext 中的一个方法,负责初始化 ApplicationContext 容器,容器必须调用 refresh 才能正常工作是进行,refresh的最后处理了 finishRefresh 方法,改方法会广播一个ContextRefreshedEvent容器刷新完成事件,所有监听了该事件的bean都会去执行相关逻辑处理。

1、AbstractApplicationContext.refresh

public void refresh() throws BeansException, IllegalStateException {synchronized(this.startupShutdownMonitor) {// 省略无关代码***// 初始化生命周期处理器,调用生命周期处理器onRefresh方法,发布ContextRefreshedEvent事件,JMX相关处理this.finishRefresh();// 省略无关代码***}}

2、AbstractApplicationContext.finishRefresh

	protected void finishRefresh() {// 清除上下文资源缓存(如扫描中的ASM元数据) scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// 广播刷新完成事件publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.if (!NativeDetector.inNativeImage()) {LiveBeansView.registerApplicationContext(this);}}

3、DubboDeployApplicationListener.onApplicationEvent

dubbo很好的结合了spring的这一个拓展点,在这个拓展点开始实现服务的发布。可以看到,DubboDeployApplicationListener实现了ContextRefreshedEvent的消息监听

public class DubboDeployApplicationListener implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, Ordered {private static final Logger logger = LoggerFactory.getLogger(DubboDeployApplicationListener.class);private ApplicationContext applicationContext;private ApplicationModel applicationModel;private ModuleModel moduleModel;@Overridepublic void onApplicationEvent(ApplicationContextEvent event) {if (nullSafeEquals(applicationContext, event.getSource())) {if (event instanceof ContextRefreshedEvent) {onContextRefreshedEvent((ContextRefreshedEvent) event);} else if (event instanceof ContextClosedEvent) {onContextClosedEvent((ContextClosedEvent) event);}}}private void onContextRefreshedEvent(ContextRefreshedEvent event) {// 获取配置的deployer 进行发布,默认是 DefaultModuleDeployer ModuleDeployer deployer = moduleModel.getDeployer();Assert.notNull(deployer, "Module deployer is null");// start moduleFuture future = deployer.start();// if the module does not start in background, await finishif (!deployer.isBackground()) {try {// 等待发布结束future.get();} catch (InterruptedException e) {logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());} catch (Exception e) {logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);}}}}

4、DefaultModuleDeployer .referServices

DefaultModuleDeployer 中,真正核心的是ReferenceConfig,ReferenceConfig才是去实际发布的动作

public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {// DefaultApplicationDeployerprivate ApplicationDeployer applicationDeployer;@Overridepublic synchronized Future start() throws IllegalStateException {...// 不管是 DefaultApplicationDeployer 还是DefaultModuleDeployer的initialize方法,都是处理相关配置文件// 其功能等价于监听器 DubboConfigApplicationListenerapplicationDeployer.initialize();initialize();// 真正触发 服务注册功能exportServices();// prepare application instance// exclude internal module to avoid wait itselfif (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {applicationDeployer.prepareInternalModule();}// 服务消费// refer servicesreferServices();...return startFuture;}private void referServices() {configManager.getReferences().forEach(rc -> {try {ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;// 刷新配置if (!referenceConfig.isRefreshed()) {referenceConfig.refresh();}// 如果还没初始化if (rc.shouldInit()) {// 是否异步注入if (referAsync || rc.shouldReferAsync()) {ExecutorService executor = executorRepository.getServiceReferExecutor();CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {referenceCache.get(rc);} catch (Throwable t) {logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);}}, executor);asyncReferringFutures.add(future);} else {// 查询缓存中是否存在代理对象 对应的实现类SimpleReferenceCachereferenceCache.get(rc);}}} catch (Throwable t) {logger.error(getIdentifier() + " refer catch error.");referenceCache.destroy(rc);throw t;}});}
}

5、SimpleReferenceCache.get

SimpleReferenceCache,一个用于缓存引用ReferenceConfigBase的util工具类。
ReferenceConfigBase是一个重对象,对于频繁创建ReferenceConfigBase的框架来说,有必要缓存这些对象。
如果需要使用复杂的策略,可以实现并使用自己的ReferenceConfigBase缓存
这个Cache是引用服务的开始如果我们想在代码中自定义一些服务引用的逻辑,可以直接创建SimpleReferenceCache类型对象然后调用其get方法进行引用服务。

	public <T> T get(ReferenceConfigBase<T> rc) {String key = generator.generateKey(rc);// 服务类型 如果是泛化调用则这个类型为GenericServiceClass<?> type = rc.getInterfaceClass();boolean singleton = rc.getSingleton() == null || rc.getSingleton();T proxy = null;// Check existing proxy of the same 'key' and 'type' first.// 单例if (singleton) {// 缓存数据找proxy = get(key, (Class<T>) type);} else {logger.warn("Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");}// 不存在消费的代理对象,创建rc.get(),最后走到ReferenceConfig.get()if (proxy == null) {// 获取或者创建值,为引用类型referencesOfType对象(类型为Map<Class<?>, List<ReferenceConfigBase<?>>>)缓存对象生成值(值不存咋时候会生成一个)List<ReferenceConfigBase<?>> referencesOfType = referenceTypeMap.computeIfAbsent(type, _t -> Collections.synchronizedList(new ArrayList<>()));// 每次走到这里都会添加一个ReferenceConfigBase 引用配置对象(单例的从缓存中拿到就可以直接返回了)referencesOfType.add(rc);// 与前面一样 前面是类型映射,这里是key映射List<ReferenceConfigBase<?>> referenceConfigList = referenceKeyMap.computeIfAbsent(key, _k -> Collections.synchronizedList(new ArrayList<>()));referenceConfigList.add(rc);// 开始引用服务proxy = rc.get();}return proxy;}

二、引用服务 ReferenceConfig

服务发现和引用就是从这里开始的

1、时序图

2、ReferenceConfig.get

获取引用的代理对象

	public T get() {if (destroyed) {throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");}if (ref == null) {// ensure start module, compatible with old api usage// 如果使用方直接调用了ReferenceConfigBase的get方法或者缓存对象SimpleReferenceCache类型的对象的get方法来引用服务端的时候就会造成很多配置没有初始化// 这个代码其实就是启动模块进行一些基础配置的初始化操作,比如元数据中心默认配置选择,注册中心默认配置选择这些都是比较重要的getScopeModel().getDeployer().start();synchronized (this) {if (ref == null) {init();}}}return ref;}

主要包括

  • checkAndUpdateSubConfigs() – 检查并更新缺省配置
  • init() – 消费者服务的初始化,核心逻辑。

3、ReferenceConfig.init

初始化代理对象

    protected synchronized void init() {// 初始化标记变量保证只初始化一次,这里又是加锁又是加标记变量的if (initialized && ref != null) {return;}try {// 刷新配置if (!this.isRefreshed()) {this.refresh();}// init serviceMetadata// 初始化元数据信息 如版本号,分组,服务接口名initServiceMetadata(consumer);// //继续初始化元数据信息 服务接口类型和keyserviceMetadata.setServiceType(getServiceInterfaceClass());// TODO, uncomment this line once service key is unifiedserviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));// 参数配置转成MAPMap<String, String> referenceParameters = appendConfig();// init service-application mapping// 初始化路径,参数转化成url,dubbo主要从url上面读取参数initServiceAppsMapping(referenceParameters);// 获取Module级别的服务存储仓库,其内部保存着服务提供者和服务消费者的缓存ModuleServiceRepository repository = getScopeModel().getServiceRepository();ServiceDescriptor serviceDescriptor;if (CommonConstants.NATIVE_STUB.equals(getProxy())) {serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);repository.registerService(serviceDescriptor);} else {serviceDescriptor = repository.registerService(interfaceClass);}// 创建消费者模型对象consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor,getScopeModel(), serviceMetadata, createAsyncMethodInfo(), interfaceClassLoader);// Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.consumerModel.setConfig(this);repository.registerConsumer(consumerModel);serviceMetadata.getAttachments().putAll(referenceParameters);// 创建引用的代理对象 核心代码ref = createProxy(referenceParameters);serviceMetadata.setTarget(ref);serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);consumerModel.setDestroyCaller(getDestroyRunner());consumerModel.setProxyObject(ref);consumerModel.initMethodModels();checkInvokerAvailable();} catch (Throwable t) {// *** 省略部分代码throw t;}initialized = true;}

主要流程

  • 服务引用前初始化serviceMetadata服务元数据
  • 获取服务仓库ModuleServiceRepository,并注册service 和 consumer
  • 调用createProxy方法,根据服务引用参数map创建服务接口代理引用对象,并赋值给ref,核心逻辑

4、ReferenceConfig.createProxy

创建代理对象,实际调用需要通过这个代理对象进行调用

	private T createProxy(Map<String, String> referenceParameters) {// 本地引用if (shouldJvmRefer(referenceParameters)) {createInvokerForLocal(referenceParameters);} else {urls.clear();meshModeHandleUrl(referenceParameters);if (StringUtils.isNotEmpty(url)) {// user specified URL, could be peer-to-peer address, or register center's address.// url参数不为空,且可以配置多个。可以是点对点,也可以是注册中心,然后添加到urlsparseUrl(referenceParameters);} else {// if protocols not in jvm checkRegistryif (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {// 从注册表中获取URL并将其聚合。这个其实就是初始化一下注册中心的url配置aggregateUrlFromRegistry(referenceParameters);}}// 创建远程引用,创建远程引用调用器,承担服务调用的核心逻辑createInvokerForRemote();}if (logger.isInfoEnabled()) {logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +(Boolean.parseBoolean(referenceParameters.get(GENERIC_KEY)) ?" it's GenericService reference" : " it's not GenericService reference"));}URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,referenceParameters.get(INTERFACE_KEY), referenceParameters);consumerUrl = consumerUrl.setScopeModel(getScopeModel());consumerUrl = consumerUrl.setServiceModel(consumerModel);MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());// create service proxy// 构建一个代理对象,代理客户端的请求return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));}

5、ReferenceConfig.createInvokerForRemote

远程引用或者直连引用情况下,将会调用该方法,创建远程引用Invoker。对于一个注册中心url和多个注册中心url的处理是不一样的,一个注册中心对应一个invoker,最后封装到集群路由invoker

	private void createInvokerForRemote() {// 只有一个,秩序创建一个invokerif (urls.size() == 1) {URL curUrl = urls.get(0);// 自适应扩展类,这里和服务发布类似,先进入了RegistryProtocol.referinvoker = protocolSPI.refer(interfaceClass, curUrl);if (!UrlUtils.isRegistry(curUrl)) {List<Invoker<?>> invokers = new ArrayList<>();invokers.add(invoker);invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);}} else {List<Invoker<?>> invokers = new ArrayList<>();URL registryUrl = null;// 创建多个invokerfor (URL url : urls) {// For multi-registry scenarios, it is not checked whether each referInvoker is available.// Because this invoker may become available later.// invokers.add(protocolSPI.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// use last registry urlregistryUrl = url;}}// 创建一个集群路由invokerif (registryUrl != null) {// registry url is available// for multi-subscription scenario, use 'zone-aware' policy by defaultString cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker// (RegistryDirectory, routing happens here) -> Invokerinvoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);} else {// not a registry url, must be direct invoke.// 直连if (CollectionUtils.isEmpty(invokers)) {throw new IllegalArgumentException("invokers == null");}URL curUrl = invokers.get(0).getUrl();String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);}}}

服务发布的时候讲到 protocolSPI,protocolSPI.refer形成的调用链为
-Protocol$Adaptie -> ProtocolSerializationWrapper -> ProtocolFilterWrapper -> QosProtocolWrapper -> ProtocolListenerWrapper -> RegistryProtocol -> RegistryProtocol

三、注册协议 RegistryProtocol

应用级服务远程协议以service-discovery-registry开头,其对应的Protocol实现就是RegistryProtocol

1、RegistryProtocol.refer

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 组装配置中心的地址url = getRegistryUrl(url);// 获取用于操作Zookeeper的Registry类型 Registry registry = getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);String group = qs.get(GROUP_KEY);if (StringUtils.isNotEmpty(group)) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);}}// 降级容错的逻辑处理对象 类型为Cluster 实际类型为MockClusterWrapper 内部包装的是FailoverCluster// 后续调用服务失败时候会先失效转移再降级Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));// 主要干活的地方 生成引用 invokerreturn doRefer(cluster, registry, type, url, qs);}

生成配置中心的url

zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-springboot-start-consumer
&dubbo=2.0.2
&pid=8896
&qos.enable=false
&release=3.1.0
&timestamp=1725379058285

2、RegistryProtocol.doRefer

    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());consumerAttribute.remove(REFER_KEY);String p = isEmpty(parameters.get(PROTOCOL_KEY)) ? CONSUMER : parameters.get(PROTOCOL_KEY);// 消费者url信息URL consumerUrl = new ServiceConfigURL (p,null,null,parameters.get(REGISTER_IP_KEY),0, getPath(parameters, type),parameters,consumerAttribute);url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);// 带迁移性质的Invoker对象ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);// 执行迁移规则创建应用级优先的服务发现Invoker对象return interceptInvoker(migrationInvoker, url, consumerUrl);}

consumerUrl如下

consumer://192.168.0.101/org.sjl.dubbo.AsyncProvider?application=dubbo-springboot-start-consumer
&background=false
&dubbo=2.0.2
&interface=org.sjl.dubbo.AsyncProvider
&methods=sayHiAsync,sayHello,sayHelloAsync&pid=33100
&qos.enable=false
&register.ip=192.168.0.101
&release=3.1.0
&side=consumer
&sticky=false
&timeout=30000
&timestamp=1725379319215

3、RegistryProtocol.interceptInvoker

    protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {// 获取激活的注册协议监听器扩展里面registry.protocol.listener,这里激活的类型为MigrationRuleListenerList<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);if (CollectionUtils.isEmpty(listeners)) {return invoker;}for (RegistryProtocolListener listener : listeners) {// MigrationRuleListener// 迁移规则应用级引用listener.onRefer(this, invoker, consumerUrl, url);}return invoker;}

接下来就进入了迁移规则的应用级服务发现了,参考 dubbo 服务消费原理分析之应用级服务发现

这篇关于dubbo 服务消费原理分析之引用服务配置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1144310

相关文章

Zookeeper安装和配置说明

一、Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式。 ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境; ■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例; ■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble) Zookeeper通过复制来实现

CentOS7安装配置mysql5.7 tar免安装版

一、CentOS7.4系统自带mariadb # 查看系统自带的Mariadb[root@localhost~]# rpm -qa|grep mariadbmariadb-libs-5.5.44-2.el7.centos.x86_64# 卸载系统自带的Mariadb[root@localhost ~]# rpm -e --nodeps mariadb-libs-5.5.44-2.el7

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

NameNode内存生产配置

Hadoop2.x 系列,配置 NameNode 内存 NameNode 内存默认 2000m ,如果服务器内存 4G , NameNode 内存可以配置 3g 。在 hadoop-env.sh 文件中配置如下。 HADOOP_NAMENODE_OPTS=-Xmx3072m Hadoop3.x 系列,配置 Nam

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

wolfSSL参数设置或配置项解释

1. wolfCrypt Only 解释:wolfCrypt是一个开源的、轻量级的、可移植的加密库,支持多种加密算法和协议。选择“wolfCrypt Only”意味着系统或应用将仅使用wolfCrypt库进行加密操作,而不依赖其他加密库。 2. DTLS Support 解释:DTLS(Datagram Transport Layer Security)是一种基于UDP的安全协议,提供类似于

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、