一文读懂Kubernetes Scheduler扩展功能

2024-06-16 05:08

本文主要是介绍一文读懂Kubernetes Scheduler扩展功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

Scheduler是Kubernetes组件中功能&逻辑相对单一&简单的模块,它主要的作用是:watch kube-apiserver,监听PodSpec.NodeName为空的pod,并利用预选和优选算法为该pod选择一个最佳的调度节点,最终将pod与该节点进行绑定,使pod调度在该节点上运行

在这里插入图片描述

展开上述调用流程中的scheduler部分,内部细节调用(参考Kubernetes Scheduler)如图所示:

在这里插入图片描述

scheduler内部预置了很多预选和优选算法(参考scheduler_algorithm),比如预选:NoDiskConflict,PodFitsResources,MatchNodeSelector,CheckNodeMemoryPressure等;优选:LeastRequestedPriority,BalancedResourceAllocation,CalculateAntiAffinityPriority,NodeAffinityPriority等。但是在实际生产环境中我们常常会需要一些特殊的调度策略,比如批量调度(aka coscheduling or gang scheduling),这是kubernetes默认调度策略所无法满足的,这个时候就需要我们对scheduler进行扩展来实现这个功能了

scheduler扩展方案

目前Kubernetes支持四种方式实现客户自定义的调度算法(预选&优选),如下:

  • default-scheduler recoding: 直接在Kubernetes默认scheduler基础上进行添加,然后重新编译kube-scheduler
  • standalone: 实现一个与kube-scheduler平行的custom scheduler,单独或者和默认kube-scheduler一起运行在集群中
  • scheduler extender: 实现一个"scheduler extender",kube-scheduler会调用它(http/https)作为默认调度算法(预选&优选&bind)的补充
  • scheduler framework: 实现scheduler framework plugins,重新编译kube-scheduler,类似于第一种方案,但是更加标准化,插件化

下面分别展开介绍这几种方式的原理和开发指引

default-scheduler recoding

这里我们先分析一下kube-scheduler调度相关入口:

  • 设置默认预选&优选策略

见defaultPredicates以及defaultPriorities(k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go):

func init() {registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}func defaultPredicates() sets.String {return sets.NewString(predicates.NoVolumeZoneConflictPred,predicates.MaxEBSVolumeCountPred,predicates.MaxGCEPDVolumeCountPred,predicates.MaxAzureDiskVolumeCountPred,predicates.MaxCSIVolumeCountPred,predicates.MatchInterPodAffinityPred,predicates.NoDiskConflictPred,predicates.GeneralPred,predicates.PodToleratesNodeTaintsPred,predicates.CheckVolumeBindingPred,predicates.CheckNodeUnschedulablePred,)
}func defaultPriorities() sets.String {return sets.NewString(priorities.SelectorSpreadPriority,priorities.InterPodAffinityPriority,priorities.LeastRequestedPriority,priorities.BalancedResourceAllocation,priorities.NodePreferAvoidPodsPriority,priorities.NodeAffinityPriority,priorities.TaintTolerationPriority,priorities.ImageLocalityPriority,)
}func registerAlgorithmProvider(predSet, priSet sets.String) {// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used// by specifying flag.scheduler.RegisterAlgorithmProvider(scheduler.DefaultProvider, predSet, priSet)// Cluster autoscaler friendly scheduling algorithm.scheduler.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}const (// DefaultProvider defines the default algorithm provider name.DefaultProvider = "DefaultProvider"
)
  • 注册预选和优选相关处理函数

注册预选函数(k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_predicates.go):

func init() {...// Fit is determined by resource availability.// This predicate is actually a default predicate, because it is invoked from// predicates.GeneralPredicates()scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
}

注册优选函数(k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_priorities.go):

func init() {...// Prioritizes nodes that have labels matching NodeAffinityscheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
}
  • 编写预选和优选处理函数

PodFitsResourcesPred对应的预选函数如下(k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go):

// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {node := nodeInfo.Node()if node == nil {return false, nil, fmt.Errorf("node not found")}var predicateFails []PredicateFailureReasonallowedPodNumber := nodeInfo.AllowedPodNumber()if len(nodeInfo.Pods())+1 > allowedPodNumber {predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))}// No extended resources should be ignored by default.ignoredExtendedResources := sets.NewString()var podRequest *schedulernodeinfo.Resourceif predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {podRequest = predicateMeta.podFitsResourcesMetadata.podRequestif predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources}} else {// We couldn't parse metadata - fallback to computing it.podRequest = GetResourceRequest(pod)}if podRequest.MilliCPU == 0 &&podRequest.Memory == 0 &&podRequest.EphemeralStorage == 0 &&len(podRequest.ScalarResources) == 0 {return len(predicateFails) == 0, predicateFails, nil}allocatable := nodeInfo.AllocatableResource()if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))}if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))}if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))}for rName, rQuant := range podRequest.ScalarResources {if v1helper.IsExtendedResourceName(rName) {// If this resource is one of the extended resources that should be// ignored, we will skip checking it.if ignoredExtendedResources.Has(string(rName)) {continue}}if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))}}if klog.V(10) {if len(predicateFails) == 0 {// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is// not logged. There is visible performance gain from it.klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)}}return len(predicateFails) == 0, predicateFails, nil
}

优选NodeAffinityPriority对应的Map与Reduce函数(k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/node_affinity.go)如下:

// CalculateNodeAffinityPriorityMap prioritizes nodes according to node affinity scheduling preferences
// indicated in PreferredDuringSchedulingIgnoredDuringExecution. Each time a node matches a preferredSchedulingTerm,
// it will get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher
// score the node gets.
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {node := nodeInfo.Node()if node == nil {return framework.NodeScore{}, fmt.Errorf("node not found")}// default is the podspec.affinity := pod.Spec.Affinityif priorityMeta, ok := meta.(*priorityMetadata); ok {// We were able to parse metadata, use affinity from there.affinity = priorityMeta.affinity}var count int32// A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.// An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an// empty PreferredSchedulingTerm matches all objects.if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]if preferredSchedulingTerm.Weight == 0 {continue}// TODO: Avoid computing it for all nodes if this becomes a performance problem.nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)if err != nil {return framework.NodeScore{}, err}if nodeSelector.Matches(labels.Set(node.Labels)) {count += preferredSchedulingTerm.Weight}}}return framework.NodeScore{Name:  node.Name,Score: int64(count),}, nil
}// CalculateNodeAffinityPriorityReduce is a reduce function for node affinity priority calculation.
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)
  • 相关使用

接下来我们看一下kube-scheduler调度算法(预选&优选)是如何与上述这些操作结合起来的:

// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)...
// RegisterFitPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered.
func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {return RegisterFitPredicateFactory(name, func(AlgorithmFactoryArgs) predicates.FitPredicate { return predicate })
}...
// RegisterFitPredicateFactory registers a fit predicate factory with the
// algorithm registry. Returns the name with which the predicate was registered.
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {schedulerFactoryMutex.Lock()defer schedulerFactoryMutex.Unlock()validateAlgorithmNameOrDie(name)fitPredicateMap[name] = predicateFactoryreturn name
}...
// Prioritizes nodes that have labels matching NodeAffinity
scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)...
// RegisterPriorityMapReduceFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriorityMapReduceFunction(name string,mapFunction priorities.PriorityMapFunction,reduceFunction priorities.PriorityReduceFunction,weight int) string {return RegisterPriorityConfigFactory(name, PriorityConfigFactory{MapReduceFunction: func(AlgorithmFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {return mapFunction, reduceFunction},Weight: int64(weight),})
}...
// RegisterPriorityConfigFactory registers a priority config factory with its name.
func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) string {schedulerFactoryMutex.Lock()defer schedulerFactoryMutex.Unlock()validateAlgorithmNameOrDie(name)priorityFunctionMap[name] = pcfreturn name
}...
// (g.predicates)
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func (g *genericScheduler) podFitsOnNode(ctx context.Context,state *framework.CycleState,pod 

这篇关于一文读懂Kubernetes Scheduler扩展功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1065531

相关文章

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构

一文详解SpringBoot中控制器的动态注册与卸载

《一文详解SpringBoot中控制器的动态注册与卸载》在项目开发中,通过动态注册和卸载控制器功能,可以根据业务场景和项目需要实现功能的动态增加、删除,提高系统的灵活性和可扩展性,下面我们就来看看Sp... 目录项目结构1. 创建 Spring Boot 启动类2. 创建一个测试控制器3. 创建动态控制器注

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

一文详解Git中分支本地和远程删除的方法

《一文详解Git中分支本地和远程删除的方法》在使用Git进行版本控制的过程中,我们会创建多个分支来进行不同功能的开发,这就容易涉及到如何正确地删除本地分支和远程分支,下面我们就来看看相关的实现方法吧... 目录技术背景实现步骤删除本地分支删除远程www.chinasem.cn分支同步删除信息到其他机器示例步骤

Golang如何用gorm实现分页的功能

《Golang如何用gorm实现分页的功能》:本文主要介绍Golang如何用gorm实现分页的功能方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录背景go库下载初始化数据【1】建表【2】插入数据【3】查看数据4、代码示例【1】gorm结构体定义【2】分页结构体

一文详解Java Stream的sorted自定义排序

《一文详解JavaStream的sorted自定义排序》Javastream中的sorted方法是用于对流中的元素进行排序的方法,它可以接受一个comparator参数,用于指定排序规则,sorte... 目录一、sorted 操作的基础原理二、自定义排序的实现方式1. Comparator 接口的 Lam

Java Web实现类似Excel表格锁定功能实战教程

《JavaWeb实现类似Excel表格锁定功能实战教程》本文将详细介绍通过创建特定div元素并利用CSS布局和JavaScript事件监听来实现类似Excel的锁定行和列效果的方法,感兴趣的朋友跟随... 目录1. 模拟Excel表格锁定功能2. 创建3个div元素实现表格锁定2.1 div元素布局设计2.

HTML5实现的移动端购物车自动结算功能示例代码

《HTML5实现的移动端购物车自动结算功能示例代码》本文介绍HTML5实现移动端购物车自动结算,通过WebStorage、事件监听、DOM操作等技术,确保实时更新与数据同步,优化性能及无障碍性,提升用... 目录1. 移动端购物车自动结算概述2. 数据存储与状态保存机制2.1 浏览器端的数据存储方式2.1.

基于 HTML5 Canvas 实现图片旋转与下载功能(完整代码展示)

《基于HTML5Canvas实现图片旋转与下载功能(完整代码展示)》本文将深入剖析一段基于HTML5Canvas的代码,该代码实现了图片的旋转(90度和180度)以及旋转后图片的下载... 目录一、引言二、html 结构分析三、css 样式分析四、JavaScript 功能实现一、引言在 Web 开发中,