一文弄懂@Async代理执行原理(从源码的角度深入理解@EnableAsync 注解开启原理)

本文主要是介绍一文弄懂@Async代理执行原理(从源码的角度深入理解@EnableAsync 注解开启原理),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

视频讲解:https://www.bilibili.com/video/BV1zi4y1e7fA


一直只知道 @Async是通过代理来实现的,在同一个方法里面调用为什么不可以,只是懵懂知道一点,抽时间刚好研究一下它的原理,发现和 @Transactional 的实现原理完全一样。


文章目录

  • 一、理论
  • 二、代理对象的生成过程
    • 2-1、开启异步 EnableAsync
    • 2-2、注入后置处理器 BeanPostProcessor
    • 2-3、代理之Advisor
      • 2-3-1、代理执行拦截器 interceptor
      • 2-3-2、拦截规则 pointcut
    • 2-4、代理对象生成
    • 2-5、扩展:bean后置处理方法怎么调用的
  • 三、代理之后异步调用的原理
    • 3-1、异步执行线程池的选择
    • 3-2、获取异步执行的结果
  • 四、继承异步

一、理论


想要使用 @Async 异步执行,首先需要开启异步功能,也就是在启动类上加开启异步的注解 @EnableAsync,然后在需要异步的方法上面加上异步注解即可 @Async

当在启动类上加了@EnableAsync 之后,它就会往容器里面注入一个 AsyncAnnotationBeanPostProcessor,它间接实现了BeanPostProcessor 和 BeanFactoryAware,重写了 setBeanFactory 和 postProcessAfterInitialization方法。

  • BeanFactoryAware 初始化了一个 AsyncAnnotationAdvisor 里面包含了 advice 和 pointcut
    • advice 里面定义了代理之后的逻辑 AnnotationAsyncExecutionInterceptor.invoke
    • pointcut 里面定义了对使用 @Async 注解的类和方法进行拦截
  • postProcessAfterInitialization 对满足pointcut 的bean 生成代理对象

二、代理对象的生成过程


2-1、开启异步 EnableAsync

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {Class<? extends Annotation> annotation() default Annotation.class;boolean proxyTargetClass() default false;// 默认使用 ProxyAdviceMode mode() default AdviceMode.PROXY;int order() default Ordered.LOWEST_PRECEDENCE;
}public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";@Override@Nullablepublic String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {case PROXY:return new String[] {ProxyAsyncConfiguration.class.getName()};case ASPECTJ:return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return null;}}
}

2-2、注入后置处理器 BeanPostProcessor


@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");// 设置 beforeExistingAdvisors 为trueAsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();bpp.configure(this.executor, this.exceptionHandler);Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}
}

设置 beforeExistingAdvisors 为true,因为一个对象可能需要被多种规则代理,比如某个方法上同时加了 @Transactional 和 @Async,设置beforeExistingAdvisors 为 true的会让 @Async 放在最前面,下面代码会看到

public AsyncAnnotationBeanPostProcessor() {setBeforeExistingAdvisors(true);
}

AsyncAnnotationBeanPostProcessor类继承关系

在这里插入图片描述


2-3、代理之Advisor


AsyncAnnotationBeanPostProcessor 直接重写了 BeanFactoryAware的 setBeanFactory 方法,在该方法里面生成了 AsyncAnnotationAdvisor

@Override
public void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);if (this.asyncAnnotationType != null) {advisor.setAsyncAnnotationType(this.asyncAnnotationType);}advisor.setBeanFactory(beanFactory);this.advisor = advisor;
}

AsyncAnnotationAdvisor 创建的时候会生成 advice 也就是代理拦截器,和 pointcut 生成代理的拦截规则

public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);asyncAnnotationTypes.add(Async.class);try {asyncAnnotationTypes.add((Class<? extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// If EJB 3.1 API not present, simply ignore.}this.advice = buildAdvice(executor, exceptionHandler);this.pointcut = buildPointcut(asyncAnnotationTypes);
}

2-3-1、代理执行拦截器 interceptor

protected Advice buildAdvice(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor, exceptionHandler);return interceptor;
}

AnnotationAsyncExecutionInterceptor 的继承关系如下,在AsyncExecutionInterceptor 里面有个 invoke方法,代理之后执行就会走这个方法,下面详细讲解
在这里插入图片描述

2-3-2、拦截规则 pointcut


asyncAnnotationTypes 在上面可以看到传递的就是 Async.class ,这里生成了2个Pointcut,一个基于类,一个基于方法

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {ComposablePointcut result = null;for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);if (result == null) {result = new ComposablePointcut(cpc);}else {result.union(cpc);}result = result.union(mpc);}return (result != null ? result : Pointcut.TRUE);
}

