本文主要是介绍spring源码------@EnableAsync注解以及@Async注解如何配合完成方法异步调用的分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 1.`@EnableAsync`以及`@Async`的说明
- 1.1 `@Async`
- 1.2 `@EnableAsync`
- 2. 源码分析
- 2.1 基于`@Import`扩展的`AsyncConfigurationSelector`
- 2.2 配置异步方法执行相关配置的`ProxyAsyncConfiguration`
- 2.3 创建切点以及增强类的`AsyncAnnotationBeanPostProcessor`及其父类
- 2.3.1 创建增强类的`AsyncAnnotationBeanPostProcessor`
- 2.3.2 创建增强跟切点的`AsyncAnnotationAdvisor`
- 2.3.3 创建拦截器`AnnotationAsyncExecutionInterceptor`
- 2.3.4 创建切点
- 2.4 方法拦截增强的逻辑
- 2.4.1 拦截方法的`AsyncExecutionInterceptor`
- 2.4.2 获取方法调度用的执行器
- 2.4.3 进行方法的异步调用——使用线程池
1.@EnableAsync
以及@Async
的说明
1.1 @Async
spring从3.0版本开始支持异步的方法调用,只需要在方法或者类上面加上一个@Async
注解就可以了,当注解在类上面的时候,则表示整个类都作为异步方法。但是需要注意的是,当一个方法所在类上面已经存在@Configuration
注解的时候,这个时候@Async
注解是不支持的,至于为什么会不支持因为@Configuration
注解会对当前类进行代理增强,这个时候返回的类是一个经过代理的类,这个时候的方法可能已经不是原方法了,至于代理增强的相关的东西可以看看
spring源码------@Configuration跟@Component及其派生注解@Service等的区别以及spring对其代理增强的原理这个里面有对@Configuration
注解的解析。
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {//指定对应的Executor的beanNameString value() default "";}
1.2 @EnableAsync
spring在3.1版本的时候加上了是否开启异步方法支持的注解@EnableAsync
。这个注解是基于@Import
注解进行扩展的,关于@Import
可以看看spring源码解析------@Import注解解析与ImportSelector,ImportBeanDefinitionRegistrar以及DeferredImportSelector区别了解以下。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {//用户定义的需要进行异步执行的注解,默认只有Async注解Class<? extends Annotation> annotation() default Annotation.class;//是否创建基于CGLIB的代理对象boolean proxyTargetClass() default false;//代理模式选择 PROXY表示jdk的代理AdviceMode mode() default AdviceMode.PROXY;//对应的AsyncAnnotationBeanPostProcessor的拦截顺序int order() default Ordered.LOWEST_PRECEDENCE;}
2. 源码分析
2.1 基于@Import
扩展的AsyncConfigurationSelector
AsyncConfigurationSelector
类继承了AdviceModeImportSelector
类,而AdviceModeImportSelector
类在前面的一篇文章@EnableCaching,@Cacheable,@CacheEvict,@CachePut的实现原理中提到过。这个类主要是根据@Import
注解的扩展注解的类型的mode
属性来设置代理类型。
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector { public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";protected String getAdviceModeAttributeName() {return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;}public final String[] selectImports(AnnotationMetadata importingClassMetadata) {//获取AdviceModeImportSelector中的注解泛型的Class对象Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");//从传入的importingClassMetadata对象中获取对应的Class类型的注解的内部属性AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);if (attributes == null) {throw new IllegalArgumentException(String.format("@%s is not present on importing class '%s' as expected",annType.getSimpleName(), importingClassMetadata.getClassName()));}//获取属性中的代理类型属性“mode”的值AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());//根据代理类型获取需要注入的beanString[] imports = selectImports(adviceMode);if (imports == null) {throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);}return imports;}
}
其中selectImports
方法由子类来实现,这里进入AsyncConfigurationSelector
来看看实现的逻辑
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME ="org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";@Override@Nullablepublic String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {//默认的是jdk PROXY的模式case PROXY://返回ProxyAsyncConfiguration类,注入到容器中,配置异步方法相关的配置return new String[] {ProxyAsyncConfiguration.class.getName()};//使用AspectJ的模式case ASPECTJ://返回AspectJ关于异步方法相关的配置类AspectJAsyncConfigurationreturn new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return null;}}
}
可以看到这里就是选择不同的代理模式情况下,需要注入不同的配置类。而默认情况下@EnableAsync
注解中的是PROXY
模式,这里我们也就这种模式进行分析,所以接下来就是ProxyAsyncConfiguration
类了。
2.2 配置异步方法执行相关配置的ProxyAsyncConfiguration
spring对于方法的异步执行的逻辑跟@EnableCache
注解类似的,都是基于对方法拦截完成的,这里的拦截涉及到了切点跟增强类。而ProxyAsyncConfiguration
中就配置了这两点。
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");//创建AsyncAnnotationBeanPostProcessor,实现了AbstractAdvisingBeanPostProcessor(内部定义了Advisor对象可以可以注册到容器中)AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();//设置executor跟exceptionHandler,spring都是空实现,也就是这两个值都是null,后面会设置默认的对象,也可以自己指定实现,bpp.configure(this.executor, this.exceptionHandler);//获取@EnableAsync注解中的annotation属性,这个annotation表示贴有这些注解的方法就是需要拦截的进行异步执行的方法Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");//如果annotation值不是默认的,则将这个加入到AsyncAnnotationBeanPostProcessor,后面进行拦截用if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}//是否对目标类进行代理bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));//设置AsyncAnnotationBeanPostProcessor的顺序,默认是最低的,这样可以在其他的 post-processors执行后在处理bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}}
可以看到主要逻辑如下:
- 创建
AsyncAnnotationBeanPostProcessor
, - 配置异步执行方法的执行器executor,以及错误异常处理的exceptionHandler
- 然后解析
@EnableAsync
注解的annotation
属性,或许需要拦截代理的方法,然后设置进入。 - 设置是否代理目标对象
- 设置拦截器所处的顺序
接下来就是进入AsyncAnnotationBeanPostProcessor
了解相关的配置和处理异步相关的类。
2.3 创建切点以及增强类的AsyncAnnotationBeanPostProcessor
及其父类
2.3.1 创建增强类的AsyncAnnotationBeanPostProcessor
在AsyncAnnotationBeanPostProcessor
的构造器中有一个逻辑就是将当前类会创建的增强类作为所有增强类的第一个
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {public AsyncAnnotationBeanPostProcessor() {//是否将这个advisor放在advisor列表的第一个,也就是最开始拦截setBeforeExistingAdvisors(true);}
}
这个属性是在其父类中被用到的,这里简单的展示以下部分的代码。
//实现了BeanPostProcessor,在bean初始化之后调用
public Object postProcessAfterInitialization(Object bean, String beanName) {......if (bean instanceof Advised) {Advised advised = (Advised) bean;if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {// Add our local Advisor to the existing proxy's Advisor chain...//是否将当前的Advised放在调用链的最前面if (this.beforeExistingAdvisors) {advised.addAdvisor(0, this.advisor);}else {advised.addAdvisor(this.advisor);}return bean;}}......
}
AsyncAnnotationBeanPostProcessor
在这里还间接的实现了BeanFactoryAware
接口的setBeanFactory
方法,而这个方法里面就创建了增强相关的advisor。
public void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);//创建AsyncAnnotationAdvisor,AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);//将需要拦截的方法上的注解加入到advisorif (this.asyncAnnotationType != null) {advisor.setAsyncAnnotationType(this.asyncAnnotationType);}//设置容器到advisoradvisor.setBeanFactory(beanFactory);this.advisor = advisor;}
这里的创建的AsyncAnnotationAdvisor
间接实现了PointcutAdvisor
跟Advisor
,会将内部的Advice
以及Pointcut
注入到容器中,关于这两个类可以看看aop相关文章了解一下。这里直接进入到AsyncAnnotationAdvisor
中查看。
2.3.2 创建增强跟切点的AsyncAnnotationAdvisor
在AsyncAnnotationAdvisor
的构造器中,就包含了advice跟pointcut的创建入口。
public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);//在用户自定义的拦截注解上额外加入Async到需要拦截的注解集合asyncAnnotationTypes.add(Async.class);try {//增加对java的ejv的Asynchronous注解的支持asyncAnnotationTypes.add((Class<? extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// If EJB 3.1 API not present, simply ignore.}//创建advicethis.advice = buildAdvice(executor, exceptionHandler);//设置pointcutthis.pointcut = buildPointcut(asyncAnnotationTypes);}
这里主要就是将表示需要拦截的注解加入到需要对应的集合中,然后就开始对advice跟pointcut的对象的创建。这里先看advice。
2.3.3 创建拦截器AnnotationAsyncExecutionInterceptor
protected Advice buildAdvice(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {//设置拦截器AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);//设置executor跟exceptionHandlerinterceptor.configure(executor, exceptionHandler);return interceptor;}
这里的configure就是配置用来执行异步调用方法的执行器Executor
以及处理调用出异常的时候处理类AsyncUncaughtExceptionHandler
public void configure(@Nullable Supplier<Executor> defaultExecutor,@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {//如果defaultExecutor为null,也就是没有指定Executor,设置默认的SimpleAsyncTaskExecutor(在子类AsyncExecutionInterceptor中重载getDefaultExecutor方法)this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));//如果没有指定exceptionHandler默认为SimpleAsyncUncaughtExceptionHandlerthis.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);}
其中getDefaultExecutor
方法子类AsyncExecutionInterceptor
进行了重载
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {//先调用父类的方法从容器中获取,Executor defaultExecutor = super.getDefaultExecutor(beanFactory);//如果没有设置Executor 则使用SimpleAsyncTaskExecutorreturn (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());}
2.3.4 创建切点
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {ComposablePointcut result = null;for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {//创建Pointcut,会设置classFilter为AnnotationClassFilter,methodMatcher为TrueMethodMatcherPointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);//创建Pointcut,会设置classFilter为AnnotationClassFilter,methodMatcher为AnnotationMethodMatcherPointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);if (result == null) {result = new ComposablePointcut(cpc);}else {result.union(cpc);}result = result.union(mpc);}return (result != null ? result : Pointcut.TRUE);}
这里主要包含两种,一种是AnnotationClassFilter
跟TrueMethodMatcher
结合的,另外一种是AnnotationClassFilter
跟AnnotationMethodMatcher
结合的。对于这三个类这里不进行分析,只说明其作用。作用就是对当前拦截的方法进行匹配判断,是否包含前面设置的那些需要拦截的注解,如果包含则说明,这个方法是需要进行异步执行的,没有则表示不匹配则会跳过这个方法。
2.4 方法拦截增强的逻辑
先总结一下上面那些步骤做了啥,主要就是做了下面的几点:
- 设置需要拦截的注解集合
asyncAnnotationType
- 设置方法的拦截处理器
AsyncExecutionInterceptor
, - 进行异步调用的执行器
Executor
,错误处理器SimpleAsyncUncaughtExceptionHandler
接下来就对这些进行分析。
2.4.1 拦截方法的AsyncExecutionInterceptor
这里直接进入实现了MethodInterceptor
接口的invoke
方法。
public Object invoke(final MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);//根据方法相关信息,获取executor,这个executor可以指定也可以是默认的AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);if (executor == null) {throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");}Callable<Object> task = () -> {try {//进入下一个拦截器链Object result = invocation.proceed();if (result instanceof Future) {//获取结果返回return ((Future<?>) result).get();}}catch (ExecutionException ex) {//进入到SimpleAsyncUncaughtExceptionHandler进行错误处理handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {//进入到SimpleAsyncUncaughtExceptionHandler进行错误处理handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};//使用选定的执行程序实际执行给定任务的委托,使用的是CompletableFuturereturn doSubmit(task, executor, invocation.getMethod().getReturnType());}
这里主要逻辑如下:
- 从目标类中获取最匹配的需要执行的方法
- 获取异步调度方法用的executor
- 执行完拦截链获取到最终结果,中间如果出错,则最后交给
SimpleAsyncUncaughtExceptionHandler
处理 - 使用executor来完成异步调用
这里进入到determineAsyncExecutor
看看怎么选择执行器的。
2.4.2 获取方法调度用的执行器
determineAsyncExecutor
方法在MethodInterceptor
的父类AsyncExecutionAspectSupport
中。
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {//缓存的AsyncTaskExecutor跟method的map集合AsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;//获取贴有Async注解的方法,最终的是现在注册的AnnotationAsyncExecutionInterceptor类中,寻找Async注解中的value字段String qualifier = getExecutorQualifier(method);if (StringUtils.hasLength(qualifier)) {//如果value有指定Executor则在容器中寻找关联的执行器targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {//使用默认的Executor,SimpleAsyncTaskExecutortargetExecutor = this.defaultExecutor.get();}if (targetExecutor == null) {return null;}//转换为AsyncTaskExecutor类型executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));//保存到缓存中this.executors.put(method, executor);}return executor;}
这里主要逻辑就是先获取配置指定,如果没有指定则用默认的。而获取配置的逻辑在AsyncExecutionInterceptor
的子类AnnotationAsyncExecutionInterceptor
中实现的。
protected String getExecutorQualifier(Method method) {//获取方法上的Async注解Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);if (async == null) {//注解为空,则获取声明这个方法的类上面有没有这个注解async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);}//返回Async注解value或者nullreturn (async != null ? async.value() : null);}
其主要逻辑就是获取方法或者类上面有没有@Async
注解,有则代表是需要异步调用的方法,没有则不是,是的还需要进一步获取注解中value
属性,看有没有指定执行器并返回。
2.4.3 进行方法的异步调用——使用线程池
在AsyncExecutionInterceptor
中获取到了AsyncTaskExecutor
类型的执行器之后,就是信息异步的方法调用了。这个逻辑在doSubmit
方法中。
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {//使用的java8 的新的Future相关API,CompletableFuture来完成if (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}else if (ListenableFuture.class.isAssignableFrom(returnType)) {//提交任务return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}else if (Future.class.isAssignableFrom(returnType)) {//提交任务return executor.submit(task);}else {//提交任务executor.submit(task);return null;}}
可以看到,最后的所有异步调用还是离不开线程池。到这里整个@EnableAsync
注解以及@Async
注解的分析就完结了。
这篇关于spring源码------@EnableAsync注解以及@Async注解如何配合完成方法异步调用的分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!