K8s APF 机制 (API Priority and Fairness)

2023-10-28 06:10

本文主要是介绍K8s APF 机制 (API Priority and Fairness),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 背景

APIServer 已有一个简单的机制 (MaxInFlightLimit) 来保护自己不受 CPU 和内存过载的影响。通过 --max-requests-inflight 和 --max-mutating-requests-inflight 限制待处理的请求数量。

这种方式存在一些明显的缺陷:

  • 除了区分 mutating 和 readonly 之外,请求之间没有其他区别,管理粒度比较粗
  • 一个大流量的请求子集可能会挤掉其他的请求,如果一些非常重要的请求 (sysytem controllers, leader elections) 被挤掉,后果比较严重。

2. APF 的整体设计

API Priority and Fairness (APF) 是 K8s v1.18 的一个 alpha 特性。APF 这种替代方案可以规避 MaxInFlightLimit 方案存在的问题。下面介绍 APF 的整体设计,从中可以看出 APF 到底是如何规避这些问题的。

img

  • APF 的实现依赖两个非常重要的资源 FlowSchema, PriorityLevelConfiguration

  • APF 对请求进行更细粒度的分类,每一个请求分类对应一个 FlowSchema (FS).

  • FS 内的请求又会根据 distinguisher 进一步划分为不同的 Flow.

  • FS 会设置一个优先级 (Priority Level, PL),不同优先级的并发资源是隔离的。所以不同优先级的资源不会相互排挤。特定优先级的请求可以被高优处理。

  • 一个 PL 可以对应多个 FS,PL 中维护了一个 QueueSet,用于缓存不能及时处理的请求,请求不会因为超出 PL 的并发限制而被丢弃。

  • FS 中的每个 Flow 通过 shuffle sharding 算法从 QueueSet 选取特定的 queues 缓存请求。

  • 每次从 QueueSet 中取请求执行时,会先应用 fair queuing 算法从 QueueSet 中选中一个 queue,然后从这个 queue 中取出 oldest 请求执行。所以即使是同一个 PL 内的请求,也不会出现一个 Flow 内的请求一直占用资源的不公平现象。

2.1 FlowSchema

1、如何对请求进行分类?

用户可以通过创建 FlowSchema 资源对象自定义分类方式。

FS 代表一个请求分类,包含多条匹配规则,如果某个请求能匹配其中任意一条规则就认为这个请求属于这个 FS (只匹配第一个匹配的 FS)。

// FS 规则
// FlowSchemaSpec describes how the FlowSchema's specification looks like.
type FlowSchemaSpec struct {...Rules []PolicyRulesWithSubjects `json:"rules,omitempty" protobuf:"bytes,4,rep,name=rules"`
}

请求与 FS 规则匹配:同时满足以下条件,就认为请求与该 FS 规则匹配

  • 匹配请求主体 subject
  • 对资源的请求,匹配 ResourceRules 中任意一条规则
  • 对非资源的请求, 匹配 NonResourceRules 中任意一条规则
type PolicyRulesWithSubjects struct {Subjects []SubjectResourceRules []ResourcePolicyRuleNonResourceRules []NonResourcePolicyRule
}type Subject struct {Kind SubjectKind `json:"kind" protobuf:"bytes,1,opt,name=kind"`User *UserSubject `json:"user,omitempty" protobuf:"bytes,2,opt,name=user"`Group *GroupSubject `json:"group,omitempty" protobuf:"bytes,3,opt,name=group"`ServiceAccount *ServiceAccountSubject `json:"serviceAccount,omitempty" protobuf:"bytes,4,opt,name=serviceAccount"`
}type ResourcePolicyRule struct {Verbs []string `json:"verbs" protobuf:"bytes,1,rep,name=verbs"`APIGroups []string `json:"apiGroups" protobuf:"bytes,2,rep,name=apiGroups"`Resources []string `json:"resources" protobuf:"bytes,3,rep,name=resources"`ClusterScope bool `json:"clusterScope,omitempty" protobuf:"varint,4,opt,name=clusterScope"`Namespaces []string `json:"namespaces" protobuf:"bytes,5,rep,name=namespaces"`
}type NonResourcePolicyRule struct {Verbs []string `json:"verbs" protobuf:"bytes,1,rep,name=verbs"`NonResourceURLs []string `json:"nonResourceURLs" protobuf:"bytes,6,rep,name=nonResourceURLs"`
}

总之,通过 FS,可以根据请求的主体 (User, Group, ServiceAccout)、动作 (Get, List, Create, Delete …)、资源类型 (pod, deployment …)、namespace、url 对请求进行分类。

2、FS 内的请求如何进一步划分 Flow ?

