Understanding Kubernetes Scheduler Extension Mechanisms

2024-06-16 05:08


Preface

Among the Kubernetes components, the scheduler is a module with a relatively narrow and simple set of responsibilities. Its main job is to watch kube-apiserver for pods whose PodSpec.NodeName is empty, pick the best node for each such pod using predicate (filtering) and priority (scoring) algorithms, and finally bind the pod to that node so it runs there.

[Figure: overall pod scheduling call flow]
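To make the "watch kube-apiserver for pods with an empty PodSpec.NodeName" step concrete, below is a minimal client-go sketch (not the scheduler's actual informer wiring) that simply lists unscheduled pods with the spec.nodeName field selector; the kubeconfig path is a placeholder.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a client from a local kubeconfig (placeholder path).
    config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // Pods whose spec.nodeName is empty have not been scheduled yet;
    // this is the same field the scheduler keys on.
    pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
        FieldSelector: "spec.nodeName=",
    })
    if err != nil {
        panic(err)
    }
    for _, p := range pods.Items {
        fmt.Printf("unscheduled pod: %s/%s\n", p.Namespace, p.Name)
    }
}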

Expanding the scheduler part of the call flow above, its internal calls (see the Kubernetes Scheduler reference) look like this:

[Figure: kube-scheduler internal call flow]

The scheduler ships with many built-in predicate and priority algorithms (see scheduler_algorithm). Predicates include NoDiskConflict, PodFitsResources, MatchNodeSelector, and CheckNodeMemoryPressure; priorities include LeastRequestedPriority, BalancedResourceAllocation, CalculateAntiAffinityPriority, and NodeAffinityPriority. In real production environments, however, we often need special scheduling policies, such as batch scheduling (aka coscheduling or gang scheduling), which the default Kubernetes scheduling policies cannot satisfy. In those cases we need to extend the scheduler to implement the desired behavior.

Scheduler extension approaches

Kubernetes currently supports four ways to implement custom scheduling algorithms (predicates & priorities):

  • default-scheduler recoding: add your algorithms directly on top of the default Kubernetes scheduler, then recompile kube-scheduler
  • standalone: implement a custom scheduler that runs in parallel with kube-scheduler, either on its own or together with the default kube-scheduler in the cluster (a minimal sketch follows this list)
  • scheduler extender: implement a "scheduler extender" that kube-scheduler calls over HTTP/HTTPS as a supplement to the default scheduling algorithms (predicates, priorities, and bind)
  • scheduler framework: implement scheduler framework plugins and recompile kube-scheduler; similar to the first approach, but more standardized and plugin-oriented
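As a quick illustration of the standalone approach, the sketch below builds a Pod whose spec.schedulerName opts into a custom scheduler. The name "my-custom-scheduler" is purely an assumed example; pods that leave the field empty keep being handled by the default kube-scheduler, so both schedulers can coexist in one cluster.

package main

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newPodForCustomScheduler returns a Pod that opts into a standalone scheduler.
// "my-custom-scheduler" is a hypothetical scheduler name used for illustration.
func newPodForCustomScheduler() *corev1.Pod {
    return &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "demo",
            Namespace: "default",
        },
        Spec: corev1.PodSpec{
            // An empty schedulerName means the default kube-scheduler handles the pod;
            // setting it routes this pod to the standalone scheduler instead.
            SchedulerName: "my-custom-scheduler",
            Containers: []corev1.Container{
                {Name: "app", Image: "nginx"},
            },
        },
    }
}

The standalone scheduler itself would then only watch for unscheduled pods whose schedulerName matches its own and bind them.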

The following sections walk through how each of these approaches works and how to develop against it.

default-scheduler recoding

Let's start by looking at the scheduling-related entry points in kube-scheduler:

  • Set the default predicate & priority policies

See defaultPredicates and defaultPriorities (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go):

func init() {
    registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}

func defaultPredicates() sets.String {
    return sets.NewString(
        predicates.NoVolumeZoneConflictPred,
        predicates.MaxEBSVolumeCountPred,
        predicates.MaxGCEPDVolumeCountPred,
        predicates.MaxAzureDiskVolumeCountPred,
        predicates.MaxCSIVolumeCountPred,
        predicates.MatchInterPodAffinityPred,
        predicates.NoDiskConflictPred,
        predicates.GeneralPred,
        predicates.PodToleratesNodeTaintsPred,
        predicates.CheckVolumeBindingPred,
        predicates.CheckNodeUnschedulablePred,
    )
}

func defaultPriorities() sets.String {
    return sets.NewString(
        priorities.SelectorSpreadPriority,
        priorities.InterPodAffinityPriority,
        priorities.LeastRequestedPriority,
        priorities.BalancedResourceAllocation,
        priorities.NodePreferAvoidPodsPriority,
        priorities.NodeAffinityPriority,
        priorities.TaintTolerationPriority,
        priorities.ImageLocalityPriority,
    )
}

func registerAlgorithmProvider(predSet, priSet sets.String) {
    // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
    // by specifying flag.
    scheduler.RegisterAlgorithmProvider(scheduler.DefaultProvider, predSet, priSet)
    // Cluster autoscaler friendly scheduling algorithm.
    scheduler.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
        copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}

const (
    // DefaultProvider defines the default algorithm provider name.
    DefaultProvider = "DefaultProvider"
)
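For the recoding approach, the usual first step is to make the default provider aware of your algorithm's name. A minimal sketch, assuming a hypothetical predicate key CustomNodeLabelPred added in the same package, would extend defaultPredicates() like this before recompiling kube-scheduler (existing entries are elided for brevity):

// Hypothetical predicate key; the real code would define it alongside the
// other *Pred constants in the predicates package.
const CustomNodeLabelPred = "CustomNodeLabel"

func defaultPredicates() sets.String {
    return sets.NewString(
        predicates.NoVolumeZoneConflictPred,
        predicates.GeneralPred,
        predicates.PodToleratesNodeTaintsPred,
        // ... the remaining built-in defaults stay unchanged ...
        CustomNodeLabelPred, // newly added custom predicate
    )
}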
  • Register the predicate and priority handler functions

Predicate registration (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_predicates.go):

func init() {
    ...
    // Fit is determined by resource availability.
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
}

Priority registration (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_priorities.go):

func init() {
    ...
    // Prioritizes nodes that have labels matching NodeAffinity
    scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
}
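Mirroring the built-in registrations above, custom algorithms would be registered the same way. The sketch below assumes the hypothetical CustomNodeLabelPred key from the earlier sketch plus CustomNodeLabel and CustomPriorityMap functions that you write yourself (both are sketched after the built-in examples below), and assumes it lives next to these registration files so the same imports apply:

func init() {
    // Register the hypothetical custom predicate under the key that was added
    // to defaultPredicates() in the earlier sketch.
    scheduler.RegisterFitPredicate(CustomNodeLabelPred, CustomNodeLabel)

    // Register a hypothetical custom priority with weight 1. Reusing NormalizeReduce,
    // as NodeAffinityPriority does, scales the raw scores to 0..framework.MaxNodeScore.
    scheduler.RegisterPriorityMapReduceFunction("CustomPriority", CustomPriorityMap,
        priorities.NormalizeReduce(framework.MaxNodeScore, false), 1)
}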
  • Write the predicate and priority handler functions

The predicate function behind PodFitsResourcesPred looks like this (k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go):

// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var predicateFails []PredicateFailureReason
    allowedPodNumber := nodeInfo.AllowedPodNumber()
    if len(nodeInfo.Pods())+1 > allowedPodNumber {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
    }

    // No extended resources should be ignored by default.
    ignoredExtendedResources := sets.NewString()

    var podRequest *schedulernodeinfo.Resource
    if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {
        podRequest = predicateMeta.podFitsResourcesMetadata.podRequest
        if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {
            ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources
        }
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podRequest = GetResourceRequest(pod)
    }
    if podRequest.MilliCPU == 0 &&
        podRequest.Memory == 0 &&
        podRequest.EphemeralStorage == 0 &&
        len(podRequest.ScalarResources) == 0 {
        return len(predicateFails) == 0, predicateFails, nil
    }

    allocatable := nodeInfo.AllocatableResource()
    if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
    }
    if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
    }
    if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
    }

    for rName, rQuant := range podRequest.ScalarResources {
        if v1helper.IsExtendedResourceName(rName) {
            // If this resource is one of the extended resources that should be
            // ignored, we will skip checking it.
            if ignoredExtendedResources.Has(string(rName)) {
                continue
            }
        }
        if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
            predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
        }
    }

    if klog.V(10) {
        if len(predicateFails) == 0 {
            // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
            // not logged. There is visible performance gain from it.
            klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
                podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
        }
    }
    return len(predicateFails) == 0, predicateFails, nil
}
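Following the same signature as PodFitsResources, a hand-rolled predicate can be quite small. Below is a minimal sketch of the hypothetical CustomNodeLabel predicate referenced earlier: it only admits nodes carrying an assumed scheduling.example.com/allowed=true label, and it defines its own failure-reason type rather than reusing the scheduler's internal error helpers.

// customLabelFailure implements PredicateFailureReason for this sketch.
type customLabelFailure struct{ reason string }

func (f *customLabelFailure) GetReason() string { return f.reason }

// CustomNodeLabel is a hypothetical predicate: it only admits nodes that carry
// the assumed label "scheduling.example.com/allowed=true". The signature matches
// the FitPredicate functions shown above.
func CustomNodeLabel(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if node.Labels["scheduling.example.com/allowed"] == "true" {
        return true, nil, nil
    }
    return false, []PredicateFailureReason{&customLabelFailure{reason: "node is missing scheduling.example.com/allowed=true"}}, nil
}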

The Map and Reduce functions behind the NodeAffinityPriority priority (k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/node_affinity.go) look like this:

// CalculateNodeAffinityPriorityMap prioritizes nodes according to node affinity scheduling preferences
// indicated in PreferredDuringSchedulingIgnoredDuringExecution. Each time a node matches a preferredSchedulingTerm,
// it will get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher
// score the node gets.
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }

    // default is the podspec.
    affinity := pod.Spec.Affinity
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        // We were able to parse metadata, use affinity from there.
        affinity = priorityMeta.affinity
    }

    var count int32
    // A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
    // An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
    // empty PreferredSchedulingTerm matches all objects.
    if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
        // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
        for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
            preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
            if preferredSchedulingTerm.Weight == 0 {
                continue
            }
            // TODO: Avoid computing it for all nodes if this becomes a performance problem.
            nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
            if err != nil {
                return framework.NodeScore{}, err
            }
            if nodeSelector.Matches(labels.Set(node.Labels)) {
                count += preferredSchedulingTerm.Weight
            }
        }
    }

    return framework.NodeScore{
        Name:  node.Name,
        Score: int64(count),
    }, nil
}

// CalculateNodeAffinityPriorityReduce is a reduce function for node affinity priority calculation.
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)
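A custom priority follows the same Map/Reduce split. Below is a minimal sketch of the hypothetical CustomPriorityMap referenced in the earlier init() sketch: it favors nodes carrying an assumed scheduling.example.com/preferred=true label, and the NormalizeReduce registered alongside it rescales the raw scores, just as NodeAffinityPriority does above.

// CustomPriorityMap is a hypothetical priority map function: nodes carrying the
// assumed label "scheduling.example.com/preferred=true" get a raw score of 10,
// everything else gets 0. The registered reduce step normalizes these raw scores
// into the 0..framework.MaxNodeScore range.
func CustomPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    var score int64
    if node.Labels["scheduling.example.com/preferred"] == "true" {
        score = 10
    }
    return framework.NodeScore{Name: node.Name, Score: score}, nil
}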
  • How the registrations are used

Next, let's look at how kube-scheduler's scheduling algorithms (predicates & priorities) hook into the registrations above:

// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)

...

// RegisterFitPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered.
func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {
    return RegisterFitPredicateFactory(name, func(AlgorithmFactoryArgs) predicates.FitPredicate { return predicate })
}

...

// RegisterFitPredicateFactory registers a fit predicate factory with the
// algorithm registry. Returns the name with which the predicate was registered.
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
    schedulerFactoryMutex.Lock()
    defer schedulerFactoryMutex.Unlock()
    validateAlgorithmNameOrDie(name)
    fitPredicateMap[name] = predicateFactory
    return name
}

...

// Prioritizes nodes that have labels matching NodeAffinity
scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)

...

// RegisterPriorityMapReduceFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriorityMapReduceFunction(
    name string,
    mapFunction priorities.PriorityMapFunction,
    reduceFunction priorities.PriorityReduceFunction,
    weight int) string {
    return RegisterPriorityConfigFactory(name, PriorityConfigFactory{
        MapReduceFunction: func(AlgorithmFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
            return mapFunction, reduceFunction
        },
        Weight: int64(weight),
    })
}

...

// RegisterPriorityConfigFactory registers a priority config factory with its name.
func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) string {
    schedulerFactoryMutex.Lock()
    defer schedulerFactoryMutex.Unlock()
    validateAlgorithmNameOrDie(name)
    priorityFunctionMap[name] = pcf
    return name
}

...

// (g.predicates)
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func (g *genericScheduler) podFitsOnNode(ctx context.Context, state *framework.CycleState, pod ...

