Ali-Sentinel-热点流控

2024-05-13 13:04
文章标签 sentinel ali 热点 流控

本文主要是介绍Ali-Sentinel-热点流控,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

归档

  • GitHub: Ali-Sentinel-热点流控

测试

  • 控制台启动参考:WebUI-更新规则-控制台

  • 模块:sentinel-demo-cluster-embedded

  • com.alibaba.csp.sentinel.demo.cluster.app.ClusterDemoApplication

    • 更改如下:
@SpringBootApplication
public class ClusterDemoApplication {public static void main(String[] args) {System.setProperty("csp.sentinel.dashboard.server", "127.0.0.1:8080");System.setProperty("project.name", "My-Cluster-8866");System.setProperty("server.port", "10010");SpringApplication.run(ClusterDemoApplication.class, args);}
}
  • com.alibaba.csp.sentinel.demo.cluster.app.service.DemoService
    • 更改如下:
@Service
public class DemoService {@SentinelResource(value = "sayHello", // 设置资源名,不设置的话会用方法全称blockHandler = "sayHelloBlockHandler")public String sayHello(String name) {return "Hello, " + name;}public String sayHelloBlockHandler(String name, BlockException ex) {return String.format("Oops, [%s] blocked by Sentinel", name);}
}
  • 启动后

    • 访问:http://localhost:10010/hello/test1
  • 在控制台设置规则

    • 点击 簇点链路:http://127.0.0.1:8080/#/dashboard/identity/My-Cluster-8866
      • sayHello 列,点击 + 热点
        • 参数索引 设置为:0
        • 单机阈值 设置为:20
        • 其他使用默认值
      • 点击 新增
        // POST http://127.0.0.1:8080//paramFlow/rule
        {"app": "My-Cluster-8866","ip": "10.32.51.130","port": "8720","rule": {"resource": "sayHello","grade": 1,"paramFlowItemList": [],"count": 20,"limitApp": "default","controlBehavior": 0,"durationInSec": 1,"burstCount": 0,"maxQueueingTimeMs": 0,"clusterMode": false,"clusterConfig": {"thresholdType": 0,"fallbackToLocalWhenFail": true},"paramIdx": 0}
        }
        
    • 新增完会跳转到 热点规则:http://127.0.0.1:8080/#/dashboard/paramFlow/My-Cluster-8866
      • 点击 编辑
      • 再点 高级选项
        • 参数类型 选择:java.lang.String
        • 参数值 设置为:test1
        • 限流阈值 设置为:2
        • 点击 + 添加
      • 点击 保存
        // PUT http://127.0.0.1:8080//paramFlow/rule/2
        {"id": 2,"app": "My-Cluster-8866","ip": "10.32.51.130","port": 8720,"rule": {"id": null,"resource": "sayHello","limitApp": "default","grade": 1,"paramIdx": 0,"count": 20,"controlBehavior": 0,"maxQueueingTimeMs": 0,"burstCount": 0,"durationInSec": 1,"paramFlowItemList": [{"object": "test1","classType": "java.lang.String","count": 2}],"clusterMode": false,"clusterConfig": {"flowId": null,"thresholdType": 0,"fallbackToLocalWhenFail": true,"sampleCount": 10,"windowIntervalMs": 1000}},"gmtCreate": null,"gmtModified": null
        }
        
  • 再测试

    • 连续刷 3 次:http://localhost:10010/hello/test1
    • 出现:Oops, [test1] blocked by Sentinel

原理

  • 模块:sentinel-parameter-flow-control

    • 其 SPI 配置文件 ../META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
      • 内容为:com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
      • 加入处理链参考:入口控制-进入资源 sign_m_003
  • com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot

