dubbo 服务消费原理分析之引用服务配置

2024-09-07 06:28

本文主要是介绍dubbo 服务消费原理分析之引用服务配置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、服务监听ContextRefreshedEvent
    • 1、AbstractApplicationContext.refresh
    • 2、AbstractApplicationContext.finishRefresh
    • 3、DubboDeployApplicationListener.onApplicationEvent
    • 4、DefaultModuleDeployer .referServices
    • 5、SimpleReferenceCache.get
  • 二、引用服务 ReferenceConfig
    • 1、时序图
    • 2、ReferenceConfig.get
    • 3、ReferenceConfig.init
    • 4、ReferenceConfig.createProxy
    • 5、ReferenceConfig.createInvokerForRemote
  • 三、注册协议 RegistryProtocol
    • 1、RegistryProtocol.refer
    • 2、RegistryProtocol.doRefer
    • 3、RegistryProtocol.interceptInvoker


前言

文章基于3.1.0版本进行分析

		<dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId><version>3.1.0</version></dependency>

一、服务监听ContextRefreshedEvent

在springboot 中,refresh 是 AbstractApplicationContext 中的一个方法,负责初始化 ApplicationContext 容器,容器必须调用 refresh 才能正常工作是进行,refresh的最后处理了 finishRefresh 方法,改方法会广播一个ContextRefreshedEvent容器刷新完成事件,所有监听了该事件的bean都会去执行相关逻辑处理。

1、AbstractApplicationContext.refresh

