本文主要是介绍Spring Retry机制详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
介绍
Spring框架提供了Spring Retry
能让在项目工程中很方便的使用重试。
使用
1、引入pom
<dependency><groupId>org.springframework.retry</groupId><artifactId>spring-retry</artifactId><version>1.3.2</version></dependency>
2、启用retry
在Configuration
上加 @EnableRetry
。
3、示例
@Service
public class RetryService {private int times = 0;private Instant begin = null;@Retryable(value = BizException.class, maxAttempts = 3,backoff= @Backoff(value = 1500, maxDelay = 10000, multiplier = 2))public void service() {Instant instant = Instant.now();if(begin == null){begin = instant;}times++;System.out.println(StrUtil.format(" call times: {} at {}. ", times, begin.until(instant, ChronoUnit.MILLIS) ));if (times < 5) {throw new BizException(StrUtil.format(" call times: {} error. ", times));}}@Recoverpublic void recover(BizException e){System.out.println("service retry after Recover => " + e.getMessage());}}
@PostMapping(value = "/retry")public R retry() {try {retryService.service();} catch (Exception ex) {ex.printStackTrace();return R.fail(ex.getMessage());}return R.ok();}
输出:
call times: 1 at 0. #第1次调用call times: 2 at 1504. #重试第1次 ,间隔 1.5 * 2 call times: 3 at 4506. #重试第2次 ,间隔
service retry after Recover => call times: 3 error. # recover
重试次数改为7 ,输出:
call times: 1 at 0. call times: 2 at 1506. # value = 1.5 ,第1 次间隔call times: 3 at 4507. # 第2次间隔 1.5 * 2 = 3 call times: 4 at 10508. # 第3次,1.5 * 2 * 2 = 6 call times: 5 at 20509. #最大延迟就是10call times: 6 at 30511. call times: 7 at 40512.
4、注解说明
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Retryable {/** recover 方法名 */String recover() default "";/** 自定义 interceptor bean name */String interceptor() default "";Class<? extends Throwable>[] value() default {};Class<? extends Throwable>[] include() default {};Class<? extends Throwable>[] exclude() default {};/** 统计报表 唯一 label */String label() default "";boolean stateful() default false;int maxAttempts() default 3;/** maxAttempts 表达式,spel 表达式,例如: ${retry.attempts:5} */String maxAttemptsExpression() default "";Backoff backoff() default @Backoff();/**1、直接使用 异常的属性。message.contains('you can retry this')2、其他方法判断:格式#{@bean.methodName(#root)}。methodName的返回值为boolean类型。#root是异常类,即用户可以在代码中判断是否进行重试*/String exceptionExpression() default "";String[] listeners() default {};}
-
@EnableRetry
:启用重试,proxyTargetClass
属性为true
时(默认false
),使用CGLIB
代理。 -
@Retryable
:标记当前方法会使用重试机制。value
:指定抛出那些异常才会触发重试(可以配置多个异常类型) 默认为空。include
:就是value
,默认为空,当exclude
也为空时,默认所有异常都可以触发重试exclude
:指定哪些异常不触发重试(可以配置多个异常类型),默认为空maxAttempts
:最大重试次数,默认3次(包括第一次调用)backoff
:重试等待策略 默认使用@Backoff
注解
-
@Backoff
:重试回退策略(立即重试还是等待一会再重试)value
: 重试的间隔时间(毫秒),默认为1000L
。delay
:value
的别名maxDelay
:重试次数之间的最大时间间隔,默认为0,如果小于delay
的设置,则默认为30000L
multiplier
:delay
时间的间隔倍数,默认为0
,表示固定暂停1
秒后进行重试,如果把multiplier
设置为1.5
,则第一次重试为2
秒,第二次为3
秒,第三次为4.5
秒。
- 不设置参数时,默认使用
FixedBackOffPolicy
(固定时间等待策略),重试等待1000ms
- 只设置
delay
时,使用FixedBackOffPolicy
,重试等待指定的毫秒数- 当设置
delay
和maxDealy
时,重试等待在这两个值之间均态分布- 设置
delay
,maxDealy
和multiplier
时,使用ExponentialBackOffPolicy
(倍数等待策略)- 当设置
multiplier
不等于0时,同时也设置了random
时,使用ExponentialRandomBackOffPolicy
(随机倍数等待策略),从 [1, multiplier-1] 中的均匀分布中为每个延迟选择乘数
@Recover
标记方法为@Retryable
失败时的“兜底”处理方法
- 传参与
@Retryable
的配置的value
必须一样。@Recover
的标记方法的参数必须要与@Retryable
注解value
“形参”保持一致,第一入参为要重试的异常(一定要是@Retryable
方法里抛出的异常或者异常父类),其他参数与@Retryable
保持一致,返回值也要一样,否则无法执行!
@CircuitBreaker
:用于标记方法,实现熔断模式。include
指定处理的异常类。默认为空exclude
指定不需要处理的异常。默认为空vaue
指定要重试的异常。默认为空maxAttempts
最大重试次数。默认3次openTimeout
配置熔断器打开的超时时间,默认5s
,当超过openTimeout
之后熔断器电路变成半打开状态(只要有一次重试成功,则闭合电路)resetTimeout
配置熔断器重新闭合的超时时间,默认20s
,超过这个时间断路器关闭
注意事项
-
使用了
@Retryable
注解的方法直接实例化调用不会触发重试,要先将实现类实例化到Spring
容器中,然后通过注入等方式使用 -
Spring-Retry
是通过捕获异常的方式来触发重试的,@Retryable
标注方法产生的异常不能使用try-catch
捕获,要在方法上抛出异常,不然不会触发重试 -
查询可以进行重试,写操作要慎重,除非业务方支持重入
原理
引入
@EnableRetry
注解,引入Retry能力,导入了RetryConfiguration
类。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(proxyTargetClass = false)
@Import(RetryConfiguration.class)
@Documented
public @interface EnableRetry {boolean proxyTargetClass() default false;}
@Component
public class RetryConfiguration extends AbstractPointcutAdvisorimplements IntroductionAdvisor, BeanFactoryAware, InitializingBean {private Advice advice; //private Pointcut pointcut;private RetryContextCache retryContextCache;private List<RetryListener> retryListeners;private MethodArgumentsKeyGenerator methodArgumentsKeyGenerator;private NewMethodArgumentsIdentifier newMethodArgumentsIdentifier;private Sleeper sleeper;private BeanFactory beanFactory;@Overridepublic ClassFilter getClassFilter() {return this.pointcut.getClassFilter();}@Overridepublic Class<?>[] getInterfaces() {return new Class[] { org.springframework.retry.interceptor.Retryable.class };}@Overridepublic void validateInterfaces() throws IllegalArgumentException {}}
RetryConfiguration
继承 AbstractPointcutAdvisor
,实现了 IntroductionAdvisor
,它有一个pointcut
和一个advice
,在IOC
过程中会根据PointcutAdvisor
类来对Bean进行Pointcut
的过滤,然后生成对应的AOP
代理类,用advice来加强处理。
初始化
afterPropertiesSet
方法进行初始化。
@Overridepublic void afterPropertiesSet() throws Exception {//RetryContextCachethis.retryContextCache = findBean(RetryContextCache.class);this.methodArgumentsKeyGenerator = findBean(MethodArgumentsKeyGenerator.class);this.newMethodArgumentsIdentifier = findBean(NewMethodArgumentsIdentifier.class);//RetryListenerthis.retryListeners = findBeans(RetryListener.class);this.sleeper = findBean(Sleeper.class);Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);//注解为 RetryableretryableAnnotationTypes.add(Retryable.class);//构造 pointcutthis.pointcut = buildPointcut(retryableAnnotationTypes);//构造 advicethis.advice = buildAdvice();if (this.advice instanceof BeanFactoryAware) {((BeanFactoryAware) this.advice).setBeanFactory(this.beanFactory);}}
buildPointcut 和 buildAdvice
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {ComposablePointcut result = null;for (Class<? extends Annotation> retryAnnotationType : retryAnnotationTypes) {//根据 注解,构造 PointcutPointcut filter = new AnnotationClassOrMethodPointcut(retryAnnotationType);if (result == null) {result = new ComposablePointcut(filter);}else {result.union(filter);}}return result;}protected Advice buildAdvice() {//构造一个 AnnotationAwareRetryOperationsInterceptorAnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();if (this.retryContextCache != null) {interceptor.setRetryContextCache(this.retryContextCache);}if (this.retryListeners != null) {interceptor.setListeners(this.retryListeners);}if (this.methodArgumentsKeyGenerator != null) {interceptor.setKeyGenerator(this.methodArgumentsKeyGenerator);}if (this.newMethodArgumentsIdentifier != null) {interceptor.setNewItemIdentifier(this.newMethodArgumentsIdentifier);}if (this.sleeper != null) {interceptor.setSleeper(this.sleeper);}return interceptor;}
pointcut
private final class AnnotationClassOrMethodPointcut extends StaticMethodMatcherPointcut {private final MethodMatcher methodResolver;AnnotationClassOrMethodPointcut(Class<? extends Annotation> annotationType) {this.methodResolver = new AnnotationMethodMatcher(annotationType);setClassFilter(new AnnotationClassOrMethodFilter(annotationType));}@Overridepublic boolean matches(Method method, Class<?> targetClass) {return getClassFilter().matches(targetClass) || this.methodResolver.matches(method, targetClass);}@Overridepublic boolean equals(Object other) {if (this == other) {return true;}if (!(other instanceof AnnotationClassOrMethodPointcut)) {return false;}AnnotationClassOrMethodPointcut otherAdvisor = (AnnotationClassOrMethodPointcut) other;return ObjectUtils.nullSafeEquals(this.methodResolver, otherAdvisor.methodResolver);}}private final class AnnotationClassOrMethodFilter extends AnnotationClassFilter {private final AnnotationMethodsResolver methodResolver;AnnotationClassOrMethodFilter(Class<? extends Annotation> annotationType) {super(annotationType, true);this.methodResolver = new AnnotationMethodsResolver(annotationType);}@Overridepublic boolean matches(Class<?> clazz) {// 类的方法上 标记了指定注解。return super.matches(clazz) || this.methodResolver.hasAnnotatedMethods(clazz);}}private static class AnnotationMethodsResolver {private Class<? extends Annotation> annotationType;public AnnotationMethodsResolver(Class<? extends Annotation> annotationType) {this.annotationType = annotationType;}public boolean hasAnnotatedMethods(Class<?> clazz) {final AtomicBoolean found = new AtomicBoolean(false);//遍历所有的方法,ReflectionUtils.doWithMethods(clazz, new MethodCallback() {@Overridepublic void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {if (found.get()) {return;}//method 有注解。Annotation annotation = AnnotationUtils.findAnnotation(method,AnnotationMethodsResolver.this.annotationType);if (annotation != null) {found.set(true);}}});return found.get();}}
AnnotationAwareRetryOperationsInterceptor
buildAdvice()
方法会构造一个AnnotationAwareRetryOperationsInterceptor
实例。用于做增强操作。
public class AnnotationAwareRetryOperationsInterceptor implements IntroductionInterceptor, BeanFactoryAware {private static final TemplateParserContext PARSER_CONTEXT = new TemplateParserContext();private static final SpelExpressionParser PARSER = new SpelExpressionParser();private static final MethodInterceptor NULL_INTERCEPTOR = new MethodInterceptor() {@Overridepublic Object invoke(MethodInvocation methodInvocation) throws Throwable {throw new OperationNotSupportedException("Not supported");}};private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();//用于缓存每个object的用于 增强的方法。private final ConcurrentReferenceHashMap<Object, ConcurrentMap<Method, MethodInterceptor>> delegates = new ConcurrentReferenceHashMap<Object, ConcurrentMap<Method, MethodInterceptor>>();private RetryContextCache retryContextCache = new MapRetryContextCache();private MethodArgumentsKeyGenerator methodArgumentsKeyGenerator;private NewMethodArgumentsIdentifier newMethodArgumentsIdentifier;private Sleeper sleeper;private BeanFactory beanFactory;private RetryListener[] globalListeners;}
invoke
@Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {//构造代理MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());if (delegate != null) {return delegate.invoke(invocation);}else {return invocation.proceed();}}private MethodInterceptor getDelegate(Object target, Method method) {//缓存ConcurrentMap<Method, MethodInterceptor> cachedMethods = this.delegates.get(target);if (cachedMethods == null) {cachedMethods = new ConcurrentHashMap<Method, MethodInterceptor>();}MethodInterceptor delegate = cachedMethods.get(method);if (delegate == null) {MethodInterceptor interceptor = NULL_INTERCEPTOR;//获取方法的 Retryable 注解。Retryable retryable = AnnotatedElementUtils.findMergedAnnotation(method, Retryable.class);if (retryable == null) {//父类?retryable = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Retryable.class);}if (retryable == null) {//在类上查找retryable = findAnnotationOnTarget(target, method, Retryable.class);}if (retryable != null) {//如果有 interceptor,则直接使用if (StringUtils.hasText(retryable.interceptor())) {interceptor = this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);}else if (retryable.stateful()) {interceptor = getStatefulInterceptor(target, method, retryable);}else {interceptor = getStatelessInterceptor(target, method, retryable);}}cachedMethods.putIfAbsent(method, interceptor);delegate = cachedMethods.get(method);}this.delegates.putIfAbsent(target, cachedMethods);return delegate == NULL_INTERCEPTOR ? null : delegate;}
getStatefulInterceptor 和 getStatelessInterceptor
private MethodInterceptor getStatelessInterceptor(Object target, Method method, Retryable retryable) {//生成一个RetryTemplateRetryTemplate template = createTemplate(retryable.listeners());//生成retryPolicy template.setRetryPolicy(getRetryPolicy(retryable));//生成backoffPolicy template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));//RetryOperationsInterceptorreturn RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label()).recoverer(getRecoverer(target, method)).build();}private MethodInterceptor getStatefulInterceptor(Object target, Method method, Retryable retryable) {RetryTemplate template = createTemplate(retryable.listeners());template.setRetryContextCache(this.retryContextCache);//CircuitBreaker circuit = AnnotatedElementUtils.findMergedAnnotation(method, CircuitBreaker.class);if (circuit == null) {circuit = findAnnotationOnTarget(target, method, CircuitBreaker.class);}if (circuit != null) {RetryPolicy policy = getRetryPolicy(circuit);CircuitBreakerRetryPolicy breaker = new CircuitBreakerRetryPolicy(policy);breaker.setOpenTimeout(getOpenTimeout(circuit));breaker.setResetTimeout(getResetTimeout(circuit));template.setRetryPolicy(breaker);template.setBackOffPolicy(new NoBackOffPolicy());String label = circuit.label();if (!StringUtils.hasText(label)) {label = method.toGenericString();}return RetryInterceptorBuilder.circuitBreaker().keyGenerator(new FixedKeyGenerator("circuit")).retryOperations(template).recoverer(getRecoverer(target, method)).label(label).build();}RetryPolicy policy = getRetryPolicy(retryable);template.setRetryPolicy(policy);template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));String label = retryable.label();return RetryInterceptorBuilder.stateful().keyGenerator(this.methodArgumentsKeyGenerator).newMethodArgumentsIdentifier(this.newMethodArgumentsIdentifier).retryOperations(template).label(label).recoverer(getRecoverer(target, method)).build();}
RetryOperationsInterceptor
invoke
@Overridepublic Object invoke(final MethodInvocation invocation) throws Throwable {String name;if (StringUtils.hasText(this.label)) {name = this.label;}else {name = invocation.getMethod().toGenericString();}final String label = name;// RetryCallback,主要调用了invocation的proceed()方法RetryCallback<Object, Throwable> retryCallback = new MethodInvocationRetryCallback<Object, Throwable>(invocation, label) {@Overridepublic Object doWithRetry(RetryContext context) throws Exception {context.setAttribute(RetryContext.NAME, this.label);/** If we don't copy the invocation carefully it won't keep a reference to* the other interceptors in the chain. We don't have a choice here but to* specialise to ReflectiveMethodInvocation (but how often would another* implementation come along?).*/if (this.invocation instanceof ProxyMethodInvocation) {context.setAttribute("___proxy___", ((ProxyMethodInvocation) this.invocation).getProxy());try {return ((ProxyMethodInvocation) this.invocation).invocableClone().proceed();}catch (Exception e) {throw e;}catch (Error e) {throw e;}catch (Throwable e) {throw new IllegalStateException(e);}}else {throw new IllegalStateException("MethodInvocation of the wrong type detected - this should not happen with Spring AOP, "+ "so please raise an issue if you see this exception");}}};// recovererif (this.recoverer != null) {ItemRecovererCallback recoveryCallback = new ItemRecovererCallback(invocation.getArguments(), // 真实调用的 参数this.recoverer);try {Object recovered = this.retryOperations.execute(retryCallback, recoveryCallback);return recovered;}finally {RetryContext context = RetrySynchronizationManager.getContext();if (context != null) {context.removeAttribute("__proxy__");}}}//最终还是进入到retryOperations的execute方法,这个retryOperations就是在之前的builder set进来的RetryTemplate。 return this.retryOperations.execute(retryCallback);}
private static final class ItemRecovererCallback implements RecoveryCallback<Object> {private final Object[] args;private final MethodInvocationRecoverer<?> recoverer;/*** @param args the item that failed.*/private ItemRecovererCallback(Object[] args, MethodInvocationRecoverer<?> recoverer) {this.args = Arrays.asList(args).toArray();this.recoverer = recoverer;}@Overridepublic Object recover(RetryContext context) {// this.argsreturn this.recoverer.recover(this.args, context.getLastThrowable());}}
无论是RetryOperationsInterceptor
还是StatefulRetryOperationsInterceptor
,最终的拦截处理逻辑还是调用到RetryTemplate
的execute
方法,从名字也看出来,RetryTemplate
作为一个模板类,里面包含了重试统一逻辑。
RetryTemplate
RetryTemplate
的 execute
方法主要就是参数的不同。核心就是3个参数:RetryCallback
,RecoveryCallback
,RetryState
@Overridepublic final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E {return doExecute(retryCallback, null, null);}@Overridepublic final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,RecoveryCallback<T> recoveryCallback) throws E {return doExecute(retryCallback, recoveryCallback, null);}@Overridepublic final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState)throws E, ExhaustedRetryException {return doExecute(retryCallback, null, retryState);}@Overridepublic final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,RecoveryCallback<T> recoveryCallback, RetryState retryState) throws E, ExhaustedRetryException {return doExecute(retryCallback, recoveryCallback, retryState);}protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {RetryPolicy retryPolicy = this.retryPolicy;BackOffPolicy backOffPolicy = this.backOffPolicy;//新建一个RetryContext来保存本轮重试的上下文 RetryContext context = open(retryPolicy, state);if (this.logger.isTraceEnabled()) {this.logger.trace("RetryContext retrieved: " + context);}// Make sure the context is available globally for clients who need// it...RetrySynchronizationManager.register(context);Throwable lastException = null;boolean exhausted = false;try {//如果有注册RetryListener,则会调用它的open方法,给调用者一个通知。 boolean running = doOpenInterceptors(retryCallback, context);if (!running) {throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");}// Get or Start the backoff context...BackOffContext backOffContext = null;Object resource = context.getAttribute("backOffContext");if (resource instanceof BackOffContext) {backOffContext = (BackOffContext) resource;}if (backOffContext == null) {backOffContext = backOffPolicy.start(context);if (backOffContext != null) {context.setAttribute("backOffContext", backOffContext);}}//判断能否重试,就是调用RetryPolicy的canRetry方法来判断。 //这个循环会直到原方法不抛出异常,或不需要再重试 while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {try {if (this.logger.isDebugEnabled()) {this.logger.debug("Retry: count=" + context.getRetryCount());}//清除上次记录的异常// the close interceptors will not think we failed...lastException = null;//doWithRetry方法,一般来说就是原方法 return retryCallback.doWithRetry(context);}catch (Throwable e) {//记录异常lastException = e;try {//记录异常信息 registerThrowable(retryPolicy, state, context, e);}catch (Exception ex) {throw new TerminatedRetryException("Could not register throwable", ex);}finally {//调用RetryListener的onError方法 doOnErrorInterceptors(retryCallback, context, e);}//再次判断能否重试 if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {try {//如果可以重试则走退避策略 backOffPolicy.backOff(backOffContext);}catch (BackOffInterruptedException ex) {lastException = e;// back off was prevented by another thread - fail the retryif (this.logger.isDebugEnabled()) {this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());}throw ex;}}if (this.logger.isDebugEnabled()) {this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());}if (shouldRethrow(retryPolicy, context, state)) {if (this.logger.isDebugEnabled()) {this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());}throw RetryTemplate.<E>wrapIfNecessary(e);}}/** A stateful attempt that can retry may rethrow the exception before now,* but if we get this far in a stateful retry there's a reason for it,* like a circuit breaker or a rollback classifier.*/if (state != null && context.hasAttribute(GLOBAL_STATE)) {break;}} // END WHILEif (state == null && this.logger.isDebugEnabled()) {this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());}exhausted = true;//重试结束后如果有兜底Recovery方法则执行,否则抛异常 return handleRetryExhausted(recoveryCallback, context, state);} //END FIRST TRYcatch (Throwable e) {throw RetryTemplate.<E>wrapIfNecessary(e);}finally {//处理一些关闭逻辑close(retryPolicy, context, state, lastException == null || exhausted);//调用RetryListener的close方法 doCloseInterceptors(retryCallback, context, lastException);RetrySynchronizationManager.clear();}}
重试策略
用来判断当方法调用异常时是否需要重试。常用策略有:
SimpleRetryPolicy
:默认最多重试3次TimeoutRetryPolicy
:默认在1秒内失败都会重试ExpressionRetryPolicy
:符合表达式就会重试CircuitBreakerRetryPolicy
:增加了熔断的机制,如果不在熔断状态,则允许重试CompositeRetryPolicy
:可以组合多个重试策略NeverRetryPolicy
:从不重试(也是一种重试策略哈)AlwaysRetryPolicy
:总是重试
重试策略最重要的方法就是 canRetry
public interface RetryPolicy extends Serializable {boolean canRetry(RetryContext context);RetryContext open(RetryContext parent);void close(RetryContext context);void registerThrowable(RetryContext context, Throwable throwable);
}
//SimpleRetryPolicy@Overridepublic boolean canRetry(RetryContext context) {Throwable t = context.getLastThrowable();//判断抛出的异常是否符合重试的异常 return (t == null || retryForException(t)) && context.getRetryCount() < this.maxAttempts;}//ExpressionRetryPolicy extends SimpleRetryPolicy public ExpressionRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,boolean traverseCauses, String expressionString, boolean defaultValue) {super(maxAttempts, retryableExceptions, traverseCauses, defaultValue);Assert.notNull(expressionString, "'expressionString' cannot be null");this.expression = getExpression(expressionString);}
退避策略
控制下一次的间隔时间。常用策略有:
FixedBackOffPolicy
默认固定延迟1秒后执行下一次重试ExponentialBackOffPolicy
指数递增延迟执行重试,默认初始0.1秒,系数是2,那么下次延迟0.2秒,再下次就是延迟0.4秒,如此类推,最大30秒。ExponentialRandomBackOffPolicy
在上面那个策略上增加随机性UniformRandomBackOffPolicy
这个跟上面的区别就是,上面的延迟会不停递增,这个只会在固定的区间随机StatelessBackOffPolicy
这个说明是无状态的,所谓无状态就是对上次的退避无感知,从它下面的子类也能看出来
退避策略主要方法是 backOff
public interface BackOffPolicy {BackOffContext start(RetryContext context);void backOff(BackOffContext backOffContext) throws BackOffInterruptedException;}
FixedBackOffPolicy
public class FixedBackOffPolicy extends StatelessBackOffPolicy implements SleepingBackOffPolicy<FixedBackOffPolicy> {private static final long DEFAULT_BACK_OFF_PERIOD = 1000L;private volatile long backOffPeriod = DEFAULT_BACK_OFF_PERIOD;private Sleeper sleeper = new ThreadWaitSleeper();public FixedBackOffPolicy withSleeper(Sleeper sleeper) {FixedBackOffPolicy res = new FixedBackOffPolicy();res.setBackOffPeriod(backOffPeriod);res.setSleeper(sleeper);return res;}protected void doBackOff() throws BackOffInterruptedException {try {//sleep 指定时间 // 内部:Thread.sleep(backOffPeriod);sleeper.sleep(backOffPeriod);}catch (InterruptedException e) {throw new BackOffInterruptedException("Thread interrupted while sleeping", e);}}}
//ExponentialBackOffPolicy@Overridepublic void backOff(BackOffContext backOffContext) throws BackOffInterruptedException {ExponentialBackOffContext context = (ExponentialBackOffContext) backOffContext;try {// ExponentialBackOffContextlong sleepTime = context.getSleepAndIncrement();if (this.logger.isDebugEnabled()) {this.logger.debug("Sleeping for " + sleepTime);}this.sleeper.sleep(sleepTime);}catch (InterruptedException e) {throw new BackOffInterruptedException("Thread interrupted while sleeping", e);}}//ExponentialBackOffContextpublic synchronized long getSleepAndIncrement() {// this.interval:本次间隔时间。在上一次结束时计算。long sleep = this.interval;if (sleep > this.maxInterval) {sleep = this.maxInterval;}else {this.interval = getNextInterval();}return sleep;}protected long getNextInterval() {return (long) (this.interval * this.multiplier);}
RetryContext
RetryContext
主要用于记录一些状态。
public interface RetryContext extends AttributeAccessor {String NAME = "context.name";String STATE_KEY = "context.state";String CLOSED = "context.closed";String RECOVERED = "context.recovered";String EXHAUSTED = "context.exhausted";void setExhaustedOnly();boolean isExhaustedOnly();RetryContext getParent();int getRetryCount();Throwable getLastThrowable();}
每一个策略都有对应的Context
。在Spring Retry
里,其实每一个策略都是单例来的。单例则会导致重试策略之间才产生冲突,不是单例,则多出了很多策略对象出来,增加了使用者的负担,这不是一个好的设计。
Spring Retry
采用了一个更加轻量级的做法,就是针对每一个需要重试的方法只new一个上下文Context
对象,然后在重试时,把这个Context
传到策略里,策略再根据这个Context
做重试,而且Spring Retry
还对这个Context
做了cache
。这样就相当于对重试的上下文做了优化。
private RetryContext doOpenInternal(RetryPolicy retryPolicy, RetryState state) {RetryContext context = retryPolicy.open(RetrySynchronizationManager.getContext());if (state != null) {context.setAttribute(RetryContext.STATE_KEY, state.getKey());}if (context.hasAttribute(GLOBAL_STATE)) {registerContext(context, state);}return context;}
附录
参考
Spring retry
Guava retry
参考:https://java.jverson.com/tools/guava-retryer.html
添加依赖
<dependency><groupId>com.github.rholder</groupId><artifactId>guava-retrying</artifactId><version>2.0.0</version></dependency>
示例
public boolean guavaTestTask(String param) {// 构建重试实例 可以设置重试源且可以支持多个重试源 可以配置重试次数或重试超时时间,以及可以配置等待时间间隔Retryer<Boolean> retriever = RetryerBuilder.<Boolean>newBuilder()// 重试的异常类以及子类.retryIfExceptionOfType(ServiceException.class)// 根据返回值进行重试.retryIfResult(result -> !result)// 设置等待间隔时间,每次请求间隔1s.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))// 设置最大重试次数,尝试请求3次.withStopStrategy(StopStrategies.stopAfterAttempt(3)).build();try {//调用 真实的服务。return retriever.call(() -> randomResult(param));} catch (Exception e) {e.printStackTrace();}return false;}
原理
Retryer 接口
定义了执行方法重试的方法,并提供了多个配置方法来设置重试条件、等待策略、停止策略等。它包含一个call
方法,将需要重试的操作以Callable
形式传递给它。
public V call(Callable<V> callable) throws ExecutionException, RetryException {long startTime = System.nanoTime();//根据attemptNumber进行循环次数for (int attemptNumber = 1; ; attemptNumber++) {// 进入方法不等待,立即执行一次Attempt<V> attempt;try {// 执行callable中的具体业务// attemptTimeLimiter限制了每次尝试等待的时长V result = attemptTimeLimiter.call(callable);// 利用调用结果构造新的attemptattempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));} catch (Throwable t) {attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));}// 遍历自定义的监听器for (RetryListener listener : listeners) {listener.onRetry(attempt);}// 判断是否满足重试条件,来决定是否继续等待并进行重试if (!rejectionPredicate.apply(attempt)) {return attempt.get();}// 此时满足停止策略,因为还没有得到想要的结果,因此抛出异常if (stopStrategy.shouldStop(attempt)) {throw new RetryException(attemptNumber, attempt);} else {// 行默认的停止策略——线程休眠long sleepTime = waitStrategy.computeSleepTime(attempt);try {// 也可以执行定义的停止策略blockStrategy.block(sleepTime);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RetryException(attemptNumber, attempt);}}}
}
RetryerBuilder 类
创建Retryer
实例的构建器类。通过RetryerBuilder
配置重试策略、条件和其他参数,最终构建出一个Retryer
实例。
Guava-Retry和Spring Retry 比较
- 框架来源:
Guava-Retry
:Guava-Retry是Google Guava
库的一部分,它提供了一种用于重试操作的机制。Spring Retry
:Spring Retry是Spring
框架的一个模块,专门用于在Spring应用程序中实现重试逻辑。
- 库依赖:
Guava-Retry
:需要添加Guava库的依赖。Spring Retry
:需要添加spring-retry
模块的依赖。
- 配置和注解:
Guava-Retry
:重试逻辑通过构建Retryer
实例并定义重试条件、等待策略等来配置。Spring Retry
:Spring Retry提供了注解(如@Retryable
、@Recover
等)和编程式配置来实现重试逻辑。
- 重试策略:
Guava-Retry
:Guava-Retry
基于结果和异常类型来定义重试条件。使用RetryerBuilder
来自定义重试策略。Spring Retry
:Spring Retry
使用注解来定义重试条件和相关属性,如最大重试次数、重试间隔等。
- 等待策略:
Guava-Retry
:Guava-Retry
提供了不同的等待策略(如固定等待、指数等待等),自己组合。Spring Retry
:Spring Retry
通过注解或编程式配置来指定等待时间。
- 适用范围:
Guava-Retry
:可以用于任何Java应用程序,不仅限于Spring框架。Spring Retry
:专门设计用于Spring应用程序中,可以与其他Spring功能(如Spring AOP
)集成。
- 依赖性:
Guava-Retry
:相对较轻量级,如果只需要重试功能,可以考虑使用Guava库的一部分。Spring Retry
:如果已使用Spring框架,可以方便地集成Spring Retry,但可能需要更多的Spring依赖。
这篇关于Spring Retry机制详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!