2-4、代理对象生成


AsyncAnnotationBeanPostProcessor 的继承关系上面已经给出,在AbstractAdvisingBeanPostProcessor 重写了 BeanPostProcessor的 bean后置处理方法postProcessAfterInitialization ,在这个方法里面完成了代理对象的生成

public Object postProcessAfterInitialization(Object bean, String beanName) {if (this.advisor != null && !(bean instanceof AopInfrastructureBean)) {// 如果当前对象是代理对象,会走这个。 比如方法上面有 @Transactional ,走到这里之前就已经是代理对象了if (bean instanceof Advised) {Advised advised = (Advised)bean;if (!advised.isFrozen() && this.isEligible(AopUtils.getTargetClass(bean))) {// 这个字段已经为 true, 所以当前这个 advisor 会被放在最前,// 想想,异步要放在最前这个是合理的if (this.beforeExistingAdvisors) {advised.addAdvisor(0, this.advisor);} else {advised.addAdvisor(this.advisor);}return bean;}}// 正常情况我们的bean不是一个代理对象,所以走的是这个// 这个就没什么好说的了,直接基于规则生成代理对象了// isEligible 方法判断当前 bean是不是要被当前的 advisor 拦截代理if (this.isEligible(bean, beanName)) {ProxyFactory proxyFactory = this.prepareProxyFactory(bean, beanName);if (!proxyFactory.isProxyTargetClass()) {this.evaluateProxyInterfaces(bean.getClass(), proxyFactory);}proxyFactory.addAdvisor(this.advisor);this.customizeProxyFactory(proxyFactory);ClassLoader classLoader = this.getProxyClassLoader();if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {classLoader = ((SmartClassLoader)classLoader).getOriginalClassLoader();}return proxyFactory.getProxy(classLoader);} else {return bean;}} else {return bean;}
}

org.springframework.aop.framework.AbstractAdvisingBeanPostProcessor#isEligible(java.lang.Class<?>)

protected boolean isEligible(Class<?> targetClass) {Boolean eligible = (Boolean)this.eligibleBeans.get(targetClass);if (eligible != null) {return eligible;} else if (this.advisor == null) {return false;} else {eligible = AopUtils.canApply(this.advisor, targetClass);this.eligibleBeans.put(targetClass, eligible);return eligible;}
}

2-5、扩展:bean后置处理方法怎么调用的


BeanPostProcessor 接口里面只有两个方法,bean的前置处理和后置处理

public interface BeanPostProcessor {@Nullabledefault Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {return bean;}@Nullabledefault Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {return bean;}
}

postProcessAfterInitialization 被调用的地方也只有一个 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory#applyBeanPostProcessorsAfterInitialization,简单理解Spring在bean完成初始化所有动作之后就开始进行调用这个方法进行bean的后置处理

@Override
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)throws BeansException {Object result = existingBean;// 找到所有的BeanPostProcessors, 一个个去匹配看是否能匹配的上for (BeanPostProcessor processor : getBeanPostProcessors()) {// 后置处理Object current = processor.postProcessAfterInitialization(result, beanName);if (current == null) {return result;}result = current;}return result;
}

每个bean都会走这个方法,我们可以用debug条件固定我们想要看到的bean
在这里插入图片描述


三、代理之后异步调用的原理


相对于 @Transactional 代理,@Async 代理的拦截操作要简单的多,简单来说就是把当前任务丢到线程池里面去运行


再来看一下异步代理拦截的继承关系图
在这里插入图片描述


在 AsyncExecutionInterceptor 里面有个 invoke 方法,代理之后的对象执行都走这个方法

public Object invoke(MethodInvocation invocation) throws Throwable {// 找到当前要执行的类和方法Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);// 找到执行当前方法的线程池AsyncTaskExecutor executor = this.determineAsyncExecutor(userDeclaredMethod);if (executor == null) {throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");} else {// 组装当前任务Callable<Object> task = () -> {try {Object result = invocation.proceed();if (result instanceof Future) {return ((Future)result).get();}} catch (ExecutionException var4) {this.handleError(var4.getCause(), userDeclaredMethod, invocation.getArguments());} catch (Throwable var5) {this.handleError(var5, userDeclaredMethod, invocation.getArguments());}return null;};// 由线程池去执行任务return this.doSubmit(task, executor, invocation.getMethod().getReturnType());}
}