    • 参考:入口控制-链处理进入 sign_m_021
@Spi(order = -3000)
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {... // 如果资源没设置热点规则,则传给下个节点并返回checkFlow(resourceWrapper, count, args);                                // 检测规则,ref: sign_m_100fireEntry(context, resourceWrapper, node, count, prioritized, args);    // 传给下游}// sign_m_100 检测规则void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {... // args 为 null 校验 & 资源是否设置有规则校验List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());for (ParamFlowRule rule : rules) {applyRealParamIdx(rule, args.length);   // 重新设置下索引,ref: sign_m_110// 初始化指标 (不存在则创建,存在则不处理也不重置),ref: sign_m_150ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {  // 规则检验 (不通过则报错),ref: sign_m_210String triggeredParam = "";... // 从参数数组里获取校验索引的值,即: = args[idx]throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);}}}/*** sign_m_110 设置正确的索引:* *   1. len: 5, idx: -3 (倒数第 3 个)*      idx = 5 + (-3) = 2* *   2. len: 5, idx: -6 (倒数第 6 个,无效索引)*      idx = -(-6) = 6*/void applyRealParamIdx(ParamFlowRule rule, int length) {int paramIdx = rule.getParamIdx();if (paramIdx < 0) {if (-paramIdx <= length) {rule.setParamIdx(length + paramIdx);} else {// 非法索引,给它一个非法的正值,后面的规则检查就会通过。ref: sign_m_210rule.setParamIdx(-paramIdx);}}}
}

参数计量指标

  • com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetricStorage
// 指标存储器
public final class ParameterMetricStorage {// 存储 mapprivate static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();// sign_m_150 初始化指标 (不存在则创建,存在则不处理也不重置)public static void initParamMetricsFor(ResourceWrapper resourceWrapper, ParamFlowRule rule) {... // 资源或名称为空,返回String resourceName = resourceWrapper.getName();ParameterMetric metric;     // 资源名对应的指标已存在,则不创建if ((metric = metricsMap.get(resourceName)) == null) {synchronized (LOCK) {   // DCLif ((metric = metricsMap.get(resourceName)) == null) {metric = new ParameterMetric(); // 创建指标metricsMap.put(resourceName, metric);}}}metric.initialize(rule);    // 初始化规则相关数据,ref: sign_m_160 }
}
  • com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric
    • 线程数指标计量参考:链路控制-StatisticSlot sign_call_100 | sign_call_200
// 对应的是单个资源的(参数)指标
public class ParameterMetric {private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();// sign_m_160 初始化规则相关数据 (不存在则创建,存在则不处理也不重置)public void initialize(ParamFlowRule rule) {// 用规则作 keyif (!ruleTimeCounters.containsKey(rule)) {synchronized (lock) {   // DCLif (ruleTimeCounters.get(rule) == null) {long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);// ConcurrentLinkedHashMapWrapper 只是对 ConcurrentLinkedHashMap 进行封装,// 其 ref: https://blog.csdn.net/rockvine/article/details/122852332ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));}}}if (!ruleTokenCounter.containsKey(rule)) {... // 类似上面初始化}/*** 用规则参数的索引作 key* 对应的指标计量参考: ParamFlowStatisticEntryCallback 和 ParamFlowStatisticExitCallback*          回调参考: 链路控制-StatisticSlot sign_call_100 | sign_call_200*/if (!threadCountMap.containsKey(rule.getParamIdx())) {... // 类似上面初始化}}
}

热点参数流控校验器

  • com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker
public final class ParamFlowChecker {// sign_m_210 规则检验public static boolean passCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,Object... args) {... // args 为 null,返回 true 表示校验通过int paramIdx = rule.getParamIdx();... // args 长度不满足,返回 true 表示校验通过,对应上面: sign_m_110Object value = args[paramIdx];... // value 为 ParamFlowArgument 时,使用其 paramFlowKey... // value 为 null,返回 true 表示校验通过... // 集群模式校验return passLocalCheck(resourceWrapper, rule, count, value); // 本地校验,ref: sign_m_212}// sign_m_212 本地校验private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,Object value) {try {if (Collection.class.isAssignableFrom(value.getClass())) {... // 是集合,则对集合里的每个元素进行校验} else if (value.getClass().isArray()) {... // 是数组,则对数组的每个元素进行校验} else {return passSingleValueCheck(resourceWrapper, rule, count, value);   // 单个值校验,ref: sign_m_213}} ... // catch return true;    // 校验过程中出现异常,则当作"通过"}// sign_m_213 单个值校验static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);} else {// 默认是 CONTROL_BEHAVIOR_DEFAULT,看懂这个,其他 2 个逻辑也会觉得简单,且目前 Web 还不支持其他 2 种设置// 使用默认校验,ref: sign_m_214return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);}} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {... // 通过线程数进行流控}return true;}// sign_m_214 默认校验static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {ParameterMetric metric = getParameterMetric(resourceWrapper);CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);... // 规则没有对应的 tokenCounters 或 timeCounters,则返回 true 表示校验通过// 计算最大 token 数(阈值)// rule.getParsedHotItems() 相当于: {"test1": 2}Set<Object> exclusionItems = rule.getParsedHotItems().keySet();long tokenCount = (long) rule.getCount();   // 先用规则阈值if (exclusionItems.contains(value)) {       // 有参数例外项,就用单独设置的阈值tokenCount = rule.getParsedHotItems().get(value);}... // tokenCount 为 0,直接不通过long maxCount = tokenCount + rule.getBurstCount();  // 加上突发设置数 (def: 0)... // maxCount 判断while (true) {long currentTime = TimeUtil.currentTimeMillis();AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime)); // sign_step_001if (lastAddTokenTime == null) {// 令牌从未添加,只需补充令牌并立即消耗 acquireCount 即可。tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));  // sign_step_002return true;    // 通过}// 计算自上次添加令牌以来的持续时间。long passTime = currentTime - lastAddTokenTime.get();// 一种简化的令牌桶算法,仅当统计窗口过去时才会补充令牌。if (passTime > rule.getDurationInSec() * 1000) {    // 超出时间窗口AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));  // sign_step_003if (oldQps == null) {/*** 线程 T1 执行完 sign_step_001,还没执行 sign_step_002;* 线程 T2 执行   sign_step_001,lastAddTokenTime 不为 null,*         在执行 sign_step_003 时,oldQps 就会为 null,因此会进入此 if。*/lastAddTokenTime.set(currentTime);  // 重置时间窗口 (确实可能不准)return true;} else {long restQps = oldQps.get();/*** 5s 后再请求进来,则 passTime = 5000;*    参数 acquireCount = 1;* 则:*   toAddCount = (5000 * 2) / (1 * 1000) = 10;*   newQps = (10 + 1) > 2 ? (2 - 1) : (1 + 10 - 1) = 1;*          相当于:最新剩余的 token 数。*/long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount): (restQps + toAddCount - acquireCount);if (newQps < 0) {return false;   // 没剩余,不通过}if (oldQps.compareAndSet(restQps, newQps)) {lastAddTokenTime.set(currentTime);return true;    // CAS 成功,通过}Thread.yield();     // CAS 失败,再循环一次}} else {    // 在时间窗口内AtomicLong oldQps = tokenCounters.get(value);   // 旧的剩余 token 数 (别被命名影响)if (oldQps != null) {long oldQpsValue = oldQps.get();if (oldQpsValue - acquireCount >= 0) {      // 能满足请求if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {return true;                        // CAS 成功则通过}} else {return false;                           // 剩余 token 数不足,不通过}}Thread.yield(); // 并发时,上面的 CAS 不成功,则休息下,再循环一次}}}}

这篇关于Ali-Sentinel-热点流控的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/985781

相关文章

Sentinel 高可用流量管理框架

Sentinel 是面向分布式服务架构的高可用流量防护组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。 Sentinel 具有以下特性: 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应

[环境配置]ubuntu20.04安装后wifi有图标但是搜不到热点解决方法

最近刚入手一台主机,暗影精灵8plus电竞主机,安装ubuntu后wifi怎么都搜不到热点,前后重装系统6次才算解决问题。这个心酸历程只有搞技术人才明白。下面介绍我解决过程。 首先主机到手后是个windows10系统,我用无线网连接了一下,可以正常上网,说明主机有无限网卡且正常。然后我就直接开始安装Ubuntu20.04了,安装成功后发现wifi有图标但是搜不到热点,我想是不是无线网卡驱动有没有

Redis Sentinel 深度解析:构建高可用性 Redis 集群

Redis Sentinel 深度解析:构建高可用性 Redis 集群 一 . 基本概念1.1 相关名词解释1.2 如何人工恢复主节点故障 ?1.3 哨兵自动恢复主节点故障 二 . 哨兵的安装部署2.1 安装 docker 和 docker-compose2.2 搭建 Redis 的哨兵环境2.2.1 编排 Redis 的主从节点2.2.2 编排 redis-sentinel 节点2.2.3

【Redis】Redis Sentinel(哨兵)系统:自动故障恢复与高可用性配置全解

目录 哨兵 (Sentinel)基本概念主从复制的问题⼈⼯恢复主节点故障哨兵⾃动恢复主节点故障 安装部署 (基于 docker)准备⼯作 以下部分是独立于这一章节的Docker安装Server版本安装CentOS安装实战经验 GUI版本安装(以windows 11为例)安装docker 以上部分是独立于这一章节的重新选举redis-master 宕机之后redis-master 重启之

Class4——Esp32|Thonny两种方式同过电脑控制LED灯,路由器与电脑自带热点连接ESP32

上一节我们通过路由器和设备创建了连接,不懂可按上节配置 Class3——Esp32|Thonny——网络连接主机-wifi连接(源代码带教程)-CSDN博客文章浏览阅读57次。Esp32|Thonny网络连接主机-wifi连接(源代码带教程)https://blog.csdn.net/m0_66701835/article/details/141960572?spm=1001.2014.3001

springboot项目引入Sentinel熔断

本文是springboot项目+nacos进行引入,sentiel需自行按照部署 1.springboot包要是2.2.5或以上 <dubbo.version>2.7.7</dubbo.version><spring-boot.version>2.2.5.RELEASE</spring-boot.version><chainwork-boot.version>1.0.5-SNAPSHOT<

Google Earth Engine——导入无云 Sentinel-2 图像和NDVI计算

目录 搜索和导入无云 Sentinel-2 图像 Sentinel-2 的背景 打开 GEE 界面 定义您感兴趣的领域 查询 Sentinel-2 图像的存档 过滤图像集合 将图像添加到地图视图 定义真彩色可视化参数 探索影像 定义假色可视化参数 从波段组合中导出指数 NDVI 锻炼 本实验的目的是介绍 Google Earth Engine 处理环境。在本练习

【计算机视觉前沿研究 热点 顶会】ECCV 2024中扩散模型有关的论文

神经辐射场修复的驯服潜在扩散模型 神经辐射场(NERF)是一种从多视角图像进行三维重建的表示法。尽管最近的一些工作表明,在编辑具有扩散先验的重建的 NERF 方面取得了初步成功,但他们仍然在努力在完全未覆盖的区域中合成合理的几何图形。一个主要原因是来自扩散模型的合成内容的高度多样性阻碍了辐射场收敛到清晰和确定的几何形状。此外,在实际数据上应用潜在扩散模型通常会产生与图像条件不一致的纹理漂移,这是

【计算机视觉前沿研究 热点 顶会】ECCV 2024中Mamba有关的论文

MambaIR:状态空间模型图像恢复的简单基线 近年来,图像恢复技术取得了长足的进步,这在很大程度上归功于现代深度神经网络的发展,如 CNN 和 Transformers。然而,现有的修复骨干往往面临全局接受域和高效计算之间的两难困境,阻碍了它们在实践中的应用。最近,选择性结构化状态空间模型,特别是改进的 Mamba 模型,在线性复杂度的长程依赖建模方面显示出了巨大的潜力,为解决上述困境提供了一

【2025考研英语高分写作:写作核心词汇】四、社会热点

1.健康 生理健康 physical health 心理健康 mental/psychological health/fitness 健康问题 health concern 亚健康 sub-health 公共卫生 public health 心态 state of mind 体育锻炼 physical exercises 均衡饮食 well-balanced diet 垃圾食品