有两种方式对请求进行 Flow 划分:

  • distinguisher = ByUser, 根据请求的 User 划分不同 Flow;可以让来自不同用户的请求平等使用 PL 内的资源。
  • distinguisher = ByNamespace, 根据请求的 namespace 划分不同的 Flow;可以让来自不同 namespace 的请求平等使用 PL 内的资源。
  • distinguisher = nil,表示不划分

distinguisher 的取值由请求所属的 FS 决定:

type FlowSchemaSpec struct {...DistinguisherMethod *FlowDistinguisherMethod `json:"distinguisherMethod,omitempty" protobuf:"bytes,3,opt,name=distinguisherMethod"`...
}type FlowDistinguisherMethod struct {Type FlowDistinguisherMethodType `json:"type" protobuf:"bytes,1,opt,name=type"`
}type FlowDistinguisherMethodType string
const (FlowDistinguisherMethodByUserType FlowDistinguisherMethodType = "ByUser"FlowDistinguisherMethodByNamespaceType FlowDistinguisherMethodType = "ByNamespace"
)

3、如何给请求分配优先级?

FS 通过 FlowSchema.Spec.PriorityLevelConfiguration.Name 指定 PL,从属于这个 FS 的所有请求都划分到这个优先级中。

2.2 Priority Level

用户可以通过创建 PriorityLevelConfiguration 资源对象自定义 PL。

如果 API sever 启动了 APF,它的总并发数为 --max-requests-inflight 和 --max-mutating-requests-inflight 两个配置值之和。这些并发数被分配给各个 PL,分配方式是根据 PriorityLevelConfiguration.Spec.Limited.AssuredConcurrencyShares 的数值按比例分配。PL 的 AssuredConcurrencyShare 越大,分配到的并发份额越大。

每个 PL 都对应维护了一个 QueueSet,其中包含多个 queue ,当 PL 达到并发限制时,收到的请求会被缓存在 QueueSet 中,不会丢弃,除非 queue 也达到了容量限制。

QueueSet 中 queue 数量由PriorityLevelConfiguration.Spec.Limited.LimitResponse.Queuing.Queues 指定;每个 queue 的长度由 PriorityLevelConfiguration.Spec.Limited.LimitResponse.Queuing.QueueLengthLimit 指定。

5 个推荐的 PL

PLrequest
system来自 system:nodes group 的请求
leader-election内建 controller 的 leader election 请求
workload-high内建 controller 的其他请求
workload-low来自其他 service account 的请求
global-default所有其他的请求

2.3 两对内建的 PL 和 FS

Exempt PL (PriorityLevelConfigurations.Spec.Type = exempt):这个 PL 内的请求完全不受限制,总是被立即执行。

Exempt FS:将来自 system:master group 的所有请求划分到 Exempt PL。用户可以自定义 FS 将一些特殊的请求划分到 Exempt PL 中。

catch-all PL:只有一个并发配额,没有 queue。一般会返回 HTTP 429 错误。

catch-all FS:说有未能匹配其他 FS 的请求,最终会被这个 FS 匹配上。保证所有的请求都有一个分类。

由于 request 只匹配第一个符合条件的 FS,所以 APF 会对 FS 进行排序,将 Exempt FS 排第一位,catch-all FS 排最后一位。

// sort into the order to be used for matching
sort.Sort(fsSeq)// Supply missing mandatory FlowSchemas, in correct position
if !haveExemptFS {// 放在第一位fsSeq = append(apihelpers.FlowSchemaSequence{fcboot.MandatoryFlowSchemaExempt}, fsSeq...)
}
if !haveCatchAllFS {// 放在最后一位fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll)
}

3. 请求流程

1、分配 queue

API Server 接收到请求后,先按照前面提到的方式,找到与之匹配的 FS,实现分类,并根据 FS 确定请求的所属的Flow 和 PL。

APF 利用 FS 的 name 和请求的 userName 或 namespace 计算一个 hashFlowID 标识 Flow。

var hashValue uint64
if numQueues > 1 {// 1. DistinguisherMethod = ByUser, flowDistinguisher = rd.User.Name// 2. DistinguisherMethod = ByNamespace, flowDistinguisher = rd.RequestInfo.Namespace// 3. DistinguisherMethod = nil, flowDistinguisher = ""flowDistinguisher := computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod)hashValue = hashFlowID(fs.Name, flowDistinguisher)
}

然后利用这个 hashFlowID 通过 Shuttle Sharding 算法,从请求所属的 PL 的 QueueSet 中选取指定数目的 queues (PriorityLevelConfiguration.Spec.Limited.LimitResponse.Queuing.HandSize):

