Resilience4j.Circuitbreaker源码分析

2023-10-24 09:59

本文主要是介绍Resilience4j.Circuitbreaker源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Resilience4j.Circuitbreaker

源码分析

Resilience4jCircuitBreaker主要由6个部分组成:

  • 管理断路器实例的注册容器(CircuitBreakerRegistry)
  • 断路器的相关配置(CircuitBreakerConfig)
  • 断路器的各种状态(CircuitBreakerState)
  • 触发断路器状态变化的指标(CircuitBreakerMetrics)
  • 断路器行为变化产生的事件(CircuitBreakerEvent)
  • 断路器本身(CircuitBreaker)

CircuitBreaker六部分

他们之间的调用关系如下图:

image-20220914181931953

接下来将会详细介绍下各部分内容及他们之间的关系

CircuitBreakerRegistry 断路器容器

image-20220914202425795

源码如下:

public interface CircuitBrakerRegistry extends Registry<CircuitBreaker, CircuitBreakerConfig> {// 根据自定义配置创建CircuitBreakerRegistry实例static CircuitBreakerRegistry of(CircuitBreakerConfig circuitBreakerConfig) {return new InMemoryCircuitBreakerRegistry(circuitBreakerConfig);}// ...// 使用默认的配置 创建CircuitBreakerRegistry实例static CircuitBreakerRegistry ofDefaults() {return new InMemoryCircuitBreakerRegistry();}// ...// 根据name获取CircuitBreakerCircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config);// ...
}
public final class InMemoryCircuitBreakerRegistry extendsAbstractRegistry<CircuitBreaker, CircuitBreakerConfig> implements CircuitBreakerRegistry {public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {super(defaultConfig);}// ... 
}public class AbstractRegistry<E, C> implements Registry<E, C> {// CircuitBreaker 实例protected final RegistryStore<E> entryMap;// CircuitBreakerConfigprotected final ConcurrentMap<String, C> configurations;// tagsprotected final Map<String, String> registryTags;private final RegistryEventProcessor eventProcessor;
}// RegistryStore 默认实现为 InMemoryRegistryStore
// 主要作用就是利用ConcurrentHashMap来存储CircuitBreaker实例
public class InMemoryRegistryStore<E> implements RegistryStore<E> {private final ConcurrentMap<String, E> entryMap;public InMemoryRegistryStore() {this.entryMap = new ConcurrentHashMap<>();}
}

CircuitBreakerConfig 断路器的相关配置

  • Predicate<Throwable> recordExceptionPredicate=throwable -> true; 判断是否是需要记录的异常,默认所有异常都是可记录的(会拿来计算失败率)
  • Predicate<Throwable> ignoreExceptionPredicate = throwable -> false; 判断是否需要忽略异常,默认都不忽略
  • Predicate<Object> recordResultPredicate = object -> false 判断结果集是否是需要计算失败率的,默认都不算
  • Class<? extends Throwable>[] recordExceptions = new Class[0]需要记录的异常
  • Class<? extends Throwable>[] ignoreExceptions = new Class[0] 需要忽略的异常
  • float failureRateThreshold = 50 失败率阈值 默认50%
  • int permittedNumberOfCallsInHalfOpenState = 10 断路器在半开状态下允许的成功执行的请求数,默认10
  • int slidingWindowSize = 100 滑动窗口大小 默认100,如果窗口类型是基于计数的滑动窗口 那么该值得含义就是固定数组的大小存放每次请求的结果。如果窗口类型是基于时间的滑动窗口 那么该值得含义就是桶大小 例如10 就会有10个存储桶每个存储桶窗口大小是1s
  • SlidingWindowType slidingWindowType =SlidingWindowType.COUNT_BASED滑动窗口类型 默认是基于计数的COUNT_BASED 还有一种基于时间轮的 TIME_BASED
  • int minimumNumberOfCalls = 100最小的请求或者调用数, 默认100, 如果调用数小于该值, 即使全部失败, 断路器也不会生效
  • boolean writableStackTraceEnabled = true启用可写的堆栈跟踪。设置为false时,Exception#getStackTrace返回长度为零的数组。当断路器打开时,这可以用来减少垃圾日志,因为已知异常的原
  • boolean automaticTransitionFromOpenToHalfOpenEnabled = false是否允许断路器自动从开启状态超时后转换为半开状态,如果开启,将会启动一个定时任务来转换状态
  • IntervalFunction waitIntervalFunctionInOpenState 断路器打开状态下的持续时间,默认60s
  • Duration slowCallDurationThreshold = Duration.ofSeconds(60s) 慢调用的时间阈值, 如果执行时间大于该值则认为是慢调用, 默认60s, 慢调用到一定比例将会触发熔断
  • float slowCallRateThreshold = 100慢调用的阈值,百分比,默认100% 也就是所有的请求都比slowCallDurationThreshold大,就触发熔断
  • Duration maxWaitDurationInHalfOpenState = Duration.ofSeconds(0) 半开状态下的最大持续时间,达到该时间就自动转换为打开状态,默认为0 则是一直保持半开状态,直到 minimumNumberOfCalls成功或者失败,如果大于等于1ms则会启动个定时任务来处理状态转换自动转换为开启状态

