dubbo 服务消费原理分析之引用服务配置

2024-09-07 06:28

本文主要是介绍dubbo 服务消费原理分析之引用服务配置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、服务监听ContextRefreshedEvent
    • 1、AbstractApplicationContext.refresh
    • 2、AbstractApplicationContext.finishRefresh
    • 3、DubboDeployApplicationListener.onApplicationEvent
    • 4、DefaultModuleDeployer .referServices
    • 5、SimpleReferenceCache.get
  • 二、引用服务 ReferenceConfig
    • 1、时序图
    • 2、ReferenceConfig.get
    • 3、ReferenceConfig.init
    • 4、ReferenceConfig.createProxy
    • 5、ReferenceConfig.createInvokerForRemote
  • 三、注册协议 RegistryProtocol
    • 1、RegistryProtocol.refer
    • 2、RegistryProtocol.doRefer
    • 3、RegistryProtocol.interceptInvoker


前言

文章基于3.1.0版本进行分析

		<dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId><version>3.1.0</version></dependency>

一、服务监听ContextRefreshedEvent

在springboot 中,refresh 是 AbstractApplicationContext 中的一个方法,负责初始化 ApplicationContext 容器,容器必须调用 refresh 才能正常工作是进行,refresh的最后处理了 finishRefresh 方法,改方法会广播一个ContextRefreshedEvent容器刷新完成事件,所有监听了该事件的bean都会去执行相关逻辑处理。

1、AbstractApplicationContext.refresh