public void refresh() throws BeansException, IllegalStateException {synchronized(this.startupShutdownMonitor) {// 省略无关代码***// 初始化生命周期处理器,调用生命周期处理器onRefresh方法,发布ContextRefreshedEvent事件,JMX相关处理this.finishRefresh();// 省略无关代码***}}

2、AbstractApplicationContext.finishRefresh

	protected void finishRefresh() {// 清除上下文资源缓存(如扫描中的ASM元数据) scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// 广播刷新完成事件publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.if (!NativeDetector.inNativeImage()) {LiveBeansView.registerApplicationContext(this);}}

3、DubboDeployApplicationListener.onApplicationEvent

dubbo很好的结合了spring的这一个拓展点,在这个拓展点开始实现服务的发布。可以看到,DubboDeployApplicationListener实现了ContextRefreshedEvent的消息监听

public class DubboDeployApplicationListener implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, Ordered {private static final Logger logger = LoggerFactory.getLogger(DubboDeployApplicationListener.class);private ApplicationContext applicationContext;private ApplicationModel applicationModel;private ModuleModel moduleModel;@Overridepublic void onApplicationEvent(ApplicationContextEvent event) {if (nullSafeEquals(applicationContext, event.getSource())) {if (event instanceof ContextRefreshedEvent) {onContextRefreshedEvent((ContextRefreshedEvent) event);} else if (event instanceof ContextClosedEvent) {onContextClosedEvent((ContextClosedEvent) event);}}}private void onContextRefreshedEvent(ContextRefreshedEvent event) {// 获取配置的deployer 进行发布,默认是 DefaultModuleDeployer ModuleDeployer deployer = moduleModel.getDeployer();Assert.notNull(deployer, "Module deployer is null");// start moduleFuture future = deployer.start();// if the module does not start in background, await finishif (!deployer.isBackground()) {try {// 等待发布结束future.get();} catch (InterruptedException e) {logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());} catch (Exception e) {logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);}}}}

4、DefaultModuleDeployer .referServices

DefaultModuleDeployer 中,真正核心的是ReferenceConfig,ReferenceConfig才是去实际发布的动作

public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {// DefaultApplicationDeployerprivate ApplicationDeployer applicationDeployer;@Overridepublic synchronized Future start() throws IllegalStateException {...// 不管是 DefaultApplicationDeployer 还是DefaultModuleDeployer的initialize方法,都是处理相关配置文件// 其功能等价于监听器 DubboConfigApplicationListenerapplicationDeployer.initialize();initialize();// 真正触发 服务注册功能exportServices();// prepare application instance// exclude internal module to avoid wait itselfif (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {applicationDeployer.prepareInternalModule();}// 服务消费// refer servicesreferServices();...return startFuture;}private void referServices() {configManager.getReferences().forEach(rc -> {try {ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;// 刷新配置if (!referenceConfig.isRefreshed()) {referenceConfig.refresh();}// 如果还没初始化if (rc.shouldInit()) {// 是否异步注入if (referAsync || rc.shouldReferAsync()) {ExecutorService executor = executorRepository.getServiceReferExecutor();CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {referenceCache.get(rc);} catch (Throwable t) {logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);}}, executor);asyncReferringFutures.add(future);} else {// 查询缓存中是否存在代理对象 对应的实现类SimpleReferenceCachereferenceCache.get(rc);}}} catch (Throwable t) {logger.error(getIdentifier() + " refer catch error.");referenceCache.destroy(rc);throw t;}});}
}

5、SimpleReferenceCache.get

SimpleReferenceCache,一个用于缓存引用ReferenceConfigBase的util工具类。
ReferenceConfigBase是一个重对象,对于频繁创建ReferenceConfigBase的框架来说,有必要缓存这些对象。
如果需要使用复杂的策略,可以实现并使用自己的ReferenceConfigBase缓存
这个Cache是引用服务的开始如果我们想在代码中自定义一些服务引用的逻辑,可以直接创建SimpleReferenceCache类型对象然后调用其get方法进行引用服务。

	public <T> T get(ReferenceConfigBase<T> rc) {String key = generator.generateKey(rc);// 服务类型 如果是泛化调用则这个类型为GenericServiceClass<?> type = rc.getInterfaceClass();boolean singleton = rc.getSingleton() == null || rc.getSingleton();T proxy = null;// Check existing proxy of the same 'key' and 'type' first.// 单例if (singleton) {// 缓存数据找proxy = get(key, (Class<T>) type);} else {logger.warn("Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");}// 不存在消费的代理对象,创建rc.get(),最后走到ReferenceConfig.get()if (proxy == null) {// 获取或者创建值,为引用类型referencesOfType对象(类型为Map<Class<?>, List<ReferenceConfigBase<?>>>)缓存对象生成值(值不存咋时候会生成一个)List<ReferenceConfigBase<?>> referencesOfType = referenceTypeMap.computeIfAbsent(type, _t -> Collections.synchronizedList(new ArrayList<>()));// 每次走到这里都会添加一个ReferenceConfigBase 引用配置对象(单例的从缓存中拿到就可以直接返回了)referencesOfType.add(rc);// 与前面一样 前面是类型映射,这里是key映射List<ReferenceConfigBase<?>> referenceConfigList = referenceKeyMap.computeIfAbsent(key, _k -> Collections.synchronizedList(new ArrayList<>()));referenceConfigList.add(rc);// 开始引用服务proxy = rc.get();}return proxy;}

二、引用服务 ReferenceConfig

服务发现和引用就是从这里开始的

1、时序图

2、ReferenceConfig.get

获取引用的代理对象

	public T get() {if (destroyed) {throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");}if (ref == null) {// ensure start module, compatible with old api usage// 如果使用方直接调用了ReferenceConfigBase的get方法或者缓存对象SimpleReferenceCache类型的对象的get方法来引用服务端的时候就会造成很多配置没有初始化// 这个代码其实就是启动模块进行一些基础配置的初始化操作,比如元数据中心默认配置选择,注册中心默认配置选择这些都是比较重要的getScopeModel().getDeployer().start();synchronized (this) {if (ref == null) {init();}}}return ref;}

主要包括

  • checkAndUpdateSubConfigs() – 检查并更新缺省配置
  • init() – 消费者服务的初始化,核心逻辑。

3、ReferenceConfig.init

初始化代理对象

    protected synchronized void init() {// 初始化标记变量保证只初始化一次,这里又是加锁又是加标记变量的if (initialized && ref != null) {return;}try {// 刷新配置if (!this.isRefreshed()) {this.refresh();}// init serviceMetadata// 初始化元数据信息 如版本号,分组,服务接口名initServiceMetadata(consumer);// //继续初始化元数据信息 服务接口类型和keyserviceMetadata.setServiceType(getServiceInterfaceClass());// TODO, uncomment this line once service key is unifiedserviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));// 参数配置转成MAPMap<String, String> referenceParameters = appendConfig();// init service-application mapping// 初始化路径,参数转化成url,dubbo主要从url上面读取参数initServiceAppsMapping(referenceParameters);// 获取Module级别的服务存储仓库,其内部保存着服务提供者和服务消费者的缓存ModuleServiceRepository repository = getScopeModel().getServiceRepository();ServiceDescriptor serviceDescriptor;if (CommonConstants.NATIVE_STUB.equals(getProxy())) {serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);repository.registerService(serviceDescriptor);} else {serviceDescriptor = repository.registerService(interfaceClass);}// 创建消费者模型对象consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor,getScopeModel(), serviceMetadata, createAsyncMethodInfo(), interfaceClassLoader);// Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.consumerModel.setConfig(this);repository.registerConsumer(consumerModel);serviceMetadata.getAttachments().putAll(referenceParameters);// 创建引用的代理对象 核心代码ref = createProxy(referenceParameters);serviceMetadata.setTarget(ref);serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);consumerModel.setDestroyCaller(getDestroyRunner());consumerModel.setProxyObject(ref);consumerModel.initMethodModels();checkInvokerAvailable();} catch (Throwable t) {// *** 省略部分代码throw t;}initialized = true;}

主要流程

  • 服务引用前初始化serviceMetadata服务元数据
  • 获取服务仓库ModuleServiceRepository,并注册service 和 consumer
  • 调用createProxy方法,根据服务引用参数map创建服务接口代理引用对象,并赋值给ref,核心逻辑

4、ReferenceConfig.createProxy

创建代理对象,实际调用需要通过这个代理对象进行调用

	private T createProxy(Map<String, String> referenceParameters) {// 本地引用if (shouldJvmRefer(referenceParameters)) {createInvokerForLocal(referenceParameters);} else {urls.clear();meshModeHandleUrl(referenceParameters);if (StringUtils.isNotEmpty(url)) {// user specified URL, could be peer-to-peer address, or register center's address.// url参数不为空,且可以配置多个。可以是点对点,也可以是注册中心,然后添加到urlsparseUrl(referenceParameters);} else {// if protocols not in jvm checkRegistryif (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {// 从注册表中获取URL并将其聚合。这个其实就是初始化一下注册中心的url配置aggregateUrlFromRegistry(referenceParameters);}}// 创建远程引用,创建远程引用调用器,承担服务调用的核心逻辑createInvokerForRemote();}if (logger.isInfoEnabled()) {logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +(Boolean.parseBoolean(referenceParameters.get(GENERIC_KEY)) ?" it's GenericService reference" : " it's not GenericService reference"));}URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,referenceParameters.get(INTERFACE_KEY), referenceParameters);consumerUrl = consumerUrl.setScopeModel(getScopeModel());consumerUrl = consumerUrl.setServiceModel(consumerModel);MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());// create service proxy// 构建一个代理对象,代理客户端的请求return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));}

5、ReferenceConfig.createInvokerForRemote

远程引用或者直连引用情况下,将会调用该方法,创建远程引用Invoker。对于一个注册中心url和多个注册中心url的处理是不一样的,一个注册中心对应一个invoker,最后封装到集群路由invoker

	private void createInvokerForRemote() {// 只有一个,秩序创建一个invokerif (urls.size() == 1) {URL curUrl = urls.get(0);// 自适应扩展类,这里和服务发布类似,先进入了RegistryProtocol.referinvoker = protocolSPI.refer(interfaceClass, curUrl);if (!UrlUtils.isRegistry(curUrl)) {List<Invoker<?>> invokers = new ArrayList<>();invokers.add(invoker);invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);}} else {List<Invoker<?>> invokers = new ArrayList<>();URL registryUrl = null;// 创建多个invokerfor (URL url : urls) {// For multi-registry scenarios, it is not checked whether each referInvoker is available.// Because this invoker may become available later.// invokers.add(protocolSPI.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// use last registry urlregistryUrl = url;}}// 创建一个集群路由invokerif (registryUrl != null) {// registry url is available// for multi-subscription scenario, use 'zone-aware' policy by defaultString cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker// (RegistryDirectory, routing happens here) -> Invokerinvoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);} else {// not a registry url, must be direct invoke.// 直连if (CollectionUtils.isEmpty(invokers)) {throw new IllegalArgumentException("invokers == null");}URL curUrl = invokers.get(0).getUrl();String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);}}}

服务发布的时候讲到 protocolSPI,protocolSPI.refer形成的调用链为
-Protocol$Adaptie -> ProtocolSerializationWrapper -> ProtocolFilterWrapper -> QosProtocolWrapper -> ProtocolListenerWrapper -> RegistryProtocol -> RegistryProtocol

三、注册协议 RegistryProtocol

应用级服务远程协议以service-discovery-registry开头,其对应的Protocol实现就是RegistryProtocol

1、RegistryProtocol.refer

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 组装配置中心的地址url = getRegistryUrl(url);// 获取用于操作Zookeeper的Registry类型 Registry registry = getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);String group = qs.get(GROUP_KEY);if (StringUtils.isNotEmpty(group)) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);}}// 降级容错的逻辑处理对象 类型为Cluster 实际类型为MockClusterWrapper 内部包装的是FailoverCluster// 后续调用服务失败时候会先失效转移再降级Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));// 主要干活的地方 生成引用 invokerreturn doRefer(cluster, registry, type, url, qs);}

生成配置中心的url

zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-springboot-start-consumer
&dubbo=2.0.2
&pid=8896
&qos.enable=false
&release=3.1.0
&timestamp=1725379058285

2、RegistryProtocol.doRefer

    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());consumerAttribute.remove(REFER_KEY);String p = isEmpty(parameters.get(PROTOCOL_KEY)) ? CONSUMER : parameters.get(PROTOCOL_KEY);// 消费者url信息URL consumerUrl = new ServiceConfigURL (p,null,null,parameters.get(REGISTER_IP_KEY),0, getPath(parameters, type),parameters,consumerAttribute);url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);// 带迁移性质的Invoker对象ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);// 执行迁移规则创建应用级优先的服务发现Invoker对象return interceptInvoker(migrationInvoker, url, consumerUrl);}

consumerUrl如下

consumer://192.168.0.101/org.sjl.dubbo.AsyncProvider?application=dubbo-springboot-start-consumer
&background=false
&dubbo=2.0.2
&interface=org.sjl.dubbo.AsyncProvider
&methods=sayHiAsync,sayHello,sayHelloAsync&pid=33100
&qos.enable=false
&register.ip=192.168.0.101
&release=3.1.0
&side=consumer
&sticky=false
&timeout=30000
&timestamp=1725379319215

3、RegistryProtocol.interceptInvoker

    protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {// 获取激活的注册协议监听器扩展里面registry.protocol.listener,这里激活的类型为MigrationRuleListenerList<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);if (CollectionUtils.isEmpty(listeners)) {return invoker;}for (RegistryProtocolListener listener : listeners) {// MigrationRuleListener// 迁移规则应用级引用listener.onRefer(this, invoker, consumerUrl, url);}return invoker;}

接下来就进入了迁移规则的应用级服务发现了,参考 dubbo 服务消费原理分析之应用级服务发现

这篇关于dubbo 服务消费原理分析之引用服务配置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1144310

相关文章

如何使用Nginx配置将80端口重定向到443端口

《如何使用Nginx配置将80端口重定向到443端口》这篇文章主要为大家详细介绍了如何将Nginx配置为将HTTP(80端口)请求重定向到HTTPS(443端口),文中的示例代码讲解详细,有需要的小伙... 目录1. 创建或编辑Nginx配置文件2. 配置HTTP重定向到HTTPS3. 配置HTTPS服务器

SpringBoot中配置Redis连接池的完整指南

《SpringBoot中配置Redis连接池的完整指南》这篇文章主要为大家详细介绍了SpringBoot中配置Redis连接池的完整指南,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以... 目录一、添加依赖二、配置 Redis 连接池三、测试 Redis 操作四、完整示例代码(一)pom.

Java字符串操作技巧之语法、示例与应用场景分析

《Java字符串操作技巧之语法、示例与应用场景分析》在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字... 目录引言1. 基础操作1.1 创建字符串1.2 获取长度1.3 访问字符2. 字符串处理2.1 子字

Linux内核参数配置与验证详细指南

《Linux内核参数配置与验证详细指南》在Linux系统运维和性能优化中,内核参数(sysctl)的配置至关重要,本文主要来聊聊如何配置与验证这些Linux内核参数,希望对大家有一定的帮助... 目录1. 引言2. 内核参数的作用3. 如何设置内核参数3.1 临时设置(重启失效)3.2 永久设置(重启仍生效

IDEA自动生成注释模板的配置教程

《IDEA自动生成注释模板的配置教程》本文介绍了如何在IntelliJIDEA中配置类和方法的注释模板,包括自动生成项目名称、包名、日期和时间等内容,以及如何定制参数和返回值的注释格式,需要的朋友可以... 目录项目场景配置方法类注释模板定义类开头的注释步骤类注释效果方法注释模板定义方法开头的注释步骤方法注

Spring Boot循环依赖原理、解决方案与最佳实践(全解析)

《SpringBoot循环依赖原理、解决方案与最佳实践(全解析)》循环依赖指两个或多个Bean相互直接或间接引用,形成闭环依赖关系,:本文主要介绍SpringBoot循环依赖原理、解决方案与最... 目录一、循环依赖的本质与危害1.1 什么是循环依赖?1.2 核心危害二、Spring的三级缓存机制2.1 三

如何在Mac上安装并配置JDK环境变量详细步骤

《如何在Mac上安装并配置JDK环境变量详细步骤》:本文主要介绍如何在Mac上安装并配置JDK环境变量详细步骤,包括下载JDK、安装JDK、配置环境变量、验证JDK配置以及可选地设置PowerSh... 目录步骤 1:下载JDK步骤 2:安装JDK步骤 3:配置环境变量1. 编辑~/.zshrc(对于zsh

C#中async await异步关键字用法和异步的底层原理全解析

《C#中asyncawait异步关键字用法和异步的底层原理全解析》:本文主要介绍C#中asyncawait异步关键字用法和异步的底层原理全解析,本文给大家介绍的非常详细,对大家的学习或工作具有一... 目录C#异步编程一、异步编程基础二、异步方法的工作原理三、代码示例四、编译后的底层实现五、总结C#异步编程

售价599元起! 华为路由器X1/Pro发布 配置与区别一览

《售价599元起!华为路由器X1/Pro发布配置与区别一览》华为路由器X1/Pro发布,有朋友留言问华为路由X1和X1Pro怎么选择,关于这个问题,本期图文将对这二款路由器做了期参数对比,大家看... 华为路由 X1 系列已经正式发布并开启预售,将在 4 月 25 日 10:08 正式开售,两款产品分别为华

SQL server配置管理器找不到如何打开它

《SQLserver配置管理器找不到如何打开它》最近遇到了SQLserver配置管理器打不开的问题,尝试在开始菜单栏搜SQLServerManager无果,于是将自己找到的方法总结分享给大家,对SQ... 目录方法一:桌面图标进入方法二:运行窗口进入方法三:查找文件路径方法四:检查 SQL Server 安