spring源码------@EnableAsync注解以及@Async注解如何配合完成方法异步调用的分析

本文主要是介绍spring源码------@EnableAsync注解以及@Async注解如何配合完成方法异步调用的分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

        • 1.`@EnableAsync`以及`@Async`的说明
          • 1.1 `@Async`
          • 1.2 `@EnableAsync`
        • 2. 源码分析
          • 2.1 基于`@Import`扩展的`AsyncConfigurationSelector`
          • 2.2 配置异步方法执行相关配置的`ProxyAsyncConfiguration`
          • 2.3 创建切点以及增强类的`AsyncAnnotationBeanPostProcessor`及其父类
            • 2.3.1 创建增强类的`AsyncAnnotationBeanPostProcessor`
            • 2.3.2 创建增强跟切点的`AsyncAnnotationAdvisor`
            • 2.3.3 创建拦截器`AnnotationAsyncExecutionInterceptor`
            • 2.3.4 创建切点
          • 2.4 方法拦截增强的逻辑
            • 2.4.1 拦截方法的`AsyncExecutionInterceptor`
            • 2.4.2 获取方法调度用的执行器
            • 2.4.3 进行方法的异步调用——使用线程池

1.@EnableAsync以及@Async的说明
1.1 @Async

 spring从3.0版本开始支持异步的方法调用,只需要在方法或者类上面加上一个@Async注解就可以了,当注解在类上面的时候,则表示整个类都作为异步方法。但是需要注意的是,当一个方法所在类上面已经存在@Configuration注解的时候,这个时候@Async注解是不支持的,至于为什么会不支持因为@Configuration注解会对当前类进行代理增强,这个时候返回的类是一个经过代理的类,这个时候的方法可能已经不是原方法了,至于代理增强的相关的东西可以看看
spring源码------@Configuration跟@Component及其派生注解@Service等的区别以及spring对其代理增强的原理这个里面有对@Configuration注解的解析。

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {//指定对应的Executor的beanNameString value() default "";}
1.2 @EnableAsync

 spring在3.1版本的时候加上了是否开启异步方法支持的注解@EnableAsync。这个注解是基于@Import注解进行扩展的,关于@Import可以看看spring源码解析------@Import注解解析与ImportSelector,ImportBeanDefinitionRegistrar以及DeferredImportSelector区别了解以下。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {//用户定义的需要进行异步执行的注解,默认只有Async注解Class<? extends Annotation> annotation() default Annotation.class;//是否创建基于CGLIB的代理对象boolean proxyTargetClass() default false;//代理模式选择 PROXY表示jdk的代理AdviceMode mode() default AdviceMode.PROXY;//对应的AsyncAnnotationBeanPostProcessor的拦截顺序int order() default Ordered.LOWEST_PRECEDENCE;}
2. 源码分析
2.1 基于@Import扩展的AsyncConfigurationSelector

AsyncConfigurationSelector类继承了AdviceModeImportSelector类,而AdviceModeImportSelector类在前面的一篇文章@EnableCaching,@Cacheable,@CacheEvict,@CachePut的实现原理中提到过。这个类主要是根据@Import注解的扩展注解的类型的mode属性来设置代理类型。

public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {	public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";protected String getAdviceModeAttributeName() {return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;}public final String[] selectImports(AnnotationMetadata importingClassMetadata) {//获取AdviceModeImportSelector中的注解泛型的Class对象Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");//从传入的importingClassMetadata对象中获取对应的Class类型的注解的内部属性AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);if (attributes == null) {throw new IllegalArgumentException(String.format("@%s is not present on importing class '%s' as expected",annType.getSimpleName(), importingClassMetadata.getClassName()));}//获取属性中的代理类型属性“mode”的值AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());//根据代理类型获取需要注入的beanString[] imports = selectImports(adviceMode);if (imports == null) {throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);}return imports;}
}

 其中selectImports方法由子类来实现,这里进入AsyncConfigurationSelector来看看实现的逻辑

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME ="org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";@Override@Nullablepublic String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {//默认的是jdk PROXY的模式case PROXY://返回ProxyAsyncConfiguration类,注入到容器中,配置异步方法相关的配置return new String[] {ProxyAsyncConfiguration.class.getName()};//使用AspectJ的模式case ASPECTJ://返回AspectJ关于异步方法相关的配置类AspectJAsyncConfigurationreturn new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return null;}}
}

 可以看到这里就是选择不同的代理模式情况下,需要注入不同的配置类。而默认情况下@EnableAsync注解中的是PROXY模式,这里我们也就这种模式进行分析,所以接下来就是ProxyAsyncConfiguration类了。

2.2 配置异步方法执行相关配置的ProxyAsyncConfiguration

 spring对于方法的异步执行的逻辑跟@EnableCache注解类似的,都是基于对方法拦截完成的,这里的拦截涉及到了切点跟增强类。而ProxyAsyncConfiguration中就配置了这两点。

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");//创建AsyncAnnotationBeanPostProcessor,实现了AbstractAdvisingBeanPostProcessor(内部定义了Advisor对象可以可以注册到容器中)AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();//设置executor跟exceptionHandler,spring都是空实现,也就是这两个值都是null,后面会设置默认的对象,也可以自己指定实现,bpp.configure(this.executor, this.exceptionHandler);//获取@EnableAsync注解中的annotation属性,这个annotation表示贴有这些注解的方法就是需要拦截的进行异步执行的方法Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");//如果annotation值不是默认的,则将这个加入到AsyncAnnotationBeanPostProcessor,后面进行拦截用if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}//是否对目标类进行代理bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));//设置AsyncAnnotationBeanPostProcessor的顺序,默认是最低的,这样可以在其他的 post-processors执行后在处理bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}}

 可以看到主要逻辑如下:

  1. 创建AsyncAnnotationBeanPostProcessor
  2. 配置异步执行方法的执行器executor,以及错误异常处理的exceptionHandler
  3. 然后解析@EnableAsync注解的annotation属性,或许需要拦截代理的方法,然后设置进入。
  4. 设置是否代理目标对象
  5. 设置拦截器所处的顺序

 接下来就是进入AsyncAnnotationBeanPostProcessor了解相关的配置和处理异步相关的类。