func (d *Dealer) Deal(hashValue uint64, pick func(int)) {// 15 is the largest possible value of handSizevar remainders [15]intfor i := 0; i < d.handSize; i++ {hashValueNext := hashValue / uint64(d.deckSize-i)remainders[i] = int(hashValue - uint64(d.deckSize-i)*hashValueNext)hashValue = hashValueNext}// 防止重复:正反馈机制,大者更大for i := 0; i < d.handSize; i++ {card := remainders[i]for j := i; j > 0; j-- {if card >= remainders[j-1] {// 不会出现 card > deckSize// 因为 hashValue % uint64(d.deckSize-i) <= d.deckSize-i-1,而第 i 个 card 最多自增 i 次card++}}pick(card)}
}

然后从这些候选的 queues 中,选择其中 length 最小 queue. 并移出 queue 中超时的请求。

判断是否入队这个请求:如果队列已满且 PL 中正在执行的请求数达到 PL 的并发限制,就会拒绝这个请求,否则入队这个请求。

此处保证了不同 Flow 的请求不会挤掉其他 Flow 的请求。Flow 是按照用户或 namespace 划分的,它的实际意义就是来自不同用户或 namespace 的请求不会挤掉同优先级的其他用户或 namespace 的请求。

2、分发请求

为了保证同一个 PL 中缓存的不同 Flow 的请求被处理机会平等,每次分发请求时,都会先应用 fair queuing 算法从 PL 的 QueueSet 中选中一个 queue:

// selectQueueLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal.
func (qs *queueSet) selectQueueLocked() *queue {minVirtualFinish := math.Inf(1)var minQueue *queuevar minIndex intnq := len(qs.queues)for range qs.queues {qs.robinIndex = (qs.robinIndex + 1) % nqqueue := qs.queues[qs.robinIndex]if len(queue.requests) != 0 {currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)if currentVirtualFinish < minVirtualFinish {minVirtualFinish = currentVirtualFinishminQueue = queueminIndex = qs.robinIndex}}}// we set the round robin indexing to start at the chose queue// for the next round.  This way the non-selected queues// win in the case that the virtual finish times are the sameqs.robinIndex = minIndexreturn minQueue
}

fiar queuing 选 queue 的基本思路是:

1)每一个 queue 都维护了一个 virtualStart: oldest 请求的虚拟开始执行时间

type queue struct {requests []*request// 如果队列中没有 request 且没有 request 在执行 (requestsExecuting = 0), virtualStart = queueSet.virtualTime// 每分发一个 request, virtualStart = virtualStart + queueSet.estimatedServiceTime// 每执行完一个 request, virtualStart = virtualStart - queueSet.estimatedServiceTime + actualServiceTime,用真实的执行时间,校准 virtualStart// 计算第 J 个 request 的 virtualFinishTime = virtualStart + (J+1) * serviceTimevirtualStart float64requestsExecuting intindex             int
}

比较关键的一点是:virtualStart 如何进行初始化?

virtualStart 初始化是直接设置为 QueueSet 中维护的 virtualTime。而 QueueSet.virtualTime 是在这个 PL 初始化的时候赋值为 0。此后,如果 QueueSet 中的 queue 如有任何状态变化,都要执行更新,根据自身两次变更历经的 realTime 按比例增加:

func (qs *queueSet) syncTimeLocked() {
realNow := qs.clock.Now()
timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds()
qs.lastRealTime = realNow
qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked()
}

其中,这个比例计算方式为:min(QueueSet 中正字执行的请求数, PL 的并发配额) / QueueSet 中活跃的 queue 数目。

virtualTime 实际对应于 bit-by-bit round-robin 算法中的 R(t),当前时间 round-robin 轮数。具体可以参考文后第4个链接。

  1. 选 queue 时,会估计每个 queue 中 oldest 请求的虚拟执行完毕时间,选择这个虚拟执行完毕时间最小的 queue

选中 queue 之后,从 queue 中取出 oldest 请求,设置执行标记。

重复执行以上选 queue 给 oldest 请求设置执行标志,直到 PL 所有的 Queue 中都没有缓存的请求或达到 PL 的并发限制。

注:此处是尽可能多的分发 PL 中缓存的请求,有可能当前新加入的请求不会被分发。

3、请求阻塞监听执行

完成以上操作之后,该请求会进入阻塞监听状态,直到被分发。

func (req *request) wait() (bool, bool) {qs := req.qsqs.lock.Lock()defer qs.lock.Unlock()...// 里面包含一个条件锁,阻塞,等待唤醒decisionAny := req.decision.GetLocked()...decision, isDecision := decisionAny.(requestDecision)if !isDecision {panic(fmt.Sprintf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2))}switch decision {case decisionReject:klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)metrics.AddReject(qs.qCfg.Name, req.fsName, "time-out")return false, qs.isIdleLocked()case decisionCancel:// TODO(aaron-prindle) add metrics for this caseklog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)return false, qs.isIdleLocked()case decisionExecute:klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)return true, falsedefault:// This can not happen, all possible values are handled abovepanic(decision)}
}