public void refresh() throws BeansException, IllegalStateException {synchronized(this.startupShutdownMonitor) {// 省略无关代码***// 初始化生命周期处理器,调用生命周期处理器onRefresh方法,发布ContextRefreshedEvent事件,JMX相关处理this.finishRefresh();// 省略无关代码***}}

2、AbstractApplicationContext.finishRefresh

	protected void finishRefresh() {// 清除上下文资源缓存(如扫描中的ASM元数据) scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// 广播刷新完成事件publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.if (!NativeDetector.inNativeImage()) {LiveBeansView.registerApplicationContext(this);}}

3、DubboDeployApplicationListener.onApplicationEvent

dubbo很好的结合了spring的这一个拓展点,在这个拓展点开始实现服务的发布。可以看到,DubboDeployApplicationListener实现了ContextRefreshedEvent的消息监听

public class DubboDeployApplicationListener implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, Ordered {private static final Logger logger = LoggerFactory.getLogger(DubboDeployApplicationListener.class);private ApplicationContext applicationContext;private ApplicationModel applicationModel;private ModuleModel moduleModel;@Overridepublic void onApplicationEvent(ApplicationContextEvent event) {if (nullSafeEquals(applicationContext, event.getSource())) {if (event instanceof ContextRefreshedEvent) {onContextRefreshedEvent((ContextRefreshedEvent) event);} else if (event instanceof ContextClosedEvent) {onContextClosedEvent((ContextClosedEvent) event);}}}private void onContextRefreshedEvent(ContextRefreshedEvent event) {// 获取配置的deployer 进行发布,默认是 DefaultModuleDeployer ModuleDeployer deployer = moduleModel.getDeployer();Assert.notNull(deployer, "Module deployer is null");// start moduleFuture future = deployer.start();// if the module does not start in background, await finishif (!deployer.isBackground()) {try {// 等待发布结束future.get();} catch (InterruptedException e) {logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());} catch (Exception e) {logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);}}}}

4、DefaultModuleDeployer .referServices

DefaultModuleDeployer 中,真正核心的是ReferenceConfig,ReferenceConfig才是去实际发布的动作

public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {// DefaultApplicationDeployerprivate ApplicationDeployer applicationDeployer;@Overridepublic synchronized Future start() throws IllegalStateException {...// 不管是 DefaultApplicationDeployer 还是DefaultModuleDeployer的initialize方法,都是处理相关配置文件// 其功能等价于监听器 DubboConfigApplicationListenerapplicationDeployer.initialize();initialize();// 真正触发 服务注册功能exportServices();// prepare application instance// exclude internal module to avoid wait itselfif (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {applicationDeployer.prepareInternalModule();}// 服务消费// refer servicesreferServices();...return startFuture;}private void referServices() {configManager.getReferences().forEach(rc -> {try {ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;// 刷新配置if (!referenceConfig.isRefreshed()) {referenceConfig.refresh();}// 如果还没初始化if (rc.shouldInit()) {// 是否异步注入if (referAsync || rc.shouldReferAsync()) {ExecutorService executor = executorRepository.getServiceReferExecutor();CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {referenceCache.get(rc);} catch (Throwable t) {logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);}}, executor);asyncReferringFutures.add(future);} else {// 查询缓存中是否存在代理对象 对应的实现类SimpleReferenceCachereferenceCache.get(rc);}}} catch (Throwable t) {logger.error(getIdentifier() + " refer catch error.");referenceCache.destroy(rc);throw t;}});}
}

5、SimpleReferenceCache.get

SimpleReferenceCache,一个用于缓存引用ReferenceConfigBase的util工具类。
ReferenceConfigBase是一个重对象,对于频繁创建ReferenceConfigBase的框架来说,有必要缓存这些对象。
如果需要使用复杂的策略,可以实现并使用自己的ReferenceConfigBase缓存
这个Cache是引用服务的开始如果我们想在代码中自定义一些服务引用的逻辑,可以直接创建SimpleReferenceCache类型对象然后调用其get方法进行引用服务。

	public <T> T get(ReferenceConfigBase<T> rc) {String key = generator.generateKey(rc);// 服务类型 如果是泛化调用则这个类型为GenericServiceClass<?> type = rc.getInterfaceClass();boolean singleton = rc.getSingleton() == null || rc.getSingleton();T proxy = null;// Check existing proxy of the same 'key' and 'type' first.// 单例if (singleton) {// 缓存数据找proxy = get(key, (Class<T>) type);} else {logger.warn("Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");}// 不存在消费的代理对象,创建rc.get(),最后走到ReferenceConfig.get()if (proxy == null) {// 获取或者创建值,为引用类型referencesOfType对象(类型为Map<Class<?>, List<ReferenceConfigBase<?>>>)缓存对象生成值(值不存咋时候会生成一个)List<ReferenceConfigBase<?>> referencesOfType = referenceTypeMap.computeIfAbsent(type, _t -> Collections.synchronizedList(new ArrayList<>()));// 每次走到这里都会添加一个ReferenceConfigBase 引用配置对象(单例的从缓存中拿到就可以直接返回了)referencesOfType.add(rc);// 与前面一样 前面是类型映射,这里是key映射List<ReferenceConfigBase<?>> referenceConfigList = referenceKeyMap.computeIfAbsent(key, _k -> Collections.synchronizedList(new ArrayList<>()));referenceConfigList.add(rc);// 开始引用服务proxy = rc.get();}return proxy;}

二、引用服务 ReferenceConfig

服务发现和引用就是从这里开始的

1、时序图

2、ReferenceConfig.get

获取引用的代理对象

	public T get() {if (destroyed) {throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");}if (ref == null) {// ensure start module, compatible with old api usage// 如果使用方直接调用了ReferenceConfigBase的get方法或者缓存对象SimpleReferenceCache类型的对象的get方法来引用服务端的时候就会造成很多配置没有初始化// 这个代码其实就是启动模块进行一些基础配置的初始化操作,比如元数据中心默认配置选择,注册中心默认配置选择这些都是比较重要的getScopeModel().getDeployer().start();synchronized (this) {if (ref == null) {init();}}}return ref;}

主要包括

  • checkAndUpdateSubConfigs() – 检查并更新缺省配置
  • init() – 消费者服务的初始化,核心逻辑。

3、ReferenceConfig.init

初始化代理对象

    protected synchronized void init() {// 初始化标记变量保证只初始化一次,这里又是加锁又是加标记变量的if (initialized && ref != null) {return;}try {// 刷新配置if (!this.isRefreshed()) {this.refresh();}// init serviceMetadata// 初始化元数据信息 如版本号,分组,服务接口名initServiceMetadata(consumer);// //继续初始化元数据信息 服务接口类型和keyserviceMetadata.setServiceType(getServiceInterfaceClass());// TODO, uncomment this line once service key is unifiedserviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));// 参数配置转成MAPMap<String, String> referenceParameters = appendConfig();// init service-application mapping// 初始化路径,参数转化成url,dubbo主要从url上面读取参数initServiceAppsMapping(referenceParameters);// 获取Module级别的服务存储仓库,其内部保存着服务提供者和服务消费者的缓存ModuleServiceRepository repository = getScopeModel().getServiceRepository();ServiceDescriptor serviceDescriptor;if (CommonConstants.NATIVE_STUB.equals(getProxy())) {serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);repository.registerService(serviceDescriptor);} else {serviceDescriptor = repository.registerService(interfaceClass);}// 创建消费者模型对象consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor,getScopeModel(), serviceMetadata, createAsyncMethodInfo(), interfaceClassLoader);// Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.consumerModel.setConfig(this);repository.registerConsumer(consumerModel);serviceMetadata.getAttachments().putAll(referenceParameters);// 创建引用的代理对象 核心代码ref = createProxy(referenceParameters);serviceMetadata.setTarget(ref);serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);consumerModel.setDestroyCaller(getDestroyRunner());consumerModel.setProxyObject(ref);consumerModel.initMethodModels();checkInvokerAvailable();} catch (Throwable t) {// *** 省略部分代码throw t;}initialized = true;}

主要流程

  • 服务引用前初始化serviceMetadata服务元数据
  • 获取服务仓库ModuleServiceRepository,并注册service 和 consumer
  • 调用createProxy方法,根据服务引用参数map创建服务接口代理引用对象,并赋值给ref,核心逻辑

4、ReferenceConfig.createProxy

创建代理对象,实际调用需要通过这个代理对象进行调用

	private T createProxy(Map<String, String> referenceParameters) {// 本地引用if (shouldJvmRefer(referenceParameters)) {createInvokerForLocal(referenceParameters);} else {urls.clear();meshModeHandleUrl(referenceParameters);if (StringUtils.isNotEmpty(url)) {// user specified URL, could be peer-to-peer address, or register center's address.// url参数不为空,且可以配置多个。可以是点对点,也可以是注册中心,然后添加到urlsparseUrl(referenceParameters);} else {// if protocols not in jvm checkRegistryif (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {// 从注册表中获取URL并将其聚合。这个其实就是初始化一下注册中心的url配置aggregateUrlFromRegistry(referenceParameters);}}// 创建远程引用,创建远程引用调用器,承担服务调用的核心逻辑createInvokerForRemote();}if (logger.isInfoEnabled()) {logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +(Boolean.parseBoolean(referenceParameters.get(GENERIC_KEY)) ?" it's GenericService reference" : " it's not GenericService reference"));}URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,referenceParameters.get(INTERFACE_KEY), referenceParameters);consumerUrl = consumerUrl.setScopeModel(getScopeModel());consumerUrl = consumerUrl.setServiceModel(consumerModel);MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());// create service proxy// 构建一个代理对象,代理客户端的请求return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));}

5、ReferenceConfig.createInvokerForRemote

远程引用或者直连引用情况下,将会调用该方法,创建远程引用Invoker。对于一个注册中心url和多个注册中心url的处理是不一样的,一个注册中心对应一个invoker,最后封装到集群路由invoker

	private void createInvokerForRemote() {// 只有一个,秩序创建一个invokerif (urls.size() == 1) {URL curUrl = urls.get(0);// 自适应扩展类,这里和服务发布类似,先进入了RegistryProtocol.referinvoker = protocolSPI.refer(interfaceClass, curUrl);if (!UrlUtils.isRegistry(curUrl)) {List<Invoker<?>> invokers = new ArrayList<>();invokers.add(invoker);invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);}} else {List<Invoker<?>> invokers = new ArrayList<>();URL registryUrl = null;// 创建多个invokerfor (URL url : urls) {// For multi-registry scenarios, it is not checked whether each referInvoker is available.// Because this invoker may become available later.// invokers.add(protocolSPI.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// use last registry urlregistryUrl = url;}}// 创建一个集群路由invokerif (registryUrl != null) {// registry url is available// for multi-subscription scenario, use 'zone-aware' policy by defaultString cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker// (RegistryDirectory, routing happens here) -> Invokerinvoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);} else {// not a registry url, must be direct invoke.// 直连if (CollectionUtils.isEmpty(invokers)) {throw new IllegalArgumentException("invokers == null");}URL curUrl = invokers.get(0).getUrl();String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);}}}

服务发布的时候讲到 protocolSPI,protocolSPI.refer形成的调用链为
-Protocol$Adaptie -> ProtocolSerializationWrapper -> ProtocolFilterWrapper -> QosProtocolWrapper -> ProtocolListenerWrapper -> RegistryProtocol -> RegistryProtocol

三、注册协议 RegistryProtocol

应用级服务远程协议以service-discovery-registry开头,其对应的Protocol实现就是RegistryProtocol

1、RegistryProtocol.refer

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 组装配置中心的地址url = getRegistryUrl(url);// 获取用于操作Zookeeper的Registry类型 Registry registry = getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);String group = qs.get(GROUP_KEY);if (StringUtils.isNotEmpty(group)) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);}}// 降级容错的逻辑处理对象 类型为Cluster 实际类型为MockClusterWrapper 内部包装的是FailoverCluster// 后续调用服务失败时候会先失效转移再降级Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));// 主要干活的地方 生成引用 invokerreturn doRefer(cluster, registry, type, url, qs);}

生成配置中心的url

zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-springboot-start-consumer
&dubbo=2.0.2
&pid=8896
&qos.enable=false
&release=3.1.0
&timestamp=1725379058285

2、RegistryProtocol.doRefer

    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());consumerAttribute.remove(REFER_KEY);String p = isEmpty(parameters.get(PROTOCOL_KEY)) ? CONSUMER : parameters.get(PROTOCOL_KEY);// 消费者url信息URL consumerUrl = new ServiceConfigURL (p,null,null,parameters.get(REGISTER_IP_KEY),0, getPath(parameters, type),parameters,consumerAttribute);url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);// 带迁移性质的Invoker对象ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);// 执行迁移规则创建应用级优先的服务发现Invoker对象return interceptInvoker(migrationInvoker, url, consumerUrl);}

consumerUrl如下

consumer://192.168.0.101/org.sjl.dubbo.AsyncProvider?application=dubbo-springboot-start-consumer
&background=false
&dubbo=2.0.2
&interface=org.sjl.dubbo.AsyncProvider
&methods=sayHiAsync,sayHello,sayHelloAsync&pid=33100
&qos.enable=false
&register.ip=192.168.0.101
&release=3.1.0
&side=consumer
&sticky=false
&timeout=30000
&timestamp=1725379319215

3、RegistryProtocol.interceptInvoker

    protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {// 获取激活的注册协议监听器扩展里面registry.protocol.listener,这里激活的类型为MigrationRuleListenerList<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);if (CollectionUtils.isEmpty(listeners)) {return invoker;}for (RegistryProtocolListener listener : listeners) {// MigrationRuleListener// 迁移规则应用级引用listener.onRefer(this, invoker, consumerUrl, url);}return invoker;}

接下来就进入了迁移规则的应用级服务发现了,参考 dubbo 服务消费原理分析之应用级服务发现

这篇关于dubbo 服务消费原理分析之引用服务配置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1144310

相关文章

springboot security之前后端分离配置方式

《springbootsecurity之前后端分离配置方式》:本文主要介绍springbootsecurity之前后端分离配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的... 目录前言自定义配置认证失败自定义处理登录相关接口匿名访问前置文章总结前言spring boot secu

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

一文详解SpringBoot响应压缩功能的配置与优化

《一文详解SpringBoot响应压缩功能的配置与优化》SpringBoot的响应压缩功能基于智能协商机制,需同时满足很多条件,本文主要为大家详细介绍了SpringBoot响应压缩功能的配置与优化,需... 目录一、核心工作机制1.1 自动协商触发条件1.2 压缩处理流程二、配置方案详解2.1 基础YAML

springboot简单集成Security配置的教程

《springboot简单集成Security配置的教程》:本文主要介绍springboot简单集成Security配置的教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录集成Security安全框架引入依赖编写配置类WebSecurityConfig(自定义资源权限规则

Java的IO模型、Netty原理解析

《Java的IO模型、Netty原理解析》Java的I/O是以流的方式进行数据输入输出的,Java的类库涉及很多领域的IO内容:标准的输入输出,文件的操作、网络上的数据传输流、字符串流、对象流等,这篇... 目录1.什么是IO2.同步与异步、阻塞与非阻塞3.三种IO模型BIO(blocking I/O)NI

SpringBoot中封装Cors自动配置方式

《SpringBoot中封装Cors自动配置方式》:本文主要介绍SpringBoot中封装Cors自动配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot封装Cors自动配置背景实现步骤1. 创建 GlobalCorsProperties

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

Spring Boot结成MyBatis-Plus最全配置指南

《SpringBoot结成MyBatis-Plus最全配置指南》本文主要介绍了SpringBoot结成MyBatis-Plus最全配置指南,包括依赖引入、配置数据源、Mapper扫描、基本CRUD操... 目录前言详细操作一.创建项目并引入相关依赖二.配置数据源信息三.编写相关代码查zsRArly询数据库数

SpringBoot配置Ollama实现本地部署DeepSeek

《SpringBoot配置Ollama实现本地部署DeepSeek》本文主要介绍了在本地环境中使用Ollama配置DeepSeek模型,并在IntelliJIDEA中创建一个Sprin... 目录前言详细步骤一、本地配置DeepSeek二、SpringBoot项目调用本地DeepSeek前言随着人工智能技

找不到Anaconda prompt终端的原因分析及解决方案

《找不到Anacondaprompt终端的原因分析及解决方案》因为anaconda还没有初始化,在安装anaconda的过程中,有一行是否要添加anaconda到菜单目录中,由于没有勾选,导致没有菜... 目录问题原因问http://www.chinasem.cn题解决安装了 Anaconda 却找不到 An