2.3 创建切点以及增强类的AsyncAnnotationBeanPostProcessor及其父类
2.3.1 创建增强类的AsyncAnnotationBeanPostProcessor

 在AsyncAnnotationBeanPostProcessor的构造器中有一个逻辑就是将当前类会创建的增强类作为所有增强类的第一个

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {public AsyncAnnotationBeanPostProcessor() {//是否将这个advisor放在advisor列表的第一个,也就是最开始拦截setBeforeExistingAdvisors(true);}
}

 这个属性是在其父类中被用到的,这里简单的展示以下部分的代码。

//实现了BeanPostProcessor,在bean初始化之后调用
public Object postProcessAfterInitialization(Object bean, String beanName) {......if (bean instanceof Advised) {Advised advised = (Advised) bean;if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {// Add our local Advisor to the existing proxy's Advisor chain...//是否将当前的Advised放在调用链的最前面if (this.beforeExistingAdvisors) {advised.addAdvisor(0, this.advisor);}else {advised.addAdvisor(this.advisor);}return bean;}}......
}

AsyncAnnotationBeanPostProcessor在这里还间接的实现了BeanFactoryAware接口的setBeanFactory方法,而这个方法里面就创建了增强相关的advisor。

	public void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);//创建AsyncAnnotationAdvisor,AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);//将需要拦截的方法上的注解加入到advisorif (this.asyncAnnotationType != null) {advisor.setAsyncAnnotationType(this.asyncAnnotationType);}//设置容器到advisoradvisor.setBeanFactory(beanFactory);this.advisor = advisor;}

 这里的创建的AsyncAnnotationAdvisor间接实现了PointcutAdvisorAdvisor,会将内部的Advice以及Pointcut注入到容器中,关于这两个类可以看看aop相关文章了解一下。这里直接进入到AsyncAnnotationAdvisor中查看。

2.3.2 创建增强跟切点的AsyncAnnotationAdvisor

 在AsyncAnnotationAdvisor的构造器中,就包含了advice跟pointcut的创建入口。

	public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);//在用户自定义的拦截注解上额外加入Async到需要拦截的注解集合asyncAnnotationTypes.add(Async.class);try {//增加对java的ejv的Asynchronous注解的支持asyncAnnotationTypes.add((Class<? extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// If EJB 3.1 API not present, simply ignore.}//创建advicethis.advice = buildAdvice(executor, exceptionHandler);//设置pointcutthis.pointcut = buildPointcut(asyncAnnotationTypes);}

 这里主要就是将表示需要拦截的注解加入到需要对应的集合中,然后就开始对advice跟pointcut的对象的创建。这里先看advice。

2.3.3 创建拦截器AnnotationAsyncExecutionInterceptor
	protected Advice buildAdvice(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {//设置拦截器AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);//设置executor跟exceptionHandlerinterceptor.configure(executor, exceptionHandler);return interceptor;}

 这里的configure就是配置用来执行异步调用方法的执行器Executor以及处理调用出异常的时候处理类AsyncUncaughtExceptionHandler

	public void configure(@Nullable Supplier<Executor> defaultExecutor,@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {//如果defaultExecutor为null,也就是没有指定Executor,设置默认的SimpleAsyncTaskExecutor(在子类AsyncExecutionInterceptor中重载getDefaultExecutor方法)this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));//如果没有指定exceptionHandler默认为SimpleAsyncUncaughtExceptionHandlerthis.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);}

 其中getDefaultExecutor方法子类AsyncExecutionInterceptor进行了重载

	protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {//先调用父类的方法从容器中获取,Executor defaultExecutor = super.getDefaultExecutor(beanFactory);//如果没有设置Executor 则使用SimpleAsyncTaskExecutorreturn (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());}
2.3.4 创建切点
	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {ComposablePointcut result = null;for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {//创建Pointcut,会设置classFilter为AnnotationClassFilter,methodMatcher为TrueMethodMatcherPointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);//创建Pointcut,会设置classFilter为AnnotationClassFilter,methodMatcher为AnnotationMethodMatcherPointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);if (result == null) {result = new ComposablePointcut(cpc);}else {result.union(cpc);}result = result.union(mpc);}return (result != null ? result : Pointcut.TRUE);}

 这里主要包含两种,一种是AnnotationClassFilterTrueMethodMatcher结合的,另外一种是AnnotationClassFilterAnnotationMethodMatcher结合的。对于这三个类这里不进行分析,只说明其作用。作用就是对当前拦截的方法进行匹配判断,是否包含前面设置的那些需要拦截的注解,如果包含则说明,这个方法是需要进行异步执行的,没有则表示不匹配则会跳过这个方法。

2.4 方法拦截增强的逻辑

 先总结一下上面那些步骤做了啥,主要就是做了下面的几点:

  1. 设置需要拦截的注解集合asyncAnnotationType
  2. 设置方法的拦截处理器AsyncExecutionInterceptor
  3. 进行异步调用的执行器Executor,错误处理器SimpleAsyncUncaughtExceptionHandler

 接下来就对这些进行分析。

2.4.1 拦截方法的AsyncExecutionInterceptor

 这里直接进入实现了MethodInterceptor接口的invoke方法。

	public Object invoke(final MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);//根据方法相关信息,获取executor,这个executor可以指定也可以是默认的AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);if (executor == null) {throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");}Callable<Object> task = () -> {try {//进入下一个拦截器链Object result = invocation.proceed();if (result instanceof Future) {//获取结果返回return ((Future<?>) result).get();}}catch (ExecutionException ex) {//进入到SimpleAsyncUncaughtExceptionHandler进行错误处理handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {//进入到SimpleAsyncUncaughtExceptionHandler进行错误处理handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};//使用选定的执行程序实际执行给定任务的委托,使用的是CompletableFuturereturn doSubmit(task, executor, invocation.getMethod().getReturnType());}

 这里主要逻辑如下:

  1. 从目标类中获取最匹配的需要执行的方法
  2. 获取异步调度方法用的executor
  3. 执行完拦截链获取到最终结果,中间如果出错,则最后交给SimpleAsyncUncaughtExceptionHandler处理
  4. 使用executor来完成异步调用

 这里进入到determineAsyncExecutor看看怎么选择执行器的。

2.4.2 获取方法调度用的执行器

determineAsyncExecutor方法在MethodInterceptor的父类AsyncExecutionAspectSupport中。

	protected AsyncTaskExecutor determineAsyncExecutor(Method method) {//缓存的AsyncTaskExecutor跟method的map集合AsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;//获取贴有Async注解的方法,最终的是现在注册的AnnotationAsyncExecutionInterceptor类中,寻找Async注解中的value字段String qualifier = getExecutorQualifier(method);if (StringUtils.hasLength(qualifier)) {//如果value有指定Executor则在容器中寻找关联的执行器targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {//使用默认的Executor,SimpleAsyncTaskExecutortargetExecutor = this.defaultExecutor.get();}if (targetExecutor == null) {return null;}//转换为AsyncTaskExecutor类型executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));//保存到缓存中this.executors.put(method, executor);}return executor;}

 这里主要逻辑就是先获取配置指定,如果没有指定则用默认的。而获取配置的逻辑在AsyncExecutionInterceptor的子类AnnotationAsyncExecutionInterceptor中实现的。

	protected String getExecutorQualifier(Method method) {//获取方法上的Async注解Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);if (async == null) {//注解为空,则获取声明这个方法的类上面有没有这个注解async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);}//返回Async注解value或者nullreturn (async != null ? async.value() : null);}

 其主要逻辑就是获取方法或者类上面有没有@Async注解,有则代表是需要异步调用的方法,没有则不是,是的还需要进一步获取注解中value属性,看有没有指定执行器并返回。

2.4.3 进行方法的异步调用——使用线程池

 在AsyncExecutionInterceptor中获取到了AsyncTaskExecutor类型的执行器之后,就是信息异步的方法调用了。这个逻辑在doSubmit方法中。

	protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {//使用的java8 的新的Future相关API,CompletableFuture来完成if (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}else if (ListenableFuture.class.isAssignableFrom(returnType)) {//提交任务return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}else if (Future.class.isAssignableFrom(returnType)) {//提交任务return executor.submit(task);}else {//提交任务executor.submit(task);return null;}}

 可以看到,最后的所有异步调用还是离不开线程池。到这里整个@EnableAsync注解以及@Async注解的分析就完结了。

这篇关于spring源码------@EnableAsync注解以及@Async注解如何配合完成方法异步调用的分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/783318

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听