本文主要是介绍Ali-Sentinel-热点流控,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
归档
- GitHub: Ali-Sentinel-热点流控
测试
-
控制台启动参考:WebUI-更新规则-控制台
-
模块:
sentinel-demo-cluster-embedded
-
com.alibaba.csp.sentinel.demo.cluster.app.ClusterDemoApplication
- 更改如下:
@SpringBootApplication
public class ClusterDemoApplication {public static void main(String[] args) {System.setProperty("csp.sentinel.dashboard.server", "127.0.0.1:8080");System.setProperty("project.name", "My-Cluster-8866");System.setProperty("server.port", "10010");SpringApplication.run(ClusterDemoApplication.class, args);}
}
com.alibaba.csp.sentinel.demo.cluster.app.service.DemoService
- 更改如下:
@Service
public class DemoService {@SentinelResource(value = "sayHello", // 设置资源名,不设置的话会用方法全称blockHandler = "sayHelloBlockHandler")public String sayHello(String name) {return "Hello, " + name;}public String sayHelloBlockHandler(String name, BlockException ex) {return String.format("Oops, [%s] blocked by Sentinel", name);}
}
-
启动后
- 访问:http://localhost:10010/hello/test1
-
在控制台设置规则
- 点击 簇点链路:http://127.0.0.1:8080/#/dashboard/identity/My-Cluster-8866
- 在
sayHello
列,点击 + 热点:- 参数索引 设置为:
0
- 单机阈值 设置为:
20
- 其他使用默认值
- 参数索引 设置为:
- 点击 新增
// POST http://127.0.0.1:8080//paramFlow/rule {"app": "My-Cluster-8866","ip": "10.32.51.130","port": "8720","rule": {"resource": "sayHello","grade": 1,"paramFlowItemList": [],"count": 20,"limitApp": "default","controlBehavior": 0,"durationInSec": 1,"burstCount": 0,"maxQueueingTimeMs": 0,"clusterMode": false,"clusterConfig": {"thresholdType": 0,"fallbackToLocalWhenFail": true},"paramIdx": 0} }
- 在
- 新增完会跳转到 热点规则:http://127.0.0.1:8080/#/dashboard/paramFlow/My-Cluster-8866
- 点击 编辑
- 再点 高级选项
- 参数类型 选择:
java.lang.String
- 参数值 设置为:
test1
- 限流阈值 设置为:
2
- 点击 + 添加
- 参数类型 选择:
- 点击 保存
// PUT http://127.0.0.1:8080//paramFlow/rule/2 {"id": 2,"app": "My-Cluster-8866","ip": "10.32.51.130","port": 8720,"rule": {"id": null,"resource": "sayHello","limitApp": "default","grade": 1,"paramIdx": 0,"count": 20,"controlBehavior": 0,"maxQueueingTimeMs": 0,"burstCount": 0,"durationInSec": 1,"paramFlowItemList": [{"object": "test1","classType": "java.lang.String","count": 2}],"clusterMode": false,"clusterConfig": {"flowId": null,"thresholdType": 0,"fallbackToLocalWhenFail": true,"sampleCount": 10,"windowIntervalMs": 1000}},"gmtCreate": null,"gmtModified": null }
- 点击 簇点链路:http://127.0.0.1:8080/#/dashboard/identity/My-Cluster-8866
-
再测试
- 连续刷 3 次:http://localhost:10010/hello/test1
- 出现:
Oops, [test1] blocked by Sentinel
原理
-
模块:
sentinel-parameter-flow-control
- 其 SPI 配置文件
../META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
- 内容为:
com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
- 加入处理链参考:入口控制-进入资源 sign_m_003
- 内容为:
- 其 SPI 配置文件
-
com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
- 参考:入口控制-链处理进入 sign_m_021
@Spi(order = -3000)
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {... // 如果资源没设置热点规则,则传给下个节点并返回checkFlow(resourceWrapper, count, args); // 检测规则,ref: sign_m_100fireEntry(context, resourceWrapper, node, count, prioritized, args); // 传给下游}// sign_m_100 检测规则void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {... // args 为 null 校验 & 资源是否设置有规则校验List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());for (ParamFlowRule rule : rules) {applyRealParamIdx(rule, args.length); // 重新设置下索引,ref: sign_m_110// 初始化指标 (不存在则创建,存在则不处理也不重置),ref: sign_m_150ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) { // 规则检验 (不通过则报错),ref: sign_m_210String triggeredParam = "";... // 从参数数组里获取校验索引的值,即: = args[idx]throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);}}}/*** sign_m_110 设置正确的索引:* * 1. len: 5, idx: -3 (倒数第 3 个)* idx = 5 + (-3) = 2* * 2. len: 5, idx: -6 (倒数第 6 个,无效索引)* idx = -(-6) = 6*/void applyRealParamIdx(ParamFlowRule rule, int length) {int paramIdx = rule.getParamIdx();if (paramIdx < 0) {if (-paramIdx <= length) {rule.setParamIdx(length + paramIdx);} else {// 非法索引,给它一个非法的正值,后面的规则检查就会通过。ref: sign_m_210rule.setParamIdx(-paramIdx);}}}
}
参数计量指标
com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetricStorage
// 指标存储器
public final class ParameterMetricStorage {// 存储 mapprivate static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();// sign_m_150 初始化指标 (不存在则创建,存在则不处理也不重置)public static void initParamMetricsFor(ResourceWrapper resourceWrapper, ParamFlowRule rule) {... // 资源或名称为空,返回String resourceName = resourceWrapper.getName();ParameterMetric metric; // 资源名对应的指标已存在,则不创建if ((metric = metricsMap.get(resourceName)) == null) {synchronized (LOCK) { // DCLif ((metric = metricsMap.get(resourceName)) == null) {metric = new ParameterMetric(); // 创建指标metricsMap.put(resourceName, metric);}}}metric.initialize(rule); // 初始化规则相关数据,ref: sign_m_160 }
}
com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric
- 线程数指标计量参考:链路控制-StatisticSlot sign_call_100 | sign_call_200
// 对应的是单个资源的(参数)指标
public class ParameterMetric {private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();// sign_m_160 初始化规则相关数据 (不存在则创建,存在则不处理也不重置)public void initialize(ParamFlowRule rule) {// 用规则作 keyif (!ruleTimeCounters.containsKey(rule)) {synchronized (lock) { // DCLif (ruleTimeCounters.get(rule) == null) {long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);// ConcurrentLinkedHashMapWrapper 只是对 ConcurrentLinkedHashMap 进行封装,// 其 ref: https://blog.csdn.net/rockvine/article/details/122852332ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));}}}if (!ruleTokenCounter.containsKey(rule)) {... // 类似上面初始化}/*** 用规则参数的索引作 key* 对应的指标计量参考: ParamFlowStatisticEntryCallback 和 ParamFlowStatisticExitCallback* 回调参考: 链路控制-StatisticSlot sign_call_100 | sign_call_200*/if (!threadCountMap.containsKey(rule.getParamIdx())) {... // 类似上面初始化}}
}
热点参数流控校验器
com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker
public final class ParamFlowChecker {// sign_m_210 规则检验public static boolean passCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,Object... args) {... // args 为 null,返回 true 表示校验通过int paramIdx = rule.getParamIdx();... // args 长度不满足,返回 true 表示校验通过,对应上面: sign_m_110Object value = args[paramIdx];... // value 为 ParamFlowArgument 时,使用其 paramFlowKey... // value 为 null,返回 true 表示校验通过... // 集群模式校验return passLocalCheck(resourceWrapper, rule, count, value); // 本地校验,ref: sign_m_212}// sign_m_212 本地校验private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,Object value) {try {if (Collection.class.isAssignableFrom(value.getClass())) {... // 是集合,则对集合里的每个元素进行校验} else if (value.getClass().isArray()) {... // 是数组,则对数组的每个元素进行校验} else {return passSingleValueCheck(resourceWrapper, rule, count, value); // 单个值校验,ref: sign_m_213}} ... // catch return true; // 校验过程中出现异常,则当作"通过"}// sign_m_213 单个值校验static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);} else {// 默认是 CONTROL_BEHAVIOR_DEFAULT,看懂这个,其他 2 个逻辑也会觉得简单,且目前 Web 还不支持其他 2 种设置// 使用默认校验,ref: sign_m_214return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);}} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {... // 通过线程数进行流控}return true;}// sign_m_214 默认校验static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {ParameterMetric metric = getParameterMetric(resourceWrapper);CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);... // 规则没有对应的 tokenCounters 或 timeCounters,则返回 true 表示校验通过// 计算最大 token 数(阈值)// rule.getParsedHotItems() 相当于: {"test1": 2}Set<Object> exclusionItems = rule.getParsedHotItems().keySet();long tokenCount = (long) rule.getCount(); // 先用规则阈值if (exclusionItems.contains(value)) { // 有参数例外项,就用单独设置的阈值tokenCount = rule.getParsedHotItems().get(value);}... // tokenCount 为 0,直接不通过long maxCount = tokenCount + rule.getBurstCount(); // 加上突发设置数 (def: 0)... // maxCount 判断while (true) {long currentTime = TimeUtil.currentTimeMillis();AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime)); // sign_step_001if (lastAddTokenTime == null) {// 令牌从未添加,只需补充令牌并立即消耗 acquireCount 即可。tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); // sign_step_002return true; // 通过}// 计算自上次添加令牌以来的持续时间。long passTime = currentTime - lastAddTokenTime.get();// 一种简化的令牌桶算法,仅当统计窗口过去时才会补充令牌。if (passTime > rule.getDurationInSec() * 1000) { // 超出时间窗口AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); // sign_step_003if (oldQps == null) {/*** 线程 T1 执行完 sign_step_001,还没执行 sign_step_002;* 线程 T2 执行 sign_step_001,lastAddTokenTime 不为 null,* 在执行 sign_step_003 时,oldQps 就会为 null,因此会进入此 if。*/lastAddTokenTime.set(currentTime); // 重置时间窗口 (确实可能不准)return true;} else {long restQps = oldQps.get();/*** 5s 后再请求进来,则 passTime = 5000;* 参数 acquireCount = 1;* 则:* toAddCount = (5000 * 2) / (1 * 1000) = 10;* newQps = (10 + 1) > 2 ? (2 - 1) : (1 + 10 - 1) = 1;* 相当于:最新剩余的 token 数。*/long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount): (restQps + toAddCount - acquireCount);if (newQps < 0) {return false; // 没剩余,不通过}if (oldQps.compareAndSet(restQps, newQps)) {lastAddTokenTime.set(currentTime);return true; // CAS 成功,通过}Thread.yield(); // CAS 失败,再循环一次}} else { // 在时间窗口内AtomicLong oldQps = tokenCounters.get(value); // 旧的剩余 token 数 (别被命名影响)if (oldQps != null) {long oldQpsValue = oldQps.get();if (oldQpsValue - acquireCount >= 0) { // 能满足请求if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {return true; // CAS 成功则通过}} else {return false; // 剩余 token 数不足,不通过}}Thread.yield(); // 并发时,上面的 CAS 不成功,则休息下,再循环一次}}}}
这篇关于Ali-Sentinel-热点流控的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!