CircuitBreakerState(断路器状态)

image-20220915165043210

六种状态

CircuitBreaker目前共用六种状态,三种常用状态:关闭 (CLOSED)、打开(OPEN)、半开(HALF_OPEN),三种特殊状态:禁用(DISABLE)、强制打开(FORCED_OPEN)、仅监控(METRICS_ONLY)

image-20220914213828370

状态转换图如下:

image-20220914213851609

  • CLOSED状态下,当失败率、慢调用率(参见CircuitBreakerConfig#failureRateThreshold#slowCallRateThreshold)超过阈值时,断路器状态由CLOSED转换为OPEN,默认失败率50%,慢调用率100%
  • OPEN状态下,当打开状态的持续时间(参见CircuitBreakerConfig#waitIntervalFunctionInOpenState )结束,断路器由OPEN转换为HALF_OPEN
  • HALF_OPEN状态下,此时有一部分请求可以执行(参见CircuitBreakerConfig#permittedNumberOfCallsInHalfOpenState),如果这些请求的失败率超过阈值时,断路器的状态将会由HALF_OPEN转换为OPEN,如果这些请求的失败率小于阈值,断路器的状态将会由HALF_OPEN转换为CLOSED

有限状态机实现 CircuitBreakerStateMachine

CircuitBreaker的各种状态之间转换是通过一个有限状态机CircuitBreakerStateMachine来实现的。

CircuitBreakerStateMachine类实现了CircuitBreaker接口,除了实现状态转换,还实现了熔断机制和事件发布机制。

  1. CircuitBreaker.State定义了状态的枚举值,也定义了每种状态是否允许发布事件
  2. CricuitBreakerStateMachine利用AtomicReference保证CircuitBreakerState引用的原子性, 初始化状态为关闭状态
  3. 状态转移核心方法stateTransition主要作用就是将A状态转换为B状态
public final class CircuitBreakerStateMachine implements CircuitBreaker {// 保证状态的原子性,初始状态为关闭状态private final AtomicReference<CircuitBreakerState> stateReference;private CircuitBreakerStateMachine(String name, CircuitBreakerConfig circuitBreakerConfig, Clock clock, SchedulerFactory schedulerFactory, io.vavr.collection.Map<String, String> tags) {// ...省略// 初始值为关闭状态this.stateReference = new AtomicReference<>(new ClosedState());// ...}// 状态转换核心方法private void stateTransition(State newState,UnaryOperator<CircuitBreakerState> newStateGenerator) {// 先获取当前State然后执行状态更新方法,AtomicReference保证原子性CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {// StateTransition 定义了可以从 A状态到B状态的枚举, 如果不存在该case,transitionBetween方法将会抛出异常StateTransition.transitionBetween(getName(), currentState.getState(), newState);currentState.preTransitionHook();return newStateGenerator.apply(currentState);});publishStateTransitionEvent(StateTransition.transitionBetween(getName(), previousState.getState(), newState));}@Override // 转为DISABLEDpublic void transitionToDisabledState() {stateTransition(DISABLED, currentState -> new DisabledState());}@Override // 转为METRICS_ONLYpublic void transitionToMetricsOnlyState() {stateTransition(METRICS_ONLY, currentState -> new MetricsOnlyState());}@Override // 转为FORCE_OPENpublic void transitionToForcedOpenState() {stateTransition(FORCED_OPEN,currentState -> new ForcedOpenState(currentState.attempts() + 1));}@Override // 转为CLOSEDpublic void transitionToClosedState() {stateTransition(CLOSED, currentState -> new ClosedState());}@Override // 转为OPENpublic void transitionToOpenState() {stateTransition(OPEN,currentState -> new OpenState(currentState.attempts() + 1, currentState.getMetrics()));}@Override // 转为HALF_OPENpublic void transitionToHalfOpenState() {stateTransition(HALF_OPEN, currentState -> new HalfOpenState(currentState.attempts()));}
}

在这,我列举几个StateTransition

enum StateTransition {// CLOSED->CLOSEDCLOSED_TO_CLOSED(State.CLOSED, State.CLOSED),// CLOSED->OPENCLOSED_TO_OPEN(State.CLOSED, State.OPEN),CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),CLOSED_TO_METRICS_ONLY(State.CLOSED, State.METRICS_ONLY),CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),HALF_OPEN_TO_HALF_OPEN(State.HALF_OPEN, State.HALF_OPEN),HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),HALF_OPEN_TO_METRICS_ONLY(State.HALF_OPEN, State.METRICS_ONLY),HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),OPEN_TO_OPEN(State.OPEN, State.OPEN),OPEN_TO_CLOSED(State.OPEN, State.CLOSED),OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),OPEN_TO_DISABLED(State.OPEN, State.DISABLED),OPEN_TO_METRICS_ONLY(State.OPEN, State.METRICS_ONLY),OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),// 省略.....private final State fromState;private final State toState;StateTransition(State fromState, State toState) {this.fromState = fromState;this.toState = toState;}
}    

CircuitBreakerMetrics

首先,我们需要提前了解下我们调用结果也就是请求结果,总共有哪几种类型

enum Result {BELOW_THRESHOLDS, // 都低于阈值FAILURE_RATE_ABOVE_THRESHOLDS, // 失败率 高于阈值SLOW_CALL_RATE_ABOVE_THRESHOLDS, // 慢调用率 高于阈值ABOVE_THRESHOLDS, // 失败率、慢调用率 都高于阈值BELOW_MINIMUM_CALLS_THRESHOLD; // 总请求数低于最小请求数阈值public static boolean hasExceededThresholds(Result result) {return hasFailureRateExceededThreshold(result) ||hasSlowCallRateExceededThreshold(result);}public static boolean hasFailureRateExceededThreshold(Result result) {return result == ABOVE_THRESHOLDS || result == FAILURE_RATE_ABOVE_THRESHOLDS;}public static boolean hasSlowCallRateExceededThreshold(Result result) {return result == ABOVE_THRESHOLDS || result == SLOW_CALL_RATE_ABOVE_THRESHOLDS;}
}

CircuitBreakerMetrics 实现了 CircuitBreaker.Metrics 接口

// CircuitBreaker 指标
interface Metrics {float getFailureRate();// 返回失败率,如果总共请求数小于最小设定请求数 返回-1float getSlowCallRate();// 返回慢调用率int getNumberOfSlowCalls();// 返回慢调用数int getNumberOfSlowSuccessfulCalls();// 慢调用成功数int getNumberOfSlowFailedCalls();// 慢调用失败数int getNumberOfBufferedCalls();// 返回当前总请求数int getNumberOfFailedCalls();// 失败调用数long getNumberOfNotPermittedCalls();// 在打开状态下,返回当前不被允许请求调用的数量, 在关闭和半开下 始终为0int getNumberOfSuccessfulCalls();// 返回当前成功请求调用的数量
}class CircuitBreakerMetrics implements CircuitBreaker.Metrics {// 指标类 后续会讲private final Metrics metrics;// 失败率阈值private final float failureRateThreshold;// 慢调用阈值private final float slowCallRateThreshold;// 慢调用时间阈值 (纳秒级)private final long slowCallDurationThresholdInNanos;// 没执行数private final LongAdder numberOfNotPermittedCalls;private int minimumNumberOfCalls;/*** @param slidingWindowSize 滑动窗口大小,CircuitBreakerConfig 中配置* @param slidingWindowType 滑动窗口类型 有基于计数 和基于时间两种 默认 基于计数* @param circuitBreakerConfig config* @param clock 滑动窗口 为基于时间是可用 取值为 CircuitBreakerState中的clock*/private CircuitBreakerMetrics(int slidingWindowSize,CircuitBreakerConfig.SlidingWindowType slidingWindowType,CircuitBreakerConfig circuitBreakerConfig,Clock clock) {if (slidingWindowType == CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) {this.metrics = new FixedSizeSlidingWindowMetrics(slidingWindowSize);// 最小请求数 不能超过滑动窗口大小this.minimumNumberOfCalls = Math.min(circuitBreakerConfig.getMinimumNumberOfCalls(), slidingWindowSize);} else {this.metrics = new SlidingTimeWindowMetrics(slidingWindowSize, clock);this.minimumNumberOfCalls = circuitBreakerConfig.getMinimumNumberOfCalls();}this.failureRateThreshold = circuitBreakerConfig.getFailureRateThreshold();this.slowCallRateThreshold = circuitBreakerConfig.getSlowCallRateThreshold();this.slowCallDurationThresholdInNanos = circuitBreakerConfig.getSlowCallDurationThreshold().toNanos();this.numberOfNotPermittedCalls = new LongAdder();}// 调用成功public Result onSuccess(long duration, TimeUnit durationUnit) {Snapshot snapshot;// 超过慢调用时间阈值 及算做 慢成功 否则成功if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_SUCCESS);} else {snapshot = metrics.record(duration, durationUnit, Outcome.SUCCESS);}// 校验失败率或者慢调用率是否超过了阈值return checkIfThresholdsExceeded(snapshot);}// 调用失败public Result onError(long duration, TimeUnit durationUnit) {Snapshot snapshot;// 超过慢调用时间阈值 及算做 慢失败 否则失败if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_ERROR);} else {snapshot = metrics.record(duration, durationUnit, Outcome.ERROR);}// 校验失败率或者慢调用率是否超过了阈值return checkIfThresholdsExceeded(snapshot);}// 校验失败率或者慢调用率是否超过了阈值private Result checkIfThresholdsExceeded(Snapshot snapshot) {float failureRateInPercentage = getFailureRate(snapshot);float slowCallsInPercentage = getSlowCallRate(snapshot);// -1 表示 总请求数低于最小请求阈值 默认始终不熔断if (failureRateInPercentage == -1 || slowCallsInPercentage == -1) {return Result.BELOW_MINIMUM_CALLS_THRESHOLD;}// 失败和慢调用 阈值都满足if (failureRateInPercentage >= failureRateThreshold&& slowCallsInPercentage >= slowCallRateThreshold) {return Result.ABOVE_THRESHOLDS;}if (failureRateInPercentage >= failureRateThreshold) {return Result.FAILURE_RATE_ABOVE_THRESHOLDS;}if (slowCallsInPercentage >= slowCallRateThreshold) {return Result.SLOW_CALL_RATE_ABOVE_THRESHOLDS;}return Result.BELOW_THRESHOLDS;}private float getSlowCallRate(Snapshot snapshot) {int bufferedCalls = snapshot.getTotalNumberOfCalls();// 未达到最小请求数 返回-1if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {return -1.0f;}return snapshot.getSlowCallRate();}private float getFailureRate(Snapshot snapshot) {int bufferedCalls = snapshot.getTotalNumberOfCalls();// 未达到最小请求数 返回-1if (bufferedCalls == 0 || bufferedCalls < minimumNumberOfCalls) {return -1.0f;}return snapshot.getFailureRate();}
}

可以看到指标的 统计都是基于io.github.resilience4j.core.metrics.Metrics类,他有两种实现,一种是基于计数的滑动窗口,一种是基于时间的滑动窗口

public interface Metrics {/*** 负责记录一次请求的信息* @param duration     the duration of the call 执行时间* @param durationUnit the time unit of the duration 时间单位* @param outcome      the outcome of the call 执行结果*/Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome);/*** 获取当前度量指标快照信息*/Snapshot getSnapshot();enum Outcome {SUCCESS, ERROR, SLOW_SUCCESS, SLOW_ERROR}
}
滑动窗口算法

上面提到过,CircuitBreaker 总共有两种滑动窗口算法,一种是基于计数的滑动窗口,一种是基于时间的滑动窗口,接下来将会详细介绍下这两种算法,不过在此之前,容我先介绍几个类

  • AbstractAggregation 一个抽象聚合类,定义了失败数、总执行时间、慢调用数、慢失败数、总调用数
  • Total Aggregation 在AbstractAggregation 的基础上 新定义了一个方法,可以removeBucket
  • Measurement 在AbstractAggregation基础上,新定义了一个reset方法 负责重置数据
  • PartialAggregation 在AbstractAggregation基础上,新定义了一个变量 epochSecond 负责记录当前的秒级别数,reset 方法依旧是重置所有数据
基于计数的滑动窗口实现:io.github.resilience4j.core.metrics.FixedSizeSlidingWindowMetrics

基于计数的滑动窗口由N个测量值的圆形数组实现。
如果计数窗口大小为10,则圆形阵列始终具有10个测量值。
滑动窗口以增量方式更新总聚合。当记录新的呼叫结果时,将更新总汇总。收回最旧的度量后,将从总聚合中减去该度量,然后重置存储桶。(逐项扣除)
快照的获取时间为常数O(1),因为快照是预先聚合的,并且与窗口大小无关。
此实现的空间需求(内存消耗)应为O(n)

public class FixedSizeSlidingWindowMetrics implements Metrics {// 滑动窗口大小private final int windowSize;// 指标聚合信息 包含 总执行时间,慢调用数,慢调用失败数,失败数,调用数private final TotalAggregation totalAggregation;// 指标数组private final Measurement[] measurements;// 当前滑动窗口位置int headIndex;public FixedSizeSlidingWindowMetrics(int windowSize) {this.windowSize = windowSize;this.measurements = new Measurement[this.windowSize];this.headIndex = 0;// 初始化指标数组for (int i = 0; i < this.windowSize; i++) {measurements[i] = new Measurement();}this.totalAggregation = new TotalAggregation();}// 加锁 保证线程安全 record 就是调用结束后 回调的@Overridepublic synchronized Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome) {totalAggregation.record(duration, durationUnit, outcome);moveWindowByOne().record(duration, durationUnit, outcome);return new SnapshotImpl(totalAggregation);}// 加锁 保证线程安全public synchronized Snapshot getSnapshot() {return new SnapshotImpl(totalAggregation);}private Measurement moveWindowByOne() {// 后移当前滑动窗口位置moveHeadIndexByOne();// 获取旧指标信息Measurement latestMeasurement = getLatestMeasurement();// 从总聚合中减去旧指标totalAggregation.removeBucket(latestMeasurement);// 重置当前指标latestMeasurement.reset();return latestMeasurement;}private Measurement getLatestMeasurement() {return measurements[headIndex];}// 后移当前滑动窗口位置void moveHeadIndexByOne() {this.headIndex = (headIndex + 1) % windowSize;}
}
基于时间的滑动窗口实现: io.github.resilience4j.core.metrics.SlidingTimeWindowMetrics

基于时间的滑动窗口是由N个部分集合(存储桶)的圆形数组实现的。
如果时间窗口大小为10秒,则圆形数组始终具有10个部分聚合(存储桶)。每个存储桶都会汇总在某个时期内发生的所有调用的结果。(部分聚集)。圆形数组的头存储区存储当前纪元的呼叫结果。其他部分聚合存储前几秒的呼叫结果。
滑动窗口不会单独存储呼叫结果(元组),而是以增量方式更新部分聚合(存储桶)和总聚合。
当记录新的呼叫结果时,总聚合将增量更新。当最旧的存储桶被收回时,该存储桶的部分总聚合将从总聚合中减去,然后重置该存储桶。(逐项扣除)
快照的获取时间为常数O(1),因为快照是预先聚合的,并且与时间窗口的大小无关。
此实现的空间需求(内存消耗)应接近常数O(n),因为调用结果(元组)不是单独存储的。仅创建N个部分聚合和1个总聚合。
部分聚合由3个整数组成,以计算失败的呼叫数,慢速呼叫数和总呼叫数。一个长存储所有呼叫的总持续时间。

CircuitBreakerEvent

Resilience4j的事件机制采用的是观察者设计模式,核心类在io.github.resilience4j.core包下

  • EventConsumer<T> 事件接口消费者(观察者)
  • EventPublisher<T>事件接口发布者(被观察者)
  • EventProcessor<T> 实现了EventPublisher接口,封装了 消费者EventConsumer注册接口和负责处理对应事件通知

CircuitBreakerEvent 事件类型

目前共有八种:

  • CircuitBreakerOnSuccessEvent 请求调用成功时发布的事件
  • CircuitBreakerOnErrorEvent 请求调用失败时发布的事件
  • CircuitBreakerOnResetEvent 断路器重置时发布的事件
  • CircuitBreakerOnFailureRateExceededEvent 当达到失败率时发布的事件
  • CircuitBreakerOnIgnoredErrorEvent 当忽略的调用异常发生时发布的事件
  • CircuitBreakerOnSlowCallRateExceededEvent 当达到慢调用率时发布的事件
  • CircuitBreakerOnCallNotPermittedEvent 当请求不被允许调用时发布的事件
  • CircuitBreakerOnStateTransitionEvent 断路器状态转换时发布的事件

对应枚举可以参考CircuitBreakerEvent.Type

enum Type {ERROR(false),IGNORED_ERROR(false),SUCCESS(false),NOT_PERMITTED(false),STATE_TRANSITION(true),RESET(true),FORCED_OPEN(false),DISABLED(false),FAILURE_RATE_EXCEEDED(false),SLOW_CALL_RATE_EXCEEDED(false);public final boolean forcePublish;Type(boolean forcePublish) {this.forcePublish = forcePublish;}
}

image-20220914173149270

EventPublisher 中定义了八种注册事件处理方法

interface EventPublisher extendsio.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);EventPublisher onFailureRateExceeded(EventConsumer<CircuitBreakerOnFailureRateExceededEvent> eventConsumer);EventPublisher onSlowCallRateExceeded(EventConsumer<CircuitBreakerOnSlowCallRateExceededEvent> eventConsumer);
}