4、 请求执行

如果这个请求被唤醒,并收到了 decisionExecute 标记,便会开始执行。

func (req *request) Finish(execFn func()) bool {exec, idle := req.wait()if !exec {return idle}// 请求执行execFn()// 分发请求return req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
}

执行完毕后,便会释放一个并发资源,于是会触发新一轮的请求分发。

4. 如何启用 APF?

kube-apiserver \
--feature-gates=APIPriorityAndFairness=true \
--runtime-config=flowcontrol.apiserver.k8s.io/v1alpha1=true \# …and other flags as usual

“–enable-priority-and-fairness=false” 选项会禁用 APF,即使以上选项已启用。

参考

https://kubernetes.io/blog/2020/04/06/kubernetes-1-18-feature-api-priority-and-fairness-alpha/

https://kubernetes.io/docs/concepts/cluster-administration/flow-control/

https://en.wikipedia.org/wiki/Fair_queuing

http://www.iitg.ac.in/nselvaraju/ma402_2007/Assignments/04010605_tp.pdf

这篇关于K8s APF 机制 (API Priority and Fairness)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/291231

相关文章

一文带你理解Python中import机制与importlib的妙用

《一文带你理解Python中import机制与importlib的妙用》在Python编程的世界里,import语句是开发者最常用的工具之一,它就像一把钥匙,打开了通往各种功能和库的大门,下面就跟随小... 目录一、python import机制概述1.1 import语句的基本用法1.2 模块缓存机制1.

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

k8s部署MongDB全过程

《k8s部署MongDB全过程》文章介绍了如何在Kubernetes集群中部署MongoDB,包括环境准备、创建Secret、创建服务和Deployment,并通过Robo3T工具测试连接... 目录一、环境准备1.1 环境说明1.2 创建 namespace1.3 创建mongdb账号/密码二、创建Sec

Redis缓存问题与缓存更新机制详解

《Redis缓存问题与缓存更新机制详解》本文主要介绍了缓存问题及其解决方案,包括缓存穿透、缓存击穿、缓存雪崩等问题的成因以及相应的预防和解决方法,同时,还详细探讨了缓存更新机制,包括不同情况下的缓存更... 目录一、缓存问题1.1 缓存穿透1.1.1 问题来源1.1.2 解决方案1.2 缓存击穿1.2.1

Java如何通过反射机制获取数据类对象的属性及方法

《Java如何通过反射机制获取数据类对象的属性及方法》文章介绍了如何使用Java反射机制获取类对象的所有属性及其对应的get、set方法,以及如何通过反射机制实现类对象的实例化,感兴趣的朋友跟随小编一... 目录一、通过反射机制获取类对象的所有属性以及相应的get、set方法1.遍历类对象的所有属性2.获取

MySQL中的锁和MVCC机制解读

《MySQL中的锁和MVCC机制解读》MySQL事务、锁和MVCC机制是确保数据库操作原子性、一致性和隔离性的关键,事务必须遵循ACID原则,锁的类型包括表级锁、行级锁和意向锁,MVCC通过非锁定读和... 目录mysql的锁和MVCC机制事务的概念与ACID特性锁的类型及其工作机制锁的粒度与性能影响多版本

使用SpringBoot创建一个RESTful API的详细步骤

《使用SpringBoot创建一个RESTfulAPI的详细步骤》使用Java的SpringBoot创建RESTfulAPI可以满足多种开发场景,它提供了快速开发、易于配置、可扩展、可维护的优点,尤... 目录一、创建 Spring Boot 项目二、创建控制器类(Controller Class)三、运行

centos7基于keepalived+nginx部署k8s1.26.0高可用集群

《centos7基于keepalived+nginx部署k8s1.26.0高可用集群》Kubernetes是一个开源的容器编排平台,用于自动化地部署、扩展和管理容器化应用程序,在生产环境中,为了确保集... 目录一、初始化(所有节点都执行)二、安装containerd(所有节点都执行)三、安装docker-

Spring使用@Retryable实现自动重试机制

《Spring使用@Retryable实现自动重试机制》在微服务架构中,服务之间的调用可能会因为一些暂时性的错误而失败,例如网络波动、数据库连接超时或第三方服务不可用等,在本文中,我们将介绍如何在Sp... 目录引言1. 什么是 @Retryable?2. 如何在 Spring 中使用 @Retryable

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization