Resilience4j.Circuitbreaker源码分析

2023-10-24 09:59

本文主要是介绍Resilience4j.Circuitbreaker源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Resilience4j.Circuitbreaker

源码分析

Resilience4jCircuitBreaker主要由6个部分组成:

  • 管理断路器实例的注册容器(CircuitBreakerRegistry)
  • 断路器的相关配置(CircuitBreakerConfig)
  • 断路器的各种状态(CircuitBreakerState)
  • 触发断路器状态变化的指标(CircuitBreakerMetrics)
  • 断路器行为变化产生的事件(CircuitBreakerEvent)
  • 断路器本身(CircuitBreaker)

CircuitBreaker六部分

他们之间的调用关系如下图:

image-20220914181931953

接下来将会详细介绍下各部分内容及他们之间的关系

CircuitBreakerRegistry 断路器容器

image-20220914202425795

源码如下:

public interface CircuitBrakerRegistry extends Registry<CircuitBreaker, CircuitBreakerConfig> {// 根据自定义配置创建CircuitBreakerRegistry实例static CircuitBreakerRegistry of(CircuitBreakerConfig circuitBreakerConfig) {return new InMemoryCircuitBreakerRegistry(circuitBreakerConfig);}// ...// 使用默认的配置 创建CircuitBreakerRegistry实例static CircuitBreakerRegistry ofDefaults() {return new InMemoryCircuitBreakerRegistry();}// ...// 根据name获取CircuitBreakerCircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config);// ...
}
public final class InMemoryCircuitBreakerRegistry extendsAbstractRegistry<CircuitBreaker, CircuitBreakerConfig> implements CircuitBreakerRegistry {public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {super(defaultConfig);}// ... 
}public class AbstractRegistry<E, C> implements Registry<E, C> {// CircuitBreaker 实例protected final RegistryStore<E> entryMap;// CircuitBreakerConfigprotected final ConcurrentMap<String, C> configurations;// tagsprotected final Map<String, String> registryTags;private final RegistryEventProcessor eventProcessor;
}// RegistryStore 默认实现为 InMemoryRegistryStore
// 主要作用就是利用ConcurrentHashMap来存储CircuitBreaker实例
public class InMemoryRegistryStore<E> implements RegistryStore<E> {private final ConcurrentMap<String, E> entryMap;public InMemoryRegistryStore() {this.entryMap = new ConcurrentHashMap<>();}
}

CircuitBreakerConfig 断路器的相关配置

  • Predicate<Throwable> recordExceptionPredicate=throwable -> true; 判断是否是需要记录的异常,默认所有异常都是可记录的(会拿来计算失败率)
  • Predicate<Throwable> ignoreExceptionPredicate = throwable -> false; 判断是否需要忽略异常,默认都不忽略
  • Predicate<Object> recordResultPredicate = object -> false 判断结果集是否是需要计算失败率的,默认都不算
  • Class<? extends Throwable>[] recordExceptions = new Class[0]需要记录的异常
  • Class<? extends Throwable>[] ignoreExceptions = new Class[0] 需要忽略的异常
  • float failureRateThreshold = 50 失败率阈值 默认50%
  • int permittedNumberOfCallsInHalfOpenState = 10 断路器在半开状态下允许的成功执行的请求数,默认10
  • int slidingWindowSize = 100 滑动窗口大小 默认100,如果窗口类型是基于计数的滑动窗口 那么该值得含义就是固定数组的大小存放每次请求的结果。如果窗口类型是基于时间的滑动窗口 那么该值得含义就是桶大小 例如10 就会有10个存储桶每个存储桶窗口大小是1s
  • SlidingWindowType slidingWindowType =SlidingWindowType.COUNT_BASED滑动窗口类型 默认是基于计数的COUNT_BASED 还有一种基于时间轮的 TIME_BASED
  • int minimumNumberOfCalls = 100最小的请求或者调用数, 默认100, 如果调用数小于该值, 即使全部失败, 断路器也不会生效
  • boolean writableStackTraceEnabled = true启用可写的堆栈跟踪。设置为false时,Exception#getStackTrace返回长度为零的数组。当断路器打开时,这可以用来减少垃圾日志,因为已知异常的原
  • boolean automaticTransitionFromOpenToHalfOpenEnabled = false是否允许断路器自动从开启状态超时后转换为半开状态,如果开启,将会启动一个定时任务来转换状态
  • IntervalFunction waitIntervalFunctionInOpenState 断路器打开状态下的持续时间,默认60s
  • Duration slowCallDurationThreshold = Duration.ofSeconds(60s) 慢调用的时间阈值, 如果执行时间大于该值则认为是慢调用, 默认60s, 慢调用到一定比例将会触发熔断
  • float slowCallRateThreshold = 100慢调用的阈值,百分比,默认100% 也就是所有的请求都比slowCallDurationThreshold大,就触发熔断
  • Duration maxWaitDurationInHalfOpenState = Duration.ofSeconds(0) 半开状态下的最大持续时间,达到该时间就自动转换为打开状态,默认为0 则是一直保持半开状态,直到 minimumNumberOfCalls成功或者失败,如果大于等于1ms则会启动个定时任务来处理状态转换自动转换为开启状态

CircuitBreakerState(断路器状态)

image-20220915165043210

六种状态

CircuitBreaker目前共用六种状态,三种常用状态:关闭 (CLOSED)、打开(OPEN)、半开(HALF_OPEN),三种特殊状态:禁用(DISABLE)、强制打开(FORCED_OPEN)、仅监控(METRICS_ONLY)

image-20220914213828370

状态转换图如下:

image-20220914213851609

  • CLOSED状态下,当失败率、慢调用率(参见CircuitBreakerConfig#failureRateThreshold#slowCallRateThreshold)超过阈值时,断路器状态由CLOSED转换为OPEN,默认失败率50%,慢调用率100%
  • OPEN状态下,当打开状态的持续时间(参见CircuitBreakerConfig#waitIntervalFunctionInOpenState )结束,断路器由OPEN转换为HALF_OPEN
  • HALF_OPEN状态下,此时有一部分请求可以执行(参见CircuitBreakerConfig#permittedNumberOfCallsInHalfOpenState),如果这些请求的失败率超过阈值时,断路器的状态将会由HALF_OPEN转换为OPEN,如果这些请求的失败率小于阈值,断路器的状态将会由HALF_OPEN转换为CLOSED

有限状态机实现 CircuitBreakerStateMachine

CircuitBreaker的各种状态之间转换是通过一个有限状态机CircuitBreakerStateMachine来实现的。

CircuitBreakerStateMachine类实现了CircuitBreaker接口,除了实现状态转换,还实现了熔断机制和事件发布机制。

  1. CircuitBreaker.State定义了状态的枚举值,也定义了每种状态是否允许发布事件
  2. CricuitBreakerStateMachine利用AtomicReference保证CircuitBreakerState引用的原子性, 初始化状态为关闭状态
  3. 状态转移核心方法stateTransition主要作用就是将A状态转换为B状态
public final class CircuitBreakerStateMachine implements CircuitBreaker {// 保证状态的原子性,初始状态为关闭状态private final AtomicReference<CircuitBreakerState> stateReference;private CircuitBreakerStateMachine(String name, CircuitBreakerConfig circuitBreakerConfig, Clock clock, SchedulerFactory schedulerFactory, io.vavr.collection.Map<String, String> tags) {// ...省略// 初始值为关闭状态this.stateReference = new AtomicReference<>(new ClosedState());// ...}// 状态转换核心方法private void stateTransition(State newState,UnaryOperator<CircuitBreakerState> newStateGenerator) {// 先获取当前State然后执行状态更新方法,AtomicReference保证原子性CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {// StateTransition 定义了可以从 A状态到B状态的枚举, 如果不存在该case,transitionBetween方法将会抛出异常StateTransition.transitionBetween(getName(), currentState.getState(), newState);currentState.preTransitionHook();return newStateGenerator.apply(currentState);});publishStateTransitionEvent(StateTransition.transitionBetween(getName(), previousState.getState(), newState));}@Override // 转为DISABLEDpublic void transitionToDisabledState() {stateTransition(DISABLED, currentState -> new DisabledState());}@Override // 转为METRICS_ONLYpublic void transitionToMetricsOnlyState() {stateTransition(METRICS_ONLY, currentState -> new MetricsOnlyState());}@Override // 转为FORCE_OPENpublic void transitionToForcedOpenState() {stateTransition(FORCED_OPEN,currentState -> new ForcedOpenState(currentState.attempts() + 1));}@Override // 转为CLOSEDpublic void transitionToClosedState() {stateTransition(CLOSED, currentState -> new ClosedState());}@Override // 转为OPENpublic void transitionToOpenState() {stateTransition(OPEN,currentState -> new OpenState(currentState.attempts() + 1, currentState.getMetrics()));}@Override // 转为HALF_OPENpublic void transitionToHalfOpenState() {stateTransition(HALF_OPEN, currentState -> new HalfOpenState(currentState.attempts()));}
}

在这,我列举几个StateTransition

enum StateTransition {// CLOSED->CLOSEDCLOSED_TO_CLOSED(State.CLOSED, State.CLOSED),// CLOSED->OPENCLOSED_TO_OPEN(State.CLOSED, State.OPEN),CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),CLOSED_TO_METRICS_ONLY(State.CLOSED, State.METRICS_ONLY),CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),HALF_OPEN_TO_HALF_OPEN(State.HALF_OPEN, State.HALF_OPEN),HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),HALF_OPEN_TO_METRICS_ONLY(State.HALF_OPEN, State.METRICS_ONLY),HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),OPEN_TO_OPEN(State.OPEN, State.OPEN),OPEN_TO_CLOSED(State.OPEN, State.CLOSED),OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),OPEN_TO_DISABLED(State.OPEN, State.DISABLED),OPEN_TO_METRICS_ONLY(State.OPEN, State.METRICS_ONLY),OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),// 省略.....private final State fromState;private final State toState;StateTransition(State fromState, State toState) {this.fromState = fromState;this.toState = toState;}
}    

CircuitBreakerMetrics

首先,我们需要提前了解下我们调用结果也就是请求结果,总共有哪几种类型

enum Result {BELOW_THRESHOLDS, // 都低于阈值FAILURE_RATE_ABOVE_THRESHOLDS, // 失败率 高于阈值SLOW_CALL_RATE_ABOVE_THRESHOLDS, // 慢调用率 高于阈值ABOVE_THRESHOLDS, // 失败率、慢调用率 都高于阈值BELOW_MINIMUM_CALLS_THRESHOLD; // 总请求数低于最小请求数阈值public static boolean hasExceededThresholds(Result result) {return hasFailureRateExceededThreshold(result) ||hasSlowCallRateExceededThreshold(result);}public static boolean hasFailureRateExceededThreshold(Result result) {return result == ABOVE_THRESHOLDS || result == FAILURE_RATE_ABOVE_THRESHOLDS;}public static boolean hasSlowCallRateExceededThreshold(Result result) {return result == ABOVE_THRESHOLDS || result == SLOW_CALL_RATE_ABOVE_THRESHOLDS;}
}

CircuitBreakerMetrics 实现了 CircuitBreaker.Metrics 接口

// CircuitBreaker 指标
interface Metrics {float getFailureRate();// 返回失败率,如果总共请求数小于最小设定请求数 返回-1float getSlowCallRate();// 返回慢调用率int getNumberOfSlowCalls();// 返回慢调用数int getNumberOfSlowSuccessfulCalls();// 慢调用成功数int getNumberOfSlowFailedCalls();// 慢调用失败数int getNumberOfBufferedCalls();// 返回当前总请求数int getNumberOfFailedCalls();// 失败调用数long getNumberOfNotPermittedCalls();// 在打开状态下,返回当前不被允许请求调用的数量, 在关闭和半开下 始终为0int getNumberOfSuccessfulCalls();// 返回当前成功请求调用的数量
}class CircuitBreakerMetrics implements CircuitBreaker.Metrics {// 指标类 后续会讲private final Metrics metrics;// 失败率阈值private final float failureRateThreshold;// 慢调用阈值private final float slowCallRateThreshold;// 慢调用时间阈值 (纳秒级)private final long slowCallDurationThresholdInNanos;// 没执行数private final LongAdder numberOfNotPermittedCalls;private int minimumNumberOfCalls;/*** @param slidingWindowSize 滑动窗口大小,CircuitBreakerConfig 中配置* @param slidingWindowType 滑动窗口类型 有基于计数 和基于时间两种 默认 基于计数* @param circuitBreakerConfig config* @param clock 滑动窗口 为基于时间是可用 取值为 CircuitBreakerState中的clock*/private CircuitBreakerMetrics(int slidingWindowSize,CircuitBreakerConfig.SlidingWindowType slidingWindowType,CircuitBreakerConfig circuitBreakerConfig,Clock clock) {if (slidingWindowType == CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) {this.metrics = new FixedSizeSlidingWindowMetrics(slidingWindowSize);// 最小请求数 不能超过滑动窗口大小this.minimumNumberOfCalls = Math.min(circuitBreakerConfig.getMinimumNumberOfCalls(), slidingWindowSize);} else {this.metrics = new SlidingTimeWindowMetrics(slidingWindowSize, clock);this.minimumNumberOfCalls = circuitBreakerConfig.getMinimumNumberOfCalls();}this.failureRateThreshold = circuitBreakerConfig.getFailureRateThreshold();this.slowCallRateThreshold = circuitBreakerConfig.getSlowCallRateThreshold();this.slowCallDurationThresholdInNanos = circuitBreakerConfig.getSlowCallDurationThreshold().toNanos();this.numberOfNotPermittedCalls = new LongAdder();}// 调用成功public Result onSuccess(long duration, TimeUnit durationUnit) {Snapshot snapshot;// 超过慢调用时间阈值 及算做 慢成功 否则成功if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_SUCCESS);} else {snapshot = metrics.record(duration, durationUnit, Outcome.SUCCESS);}// 校验失败率或者慢调用率是否超过了阈值return checkIfThresholdsExceeded(snapshot);}// 调用失败public Result onError(long duration, TimeUnit durationUnit) {Snapshot snapshot;// 超过慢调用时间阈值 及算做 慢失败 否则失败if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_ERROR);} else {snapshot = metrics.record(duration, durationUnit, Outcome.ERROR);}// 校验失败率或者慢调用率是否超过了阈值return checkIfThresholdsExceeded(snapshot);}// 校验失败率或者慢调用率是否超过了阈值private Result checkIfThresholdsExceeded(Snapshot snapshot) {float failureRateInPercentage = getFailureRate(snapshot);float slowCallsInPercentage = getSlowCallRate(snapshot);// -1 表示 总请求数低于最小请求阈值 默认始终不熔断if (failureRateInPercentage == -1 || slowCallsInPercentage == -1) {return Result.BELOW_MINIMUM_CALLS_THRESHOLD;}// 失败和慢调用 阈值都满足if (failureRateInPercentage >= failureRateThreshold&& slowCallsInPercentage >= slowCallRateThreshold) {return Result.ABOVE_THRESHOLDS;}if (failureRateInPercentage >= failureRateThreshold) {return Result.FAILURE_RATE_ABOVE_THRESHOLDS;}if (slowCallsInPercentage >= slowCallRateThreshold) {return Result.SLOW_CALL_RATE_ABOVE_THRESHOLDS;}return Result.BELOW_THRESHOLDS;}private float getSlowCallRate(Snapshot snapshot) {int bufferedCalls = snapshot.getTotalNumberOfCalls();// 未达到最小请求数 返回-1if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {return -1.0f;}return snapshot.getSlowCallRate();}private float getFailureRate(Snapshot snapshot) {int bufferedCalls = snapshot.getTotalNumberOfCalls();// 未达到最小请求数 返回-1if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {return -1.0f;}return snapshot.getFailureRate();}
}

可以看到指标的 统计都是基于io.github.resilience4j.core.metrics.Metrics类,他有两种实现,一种是基于计数的滑动窗口,一种是基于时间的滑动窗口

public interface Metrics {/*** 负责记录一次请求的信息* @param duration     the duration of the call 执行时间* @param durationUnit the time unit of the duration 时间单位* @param outcome      the outcome of the call 执行结果*/Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome);/*** 获取当前度量指标快照信息*/Snapshot getSnapshot();enum Outcome {SUCCESS, ERROR, SLOW_SUCCESS, SLOW_ERROR}
}
滑动窗口算法

上面提到过,CircuitBreaker 总共有两种滑动窗口算法,一种是基于计数的滑动窗口,一种是基于时间的滑动窗口,接下来将会详细介绍下这两种算法,不过在此之前,容我先介绍几个类

  • AbstractAggregation 一个抽象聚合类,定义了失败数、总执行时间、慢调用数、慢失败数、总调用数
  • Total Aggregation 在AbstractAggregation 的基础上 新定义了一个方法,可以removeBucket
  • Measurement 在AbstractAggregation基础上,新定义了一个reset方法 负责重置数据
  • PartialAggregation 在AbstractAggregation基础上,新定义了一个变量 epochSecond 负责记录当前的秒级别数,reset 方法依旧是重置所有数据
基于计数的滑动窗口实现:io.github.resilience4j.core.metrics.FixedSizeSlidingWindowMetrics

基于计数的滑动窗口由N个测量值的圆形数组实现。
如果计数窗口大小为10,则圆形阵列始终具有10个测量值。
滑动窗口以增量方式更新总聚合。当记录新的呼叫结果时,将更新总汇总。收回最旧的度量后,将从总聚合中减去该度量,然后重置存储桶。(逐项扣除)
快照的获取时间为常数O(1),因为快照是预先聚合的,并且与窗口大小无关。
此实现的空间需求(内存消耗)应为O(n)

public class FixedSizeSlidingWindowMetrics implements Metrics {// 滑动窗口大小private final int windowSize;// 指标聚合信息 包含 总执行时间,慢调用数,慢调用失败数,失败数,调用数private final TotalAggregation totalAggregation;// 指标数组private final Measurement[] measurements;// 当前滑动窗口位置int headIndex;public FixedSizeSlidingWindowMetrics(int windowSize) {this.windowSize = windowSize;this.measurements = new Measurement[this.windowSize];this.headIndex = 0;// 初始化指标数组for (int i = 0; i < this.windowSize; i++) {measurements[i] = new Measurement();}this.totalAggregation = new TotalAggregation();}// 加锁 保证线程安全 record 就是调用结束后 回调的@Overridepublic synchronized Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome) {totalAggregation.record(duration, durationUnit, outcome);moveWindowByOne().record(duration, durationUnit, outcome);return new SnapshotImpl(totalAggregation);}// 加锁 保证线程安全public synchronized Snapshot getSnapshot() {return new SnapshotImpl(totalAggregation);}private Measurement moveWindowByOne() {// 后移当前滑动窗口位置moveHeadIndexByOne();// 获取旧指标信息Measurement latestMeasurement = getLatestMeasurement();// 从总聚合中减去旧指标totalAggregation.removeBucket(latestMeasurement);// 重置当前指标latestMeasurement.reset();return latestMeasurement;}private Measurement getLatestMeasurement() {return measurements[headIndex];}// 后移当前滑动窗口位置void moveHeadIndexByOne() {this.headIndex = (headIndex + 1) % windowSize;}
}
基于时间的滑动窗口实现: io.github.resilience4j.core.metrics.SlidingTimeWindowMetrics

基于时间的滑动窗口是由N个部分集合(存储桶)的圆形数组实现的。
如果时间窗口大小为10秒,则圆形数组始终具有10个部分聚合(存储桶)。每个存储桶都会汇总在某个时期内发生的所有调用的结果。(部分聚集)。圆形数组的头存储区存储当前纪元的呼叫结果。其他部分聚合存储前几秒的呼叫结果。
滑动窗口不会单独存储呼叫结果(元组),而是以增量方式更新部分聚合(存储桶)和总聚合。
当记录新的呼叫结果时,总聚合将增量更新。当最旧的存储桶被收回时,该存储桶的部分总聚合将从总聚合中减去,然后重置该存储桶。(逐项扣除)
快照的获取时间为常数O(1),因为快照是预先聚合的,并且与时间窗口的大小无关。
此实现的空间需求(内存消耗)应接近常数O(n),因为调用结果(元组)不是单独存储的。仅创建N个部分聚合和1个总聚合。
部分聚合由3个整数组成,以计算失败的呼叫数,慢速呼叫数和总呼叫数。一个长存储所有呼叫的总持续时间。

CircuitBreakerEvent

Resilience4j的事件机制采用的是观察者设计模式,核心类在io.github.resilience4j.core包下

  • EventConsumer<T> 事件接口消费者(观察者)
  • EventPublisher<T>事件接口发布者(被观察者)
  • EventProcessor<T> 实现了EventPublisher接口,封装了 消费者EventConsumer注册接口和负责处理对应事件通知

CircuitBreakerEvent 事件类型

目前共有八种:

  • CircuitBreakerOnSuccessEvent 请求调用成功时发布的事件
  • CircuitBreakerOnErrorEvent 请求调用失败时发布的事件
  • CircuitBreakerOnResetEvent 断路器重置时发布的事件
  • CircuitBreakerOnFailureRateExceededEvent 当达到失败率时发布的事件
  • CircuitBreakerOnIgnoredErrorEvent 当忽略的调用异常发生时发布的事件
  • CircuitBreakerOnSlowCallRateExceededEvent 当达到慢调用率时发布的事件
  • CircuitBreakerOnCallNotPermittedEvent 当请求不被允许调用时发布的事件
  • CircuitBreakerOnStateTransitionEvent 断路器状态转换时发布的事件

对应枚举可以参考CircuitBreakerEvent.Type

enum Type {ERROR(false),IGNORED_ERROR(false),SUCCESS(false),NOT_PERMITTED(false),STATE_TRANSITION(true),RESET(true),FORCED_OPEN(false),DISABLED(false),FAILURE_RATE_EXCEEDED(false),SLOW_CALL_RATE_EXCEEDED(false);public final boolean forcePublish;Type(boolean forcePublish) {this.forcePublish = forcePublish;}
}

image-20220914173149270

EventPublisher 中定义了八种注册事件处理方法

interface EventPublisher extendsio.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);EventPublisher onFailureRateExceeded(EventConsumer<CircuitBreakerOnFailureRateExceededEvent> eventConsumer);EventPublisher onSlowCallRateExceeded(EventConsumer<CircuitBreakerOnSlowCallRateExceededEvent> eventConsumer);
}

CircuitBreaker 断路器

最后 我们再回到我们的主角 CircuitBreaker来瞅瞅。

image-20220915161436952

注:该图省略了很多方法,具体使用什么请具体分析,这里只列举了一些常用的,用于分析;State、Metrics、StateTransition、EventPublisher 在前面都有提到。具体往前面翻翻吧

public interface CircuitBreaker {static <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBreaker,CheckedFunction0<T> supplier) {return () -> {// 尝试获取执行权限 如果没权限会抛出 CallNotPermittedException 异常 (此时处于熔断状态)circuitBreaker.acquirePermission();final long start = circuitBreaker.getCurrentTimestamp();try {// 执行T result = supplier.apply();long duration = circuitBreaker.getCurrentTimestamp() - start;// 执行成功后 则记录状态。内部会根据config配置的 recordResultPredicate 检测 result 是否是需要统计失败率circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);return result;} catch (Exception exception) {// Do not handle java.lang.Errorlong duration = circuitBreaker.getCurrentTimestamp() - start;// 异常状态下,记录失败状态,计算失败率circuitBreaker.onError(duration, circuitBreaker.getTimestampUnit(), exception);throw exception;}};}default <T> T executeCheckedSupplier(CheckedFunction0<T> checkedSupplier) throws Throwable {return decorateCheckedSupplier(this, checkedSupplier).apply();}// 这里负责创建一个CircuitBreaker 实例static CircuitBreaker of(String name, CircuitBreakerConfig circuitBreakerConfig) {// 熟悉吗 return new CircuitBreakerStateMachine(name, circuitBreakerConfig);}// 接下来是几个权限接口boolean tryAcquirePermission();// 和tryAcquirePermission区别不同在于 没权限 该方法会抛出 CallNotPermittedException 异常void acquirePermission();// 释放权限void releasePermission();// decorateCheckedSupplier 方法中会调这三个方法哦void onSuccess(long duration, TimeUnit durationUnit);void onError(long duration, TimeUnit durationUnit, Throwable throwable);void onResult(long duration, TimeUnit durationUnit, Object result);// 该方法负责将状态重置为CLOSED状态void reset();
}

接下来分析下 CircuitBreakerStateMachine 中的几个方法

public final class CircuitBreakerStateMachine implements CircuitBreaker {// 保证状态的原子性,初始状态为关闭状态private final AtomicReference<CircuitBreakerState> stateReference;// xxxPermission方法就没必要分析了 都是调用 CircuitBreakerState的方法public boolean tryAcquirePermission() {boolean callPermitted = stateReference.get().tryAcquirePermission();if (!callPermitted) {publishCallNotPermittedEvent();}return callPermitted;}public void releasePermission() {stateReference.get().releasePermission();}public void acquirePermission() {try {stateReference.get().acquirePermission();} catch (Exception e) {publishCallNotPermittedEvent();throw e;}}public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {Throwable cause = throwable.getCause();handleThrowable(duration, durationUnit, cause);} else {handleThrowable(duration, durationUnit, throwable);}}private void handleThrowable(long duration, TimeUnit durationUnit, Throwable throwable) {if (circuitBreakerConfig.getIgnoreExceptionPredicate().test(throwable)) {releasePermission();publishCircuitIgnoredErrorEvent(name, duration, durationUnit, throwable);} else if (circuitBreakerConfig.getRecordExceptionPredicate().test(throwable)) {publishCircuitErrorEvent(name, duration, durationUnit, throwable);stateReference.get().onError(duration, durationUnit, throwable);} else {publishSuccessEvent(duration, durationUnit);stateReference.get().onSuccess(duration, durationUnit);}}public void onSuccess(long duration, TimeUnit durationUnit) {publishSuccessEvent(duration, durationUnit);stateReference.get().onSuccess(duration, durationUnit);}public void onResult(long duration, TimeUnit durationUnit, @Nullable Object result) {if (result != null && circuitBreakerConfig.getRecordResultPredicate().test(result)) {ResultRecordedAsFailureException failure = new ResultRecordedAsFailureException(name, result);publishCircuitErrorEvent(name, duration, durationUnit, failure);stateReference.get().onError(duration, durationUnit, failure);} else {onSuccess(duration, durationUnit);}}
}

这篇关于Resilience4j.Circuitbreaker源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/274383

相关文章

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专

Spring中Bean有关NullPointerException异常的原因分析

《Spring中Bean有关NullPointerException异常的原因分析》在Spring中使用@Autowired注解注入的bean不能在静态上下文中访问,否则会导致NullPointerE... 目录Spring中Bean有关NullPointerException异常的原因问题描述解决方案总结

python中的与时间相关的模块应用场景分析

《python中的与时间相关的模块应用场景分析》本文介绍了Python中与时间相关的几个重要模块:`time`、`datetime`、`calendar`、`timeit`、`pytz`和`dateu... 目录1. time 模块2. datetime 模块3. calendar 模块4. timeit

python-nmap实现python利用nmap进行扫描分析

《python-nmap实现python利用nmap进行扫描分析》Nmap是一个非常用的网络/端口扫描工具,如果想将nmap集成进你的工具里,可以使用python-nmap这个python库,它提供了... 目录前言python-nmap的基本使用PortScanner扫描PortScannerAsync异

Oracle数据库执行计划的查看与分析技巧

《Oracle数据库执行计划的查看与分析技巧》在Oracle数据库中,执行计划能够帮助我们深入了解SQL语句在数据库内部的执行细节,进而优化查询性能、提升系统效率,执行计划是Oracle数据库优化器为... 目录一、什么是执行计划二、查看执行计划的方法(一)使用 EXPLAIN PLAN 命令(二)通过 S