CircuitBreaker 断路器

最后 我们再回到我们的主角 CircuitBreaker来瞅瞅。

image-20220915161436952

注:该图省略了很多方法,具体使用什么请具体分析,这里只列举了一些常用的,用于分析;State、Metrics、StateTransition、EventPublisher 在前面都有提到。具体往前面翻翻吧

public interface CircuitBreaker {static <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBreaker,CheckedFunction0<T> supplier) {return () -> {// 尝试获取执行权限 如果没权限会抛出 CallNotPermittedException 异常 (此时处于熔断状态)circuitBreaker.acquirePermission();final long start = circuitBreaker.getCurrentTimestamp();try {// 执行T result = supplier.apply();long duration = circuitBreaker.getCurrentTimestamp() - start;// 执行成功后 则记录状态。内部会根据config配置的 recordResultPredicate 检测 result 是否是需要统计失败率circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);return result;} catch (Exception exception) {// Do not handle java.lang.Errorlong duration = circuitBreaker.getCurrentTimestamp() - start;// 异常状态下,记录失败状态,计算失败率circuitBreaker.onError(duration, circuitBreaker.getTimestampUnit(), exception);throw exception;}};}default <T> T executeCheckedSupplier(CheckedFunction0<T> checkedSupplier) throws Throwable {return decorateCheckedSupplier(this, checkedSupplier).apply();}// 这里负责创建一个CircuitBreaker 实例static CircuitBreaker of(String name, CircuitBreakerConfig circuitBreakerConfig) {// 熟悉吗 return new CircuitBreakerStateMachine(name, circuitBreakerConfig);}// 接下来是几个权限接口boolean tryAcquirePermission();// 和tryAcquirePermission区别不同在于 没权限 该方法会抛出 CallNotPermittedException 异常void acquirePermission();// 释放权限void releasePermission();// decorateCheckedSupplier 方法中会调这三个方法哦void onSuccess(long duration, TimeUnit durationUnit);void onError(long duration, TimeUnit durationUnit, Throwable throwable);void onResult(long duration, TimeUnit durationUnit, Object result);// 该方法负责将状态重置为CLOSED状态void reset();
}

接下来分析下 CircuitBreakerStateMachine 中的几个方法

public final class CircuitBreakerStateMachine implements CircuitBreaker {// 保证状态的原子性,初始状态为关闭状态private final AtomicReference<CircuitBreakerState> stateReference;// xxxPermission方法就没必要分析了 都是调用 CircuitBreakerState的方法public boolean tryAcquirePermission() {boolean callPermitted = stateReference.get().tryAcquirePermission();if (!callPermitted) {publishCallNotPermittedEvent();}return callPermitted;}public void releasePermission() {stateReference.get().releasePermission();}public void acquirePermission() {try {stateReference.get().acquirePermission();} catch (Exception e) {publishCallNotPermittedEvent();throw e;}}public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {Throwable cause = throwable.getCause();handleThrowable(duration, durationUnit, cause);} else {handleThrowable(duration, durationUnit, throwable);}}private void handleThrowable(long duration, TimeUnit durationUnit, Throwable throwable) {if (circuitBreakerConfig.getIgnoreExceptionPredicate().test(throwable)) {releasePermission();publishCircuitIgnoredErrorEvent(name, duration, durationUnit, throwable);} else if (circuitBreakerConfig.getRecordExceptionPredicate().test(throwable)) {publishCircuitErrorEvent(name, duration, durationUnit, throwable);stateReference.get().onError(duration, durationUnit, throwable);} else {publishSuccessEvent(duration, durationUnit);stateReference.get().onSuccess(duration, durationUnit);}}public void onSuccess(long duration, TimeUnit durationUnit) {publishSuccessEvent(duration, durationUnit);stateReference.get().onSuccess(duration, durationUnit);}public void onResult(long duration, TimeUnit durationUnit, @Nullable Object result) {if (result != null && circuitBreakerConfig.getRecordResultPredicate().test(result)) {ResultRecordedAsFailureException failure = new ResultRecordedAsFailureException(name, result);publishCircuitErrorEvent(name, duration, durationUnit, failure);stateReference.get().onError(duration, durationUnit, failure);} else {onSuccess(duration, durationUnit);}}
}

这篇关于Resilience4j.Circuitbreaker源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/274383

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。