在这里插入图片描述


3-1、异步执行线程池的选择


在使用异步注解的时候我们有两种写法

  1. @Async 使用默认的线程池去执行
  2. @Async(“myExecutor”) 使用自定义线程池去执行
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {// 如果之前这个方法已经执行过了,就从缓存中拿到上次执行的线程池AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);if (executor == null) {// 获取当前方法 @Async 中的  value值String qualifier = this.getExecutorQualifier(method);Executor targetExecutor;// 如果 value不为空if (StringUtils.hasLength(qualifier)) {// 从bean容器中拿到这个线程池targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);} else {// 使用默认的线程池targetExecutor = (Executor)this.defaultExecutor.get();}if (targetExecutor == null) {return null;}executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);// 缓存一下this.executors.put(method, executor);}return (AsyncTaskExecutor)executor;
}

从BeanFactory 中拿到自定义的线程池

protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {if (beanFactory == null) {throw new IllegalStateException("BeanFactory must be set on " + this.getClass().getSimpleName() + " to access qualified executor '" + qualifier + "'");} else {return (Executor)BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);}
}

使用默认的线程池

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {// 这个会报错,会找到2个return (Executor)beanFactory.getBean(TaskExecutor.class);} catch (NoUniqueBeanDefinitionException var6) {try {// 最终是走这个拿到名为 taskExecutor 的线程池return (Executor)beanFactory.getBean("taskExecutor", Executor.class);} catch (NoSuchBeanDefinitionException var4) {// ...}} catch (NoSuchBeanDefinitionException var7) {// ...}}return null;
}

通过ApplicationContext拿到这个 taskExecutor,发现它的配置核心线程数8,最大线程数无限大
在这里插入图片描述


3-2、获取异步执行的结果


任务提交doSubmit方法如下,它使用了call方法,返回了一个 CompletableFuture

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {if (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {return task.call();} catch (Throwable var2) {throw new CompletionException(var2);}}, executor);} else if (ListenableFuture.class.isAssignableFrom(returnType)) {return ((AsyncListenableTaskExecutor)executor).submitListenable(task);} else if (Future.class.isAssignableFrom(returnType)) {return executor.submit(task);} else {executor.submit(task);return null;}
}

获取返回值样例

 @GetMapping("/one")public String fun() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = asyncService.funA();System.out.println("A的返回值: " + stringCompletableFuture.get());return "ok";
}@Async
public CompletableFuture<String> funA() {System.out.println("AsyncServiceImpl.funA : " + Thread.currentThread().getName());return  CompletableFuture.completedFuture("funA");
}

四、继承异步


上面的结论没有提到一种特殊的场景,继承

public class Father {@Asyncpublic void fun1(){System.out.println("father : "  + Thread.currentThread().getName());}
}@Service
public class Son extends Father {@Overridepublic void fun1(){super.fun1();System.out.println("son : " + Thread.currentThread().getName());}
}

测试代码:

    @Autowiredprivate Son son;@GetMapping("/two")public String two() {System.out.println("main : " + Thread.currentThread().getName());son.fun1();return "ok";}

执行结果:

main : main
father : task-1
son : task-1

是不是挺意外的? 理论上我们觉得 main和son 是一个线程, father是另外一个线程,但事实不是这样的。

这涉及到 CGLIB代理对象字节码生成逻辑挺复杂的,下次研究后再单独讲,这里先记三个点

  1. 使用 super调用父类方法的时候,和调用本类方法是一样的,不会异步
  2. 子类会继承父类方法上的 @Async,不管是否在重写的方法里面使用 super 关键字
  3. 如果子类不重写 fun1方法,直接调用父类的 fun1 也是异步

注:这一切将会在后面 CGLIB 代理对象生成的字节码里面揭晓

这篇关于一文弄懂@Async代理执行原理(从源码的角度深入理解@EnableAsync 注解开启原理)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/500339

相关文章

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

认识、理解、分类——acm之搜索

普通搜索方法有两种:1、广度优先搜索;2、深度优先搜索; 更多搜索方法: 3、双向广度优先搜索; 4、启发式搜索(包括A*算法等); 搜索通常会用到的知识点:状态压缩(位压缩,利用hash思想压缩)。

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL