流控神器Sentinel指南:深入分析流控原理(上)

2024-03-28 15:38

本文主要是介绍流控神器Sentinel指南:深入分析流控原理(上),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

        • 1. 加载规则
        • 2. 访问资源
          • 2.1 创建Conext
          • 2.2 创建SlotChain
          • 2.3 构造Entry
        • 3. NodeSelectorSlot
        • 4. ClusterBuilderSlot
        • 5. 流控核心 FlowSlot

1. 加载规则

上一篇文章中 https://blog.csdn.net/TheLudlows/article/details/84992863 ,我们队Sentinel的抽象概念进行粗略的描述,这篇文章在此基础上对Sentinel的原理进行更深一步的认识。

在Sentinel中要对资源进行保护需要先配置规则,规则包括流量控制规则、熔断降级规则、系统保护规则 以及授权规则。每一种规则
都有对应的xxxRuleManager工具类来加载生效。以FlowRule为例,一般设置规则的代码如下:

private static void initFlowQpsRule() {List<FlowRule> rules = new ArrayList<FlowRule>();FlowRule rule1 = new FlowRule();rule1.setResource(KEY);// set limit qps to 20rule1.setCount(20);rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);// 依据QPS限流//rule1.setLimitApp("default");rule1.setControlBehavior(CONTROL_BEHAVIOR_DEFAULT); // 流控模式,默认为直接拒绝rules.add(rule1);FlowRuleManager.loadRules(rules);
}

通过一个List来保存一些多个流控规则,可以是一个资源的多个流控规则,也可以是多个资源的多个流控规则。下面我们进入FlowRuleManage类,认识一下它的部分重要属性

private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,new NamedThreadFactory("sentinel-metrics-record-task", true));
private final static FlowPropertyListener listener = new FlowPropertyListener();
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();static {currentProperty.addListener(listener);scheduler.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
}
  • Map<String, List<FlowRule>> flowRules是用来保存所有的流控规则,key为资源名称,value为一个list对象。因为是
    全局共享的,使用的是ConcurrentHashMap。一个资源可以有多个流控规则,它们是通过链表保存,比如一个规则通过QPS控制,一个是通过线程数控制。
  • ScheduledExecutorService scheduler
  • FlowPropertyListener listener 是一个监听器,当规则变化,进行相应的操作
  • SentinelProperty<List<FlowRule>> currentProperty 此类包含配置的当前值,并负责在更新配置时通知所有属性监听器。

静态代码块中添加属性监听器,启动计划线程

我们继续看loadRules的实现:

public static void loadRules(List<FlowRule> rules) {currentProperty.updateValue(rules);
}

调用属性类的更新操作,将新的规则赋值给currentProperty的成员变量。

@Override
public void updateValue(T newValue) {if (isEqual(value, newValue)) {return;}value = newValue;for (PropertyListener<T> listener : listeners) {listener.configUpdate(newValue);}
}

然后通知监听器,即调用所有监听器的configUpdate方法:

@Override
public void configUpdate(List<FlowRule> value) {Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);if (rules != null) {flowRules.clear();flowRules.putAll(rules);}RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}

监听器的作用是把新的规则放入flowRules中。buildFlowRuleMap方法实现较长,但是逻辑不复杂,此处就不展开。该方法的作用是将旧规则和新规则合并,并且根据设置生成具体的Controller对相关,1.5版本之后更改为TrafficShapingController,大体的逻辑基本一样。

2. 访问资源

规则生效了之后,就可以保护资源了,一般访问资源的代码如下:

try{Entry entry = SphU.entry(name);// biz code
} catch (BlockException e1) {
} catch (Exception e2) {// biz exception
} finally {if (entry != null) {entry.exit();}
}

如果能够申请成功,则说明没有被限流,否则会抛出BlockException,表面已经被限流了。SphU和SphO是暴露给用户的API,它的内部没有真正意义上的逻辑,相当于Env的代理对象,Env即Environment的缩写,正如其含义,它是全局环境的抽象,进行一些初始化的操作:

public class Env {// 初始化Node构造器,用于构建Node tree。Node tree是用来对调用链进行限流public static final NodeBuilder nodeBuilder = new DefaultNodeBuilder();// 真正的访问资源入口public static final Sph sph = new CtSph();static {InitExecutor.doInit(); // 对实时数据传输的底层通信,规则数据持久化等进行初始化。}
}

doInit不是本文的重点,将会放在Sentinel传输实时数据的文章中介绍。我们继续看访问资源的主线。刚才讲到Sph才是资源的入口,它的实现类CtSph内部才是访问资源的具体逻辑。SphU中entry方法内部实现如下:

public static Entry entry(String name) throws BlockException {return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}

跟进CtSph的entry实现方法:

@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {StringResourceWrapper resource = new StringResourceWrapper(name, type);return entry(resource, count, args);
}

这里创建了资源的包装类,传递给entry的重载方法,这里其实有一个困惑了很久的疑问,访问的次数肯定远大于资源的数量,资源就那么固定的几个。没有必要每一个访问请求都创建一个新的ResourceWrapper对象,为什么不用不放入缓存中呢?这仅仅是个人理解不了的地方。
回到主线跟进:

 public Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {Context context = ContextUtil.getContext();if (context instanceof NullContext) {return new CtEntry(resourceWrapper, null, context);}if (context == null) {// Using default context.context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());}// Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);/** Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},* so no rule checking will be done.*/if (chain == null) {return new CtEntry(resourceWrapper, null, context);}Entry e = new CtEntry(resourceWrapper, chain, context);try {chain.entry(context, resourceWrapper, null, count, args);} catch (BlockException e1) {e.exit(count, args);throw e1;} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.RecordLog.info("Sentinel unexpected exception", e1);}return e;}

内部逻辑可以划分为一下几步:

  • 获取当前访问资源的上下文
  • 全局保护是否开启
  • 根据包装过的资源对象获取对应的SlotChain
  • SlotChain的entry方法,真正让执行规则的地方,如果SlotChain的entry方法抛出了BlockException,则将该异常继续向上抛出,如果SlotChain的entry方法正常执行了,则最后会将该entry对象返回。
2.1 创建Conext

上一篇文章中提到了Context,它是一次访问资源的过程。保存于ThreadLocal中,ContextUtil.getContext()就是从ThreadLocal中取出Context。那么NullContext是个什么?暂时我们先不用管它,到目前为止整个流程没有创建Context,取出来的肯定是null。ContextUtil有个非常重要的初始化代码块

static {// Cache the entrance node for default context.initDefaultContext();
}private static void initDefaultContext() {String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;// 构建第一个EntranceNodeEntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);// 添加至ROOT下Constants.ROOT.addChild(node);// 放置缓存,这个是全局共享的缓存,key为ContextName,value为EntranceNodecontextNameNodeMap.put(defaultContextName, node);
}

这里添加了默认的EntranceNode,并且一个defaultContextName为key。继续回到主线
进入MyContextUtil.myEnter方法去创建Context:

    /*** This class is used for skip context name checking.*/private final static class MyContextUtil extends ContextUtil {static Context myEnter(String name, String origin, EntryType type) {return trueEnter(name, origin);}}

入参为Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType(),第一个参数为Conext名称,此处使用常量。第二个是来源为空,第三个是访问资源的类型。
跟进trueEnter:

protected static Context trueEnter(String name, String origin) {// contextHolder为保存Conext的ThreadLocal对象Context context = contextHolder.get();if (context == null) {Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;// 从myEnter方法进来的,获取的node一定不为空,上面的初始化操作已经添加进去默认的entrancenode// 那什么情况下可能为空呢? 外部显示是调用Context.enter方法,输入的key不允许为CONTEXT_DEFAULT_NAME的值,其他值得话第一次调用肯定为nullDefaultNode node = localCacheNameMap.get(name);if (node == null) {if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {try {LOCK.lock();node = contextNameNodeMap.get(name);if (node == null) {if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);// Add entrance node.Constants.ROOT.addChild(node);Map<String, DefaultNode> newMap = new HashMap<String, DefaultNode>(contextNameNodeMap.size() + 1);newMap.putAll(contextNameNodeMap);newMap.put(name, node);contextNameNodeMap = newMap;}}} finally {LOCK.unlock();}}}// 将获取到的entrancenode用来构造一个Conextcontext = new Context(node, name);context.setOrigin(origin);contextHolder.set(context);}return context;
}

别看代码这么长,其实就是创建一个Context,并且关联一个entrancenode,然后放入threadlocal中。至此Context创建完毕。下图表示目前Context和EntranceNode、Root的关系
sentinel

2.2 创建SlotChain

Sentinel中有各种规则,每类规则对应一个slot,多个slot连接起来成为ProcessorSlotChain,

lookProcessChain方法如下:

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {// ProcessorSlot缓存,key为resourceProcessorSlotChain chain = chainMap.get(resourceWrapper);if (chain == null) {synchronized (LOCK) {chain = chainMap.get(resourceWrapper);if (chain == null) {// Entry size limit.if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {return null;}// 构建一个新的SlotChainchain = SlotChainProvider.newSlotChain();Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);newMap.putAll(chainMap);newMap.put(resourceWrapper, chain);chainMap = newMap;}}}return chain;
}

每个资源都对应一个slotchain,newSlotChain方法就是性构建一个DefaultProcessorSlotChain。下面我们看下slot类关系。
slotSlotChan和Netty中的Pipeline有些类似,而Slot像是Handler。DefaultProcessorSlotChain是如何添加Solt?它有一个默认的first和end 节点,在上篇中提到过,最终过调用此方法添加slot。

public ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();chain.addLast(new NodeSelectorSlot());chain.addLast(new ClusterBuilderSlot());chain.addLast(new LogSlot());chain.addLast(new StatisticSlot());chain.addLast(new SystemSlot());chain.addLast(new AuthoritySlot());chain.addLast(new FlowSlot());chain.addLast(new DegradeSlot());return chain;}
2.3 构造Entry

回到主线,当slotchain创建完毕,下面Entry e = new CtEntry(resourceWrapper, chain, context);开始构造Entry.

CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {super(resourceWrapper);this.chain = chain;this.context = context;setUpEntryFor(context);
}
// 父类Entry
public Entry(ResourceWrapper resourceWrapper) {this.resourceWrapper = resourceWrapper;// 创建时间,用于计算RTthis.createTime = TimeUtil.currentTimeMillis();
}

构造方法中主要是对resourceWrapper,chain,context进行赋值,setUpEntryFor是用来构造Entry调用链。

private void setUpEntryFor(Context context) {// The entry should not be associated to NullContext.if (context instanceof NullContext) {return;}this.parent = context.getCurEntry();if (parent != null) {((CtEntry)parent).child = this;}context.setCurEntry(this); // 
}

回顾Context的构造过程,curEntry属性在第一次构造并没有给它赋值,因此为null。然后属性curEntry为当前Entry对象。
中各个对象关系如下:
sentinel

3. NodeSelectorSlot

敲黑板!重点来了,当Entry构造完毕,开始了entry调用链,即通过SlotChain检测这个Entry是否能访问此资源。first节点后是NodeSelectorSlot, 它负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,说白了就是构建resource的DefaultNode节点,DefaultNode是用来保存某个资源实时流量数据的,一个资源可以有多个DefaultNode,比如下面代码:

Entry entry1 = SphU.entry(KEY);
//....
entry1.exit();ContextUtil.enter("entrance1", "appA"); //创建Entrance Node
Entry entry2 = SphU.entry(KEY);
//....
entry2.exit();

代码中两次请求同一个资源,使用了上下文不同,EntranceNode 也不同,同样DefaultNode也是不同。

NodeSelectorSlot的entry方法实现如下:

public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {// 以Context name 作为key保存DefaultNodeprivate volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)throws Throwable {// 第一次获取为空DefaultNode node = map.get(context.getName());if (node == null) {synchronized (this) {node = map.get(context.getName());if (node == null) {// 新创建一个DefaultNodenode = Env.nodeBuilder.buildTreeNode(resourceWrapper, null);HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());cacheMap.putAll(map);cacheMap.put(context.getName(), node);map = cacheMap;}// Build invocation tree((DefaultNode)context.getLastNode()).addChild(node);}}// 设置Entry当前节点context.setCurNode(node);// 触发下一个slotfireEntry(context, resourceWrapper, node, count, args);}
}
// Context#setCurNode
public Context setCurNode(Node node) {this.curEntry.setCurNode(node);return this;
}

NodeSelectorSlot 的entry方法主要流程为

  1. 获取当前上下文对应的DefaultNode,如果没有的话会为当前的调用新生成一个DefaultNode节点,它的作用是对资源进行各种统计度量以便进行流控;
  2. 将新创建的DefaultNode节点,添加到context中,作为curNode(entrance node / default node)的子节点;
  3. 将DefaultNode节点,添加到Entry中,作为curNode

Sentinel
注意一点,这里使用的是ContextName作为key来保存DefaultNode,开始笔者非常感到很困惑,回想一下ContextName决定EntranceNode,每个资源独享SlotChain,因此该NodeSelectorSlot也是属于某个资源,这里通过ResourceName保存,只是为了决定生成的DefaultNode挂在那个EntranceNode下。

不同的资源独享Slotchain,但是调用树是共享的。不同资源对应的DefaultNode也是独享的。这里确实有点绕。

4. ClusterBuilderSlot

NodeSelectorSlot的entry方法执行完之后,会调用fireEntry方法,此时会触发ClusterBuilderSlot的entry方法。
先看NodeSelectorSlot中的两个重要变量:

// 该静态变量 用来缓存所有的ClusterNode 
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap= new HashMap<ResourceWrapper, ClusterNode>();
// 每一个资源对应一个ClusterNode
private ClusterNode clusterNode = null;

注意clusterNode 为成员变量,那么如何为每一个资源都生成一个ClusterNode 呢? 答案是每一个资源都专享一个SlotChain,自然ClusterBuilderSlot也是资源专享的,因此它的成员变量也是资源专享的。

继续看它的entry实现:

 @Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)throws Throwable {if (clusterNode == null) {synchronized (lock) {if (clusterNode == null) {// Create the cluster node.clusterNode = Env.nodeBuilder.buildClusterNode();HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<ResourceWrapper, ClusterNode>(16);newMap.putAll(clusterNodeMap);newMap.put(node.getId(), clusterNode);clusterNodeMap = newMap;}}}//上面构建一个ClusterNode,添加至map中// 将ClusterNode和DefaultNode关联node.setClusterNode(clusterNode);// 构建OriginNode if (!"".equals(context.getOrigin())) {Node originNode = node.getClusterNode().getOriginNode(context.getOrigin());context.getCurEntry().setOriginNode(originNode);}fireEntry(context, resourceWrapper, node, count, args);
}

DefaultNode 的目的很明确,它是用来保存实时资源请求数据的,但是一个资源可能存在多个DefaultNode,那么如何统计该资源的全部实时数据呢?通过ClusterNode关联了一个资源的所有DefaultNode。一个资源只有一个ClusterNode,clusterNodeMap保存了所有资源的ClusterNode,因此很快的就能定位到ClusterNode。

前面分析到创建Context时origin属性是"",即空字符串。因此这里不会进入分支。那么什么情况下回进入呢?

当在入口有ContextUtil.enter(contextName, originName)这行代码,在此处会为当前Entry 对象设置来源节点。并且ClusterNode内部有一个originCountMap用来保存资源的所有来源。key为originName,value为StatisticNode。
Sentinel

5. 流控核心 FlowSlot

ClusterBuilderSlot之后是StatisticSlot,但是StatisticSlot是统计资源实时数据的,它内部直接调用下一个slot,然后根据返回值进行数据统计。

我们直接看流控相关的FlowSlot的entry方法。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)throws Throwable {FlowRuleManager.checkFlow(resourceWrapper, context, node, count);fireEntry(context, resourceWrapper, node, count, args);
}

回忆第一节的内容,FlowRuleManager类中flowRules中保存了所有资源的规则。一个resource作为key。跟进checkFlow

public static void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count)throws BlockException {List<FlowRule> rules = flowRules.get(resource.getName());if (rules != null) {for (FlowRule rule : rules) {if (!rule.passCheck(context, node, count)) {throw new FlowException(rule.getLimitApp());}}}}

这里遍历所有的规则,如果不能通过跑出FlowException,继续跟进passCheck

    public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {// limitApp 默认是一个 “default”String limitApp = this.getLimitApp();if (limitApp == null) {return true;}// origin默认为空字符串String origin = context.getOrigin();//根据调用方和上下文以及FlowRule所配置的Strategy来获取应该用于限流的统计Node,默认获取的是前面构造的ClusterNodeNode selectedNode = selectNodeByRequesterAndStrategy(origin, context, node);// 如果没有合乎规则的Node,则直接返回true,表示通过if (selectedNode == null) {return true;}// 如果存在统计Node,则通过controller来判断是否需要限流// 这个controller通过设置FlowRule的controllerBehavior来区分// 默认的实现有:0. default, 1. warm up, 2. rate limiterreturn controller.canPass(selectedNode, acquireCount);}

正的限流逻辑藏在了FlowRule的controller里面,而这个controller有三种实现.

见下文-> https://blog.csdn.net/TheLudlows/article/details/85055032

这篇关于流控神器Sentinel指南:深入分析流控原理(上)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/856012

相关文章

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

Java的IO模型、Netty原理解析

《Java的IO模型、Netty原理解析》Java的I/O是以流的方式进行数据输入输出的,Java的类库涉及很多领域的IO内容:标准的输入输出,文件的操作、网络上的数据传输流、字符串流、对象流等,这篇... 目录1.什么是IO2.同步与异步、阻塞与非阻塞3.三种IO模型BIO(blocking I/O)NI

Spring Boot结成MyBatis-Plus最全配置指南

《SpringBoot结成MyBatis-Plus最全配置指南》本文主要介绍了SpringBoot结成MyBatis-Plus最全配置指南,包括依赖引入、配置数据源、Mapper扫描、基本CRUD操... 目录前言详细操作一.创建项目并引入相关依赖二.配置数据源信息三.编写相关代码查zsRArly询数据库数

SpringBoot启动报错的11个高频问题排查与解决终极指南

《SpringBoot启动报错的11个高频问题排查与解决终极指南》这篇文章主要为大家详细介绍了SpringBoot启动报错的11个高频问题的排查与解决,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一... 目录1. 依赖冲突:NoSuchMethodError 的终极解法2. Bean注入失败:No qu

JavaScript错误处理避坑指南

《JavaScript错误处理避坑指南》JavaScript错误处理是编程过程中不可避免的部分,它涉及到识别、捕获和响应代码运行时可能出现的问题,本文将详细给大家介绍一下JavaScript错误处理的... 目录一、错误类型:三大“杀手”与应对策略1. 语法错误(SyntaxError)2. 运行时错误(R

Python使用date模块进行日期处理的终极指南

《Python使用date模块进行日期处理的终极指南》在处理与时间相关的数据时,Python的date模块是开发者最趁手的工具之一,本文将用通俗的语言,结合真实案例,带您掌握date模块的六大核心功能... 目录引言一、date模块的核心功能1.1 日期表示1.2 日期计算1.3 日期比较二、六大常用方法详

MySQL中慢SQL优化方法的完整指南

《MySQL中慢SQL优化方法的完整指南》当数据库响应时间超过500ms时,系统将面临三大灾难链式反应,所以本文将为大家介绍一下MySQL中慢SQL优化的常用方法,有需要的小伙伴可以了解下... 目录一、慢SQL的致命影响二、精准定位问题SQL1. 启用慢查询日志2. 诊断黄金三件套三、六大核心优化方案方案

使用Python高效获取网络数据的操作指南

《使用Python高效获取网络数据的操作指南》网络爬虫是一种自动化程序,用于访问和提取网站上的数据,Python是进行网络爬虫开发的理想语言,拥有丰富的库和工具,使得编写和维护爬虫变得简单高效,本文将... 目录网络爬虫的基本概念常用库介绍安装库Requests和BeautifulSoup爬虫开发发送请求解

SpringBoot整合MybatisPlus的基本应用指南

《SpringBoot整合MybatisPlus的基本应用指南》MyBatis-Plus,简称MP,是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,下面小编就来和大家介绍一下... 目录一、MyBATisPlus简介二、SpringBoot整合MybatisPlus1、创建数据库和