【K8s源码分析(三)】-K8s调度器调度周期介绍

2024-06-09 16:36

本文主要是介绍【K8s源码分析(三)】-K8s调度器调度周期介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文首发在个人博客上,欢迎来踩!

本次分析参考的K8s版本是v1.27.0。

K8s的整体调度框架如下图所示。
请添加图片描述

调度框架顶层函数

K8s调度器调度的核心函数scheduleronepkg/scheduler/schedule_one.go:62,如下,这里将一些解释写在了注释里

// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {// 获取调度队列中的下一个 Pod 信息podInfo := sched.NextPod()// 如果 podInfo 或者其包含的 Pod 为 nil,说明调度队列关闭或者没有 Pod 需要调度,直接返回if podInfo == nil || podInfo.Pod == nil {return}// 获取 Pod 对象pod := podInfo.Pod// 为当前 Pod 选择一个调度框架(scheduler framework)fwk, err := sched.frameworkForPod(pod)if err != nil {// 这种情况不应该发生,因为我们只接受那些指定了匹配调度器名称的 Pod 进行调度klog.ErrorS(err, "Error occurred")return}// 如果跳过调度,则直接返回if sched.skipPodSchedule(fwk, pod) {return}// 记录尝试调度 Pod 的日志klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))// 开始计时,尝试为 Pod 找到合适的宿主机start := time.Now()// 初始化调度周期状态state := framework.NewCycleState()// 设置是否记录插件指标的随机概率state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)// 初始化一个空的 podsToActivate 结构,这个结构将由插件填充或者保持为空podsToActivate := framework.NewPodsToActivate()// 将 podsToActivate 写入状态中state.Write(framework.PodsToActivateKey, podsToActivate)// 创建一个新的带有取消功能的上下文,用于调度周期schedulingCycleCtx, cancel := context.WithCancel(ctx)defer cancel()// 执行调度周期,尝试为 Pod 找到合适的宿主机scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)// 如果调度失败,则调用失败处理器if !status.IsSuccess() {sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)return}// 异步绑定 Pod 到其宿主机(可以这样做是因为上面的假设步骤)go func() {// 创建一个新的带有取消功能的上下文,用于绑定周期bindingCycleCtx, cancel := context.WithCancel(ctx)defer cancel()// 增加绑定阶段的 goroutine 指标metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()// 执行绑定周期,尝试将 Pod 绑定到宿主机status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)// 如果绑定失败,则处理绑定周期错误if !status.IsSuccess() {sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)}}()
}

这段代码的主要功能是:

  1. 从调度队列中获取下一个要调度的 Pod。
  2. 为 Pod 选择一个调度框架。
  3. 如果配置允许,跳过调度。
  4. 记录日志并开始调度周期。
  5. 如果调度成功,异步地尝试将 Pod 绑定到选定的宿主机。
  6. 如果调度或绑定失败,执行相应的错误处理逻辑。

此处也指明了两个周期,分别为调度周期schedulingCycle和绑定周期bindingCycle,绑定周期会在后面一节进行介绍,这里主要关注schedulingCycle

查看关键的schedulingCycle函数,在pkg/scheduler/schedule_one.go:120中,补充了部分注释。

// schedulingCycle tries to schedule a single Pod.
func (sched *Scheduler) schedulingCycle(ctx context.Context, // 调度上下文state *framework.CycleState, // 调度周期状态fwk framework.Framework, // 调度框架podInfo *framework.QueuedPodInfo, // 待调度的 Pod 信息start time.Time, // 调度开始时间podsToActivate *framework.PodsToActivate, // 待激活的 Pods
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {// 获取待调度的 Podpod := podInfo.Pod// 调用调度器的 SchedulePod 方法尝试调度 PodscheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)if err != nil {// 如果没有可用节点,则返回错误状态if err == ErrNoNodesAvailable {status := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status}// 如果错误是 FitError 类型,则说明 Pod 无法适应任何节点fitError, ok := err.(*framework.FitError)if !ok {klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, framework.AsStatus(err)}// 如果没有 PostFilter 插件,则不执行抢占if !fwk.HasPostFilterPlugins() {klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")return ScheduleResult{}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)}// 运行 PostFilter 插件,尝试使 Pod 在未来的调度周期中可调度result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)msg := status.Message()fitError.Diagnosis.PostFilterMsg = msgif status.Code() == framework.Error {klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)} else {klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)}// 获取 PostFilter 插件返回的 NominatingInfovar nominatingInfo *framework.NominatingInfoif result != nil {nominatingInfo = result.NominatingInfo}return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)}// 计算并记录调度算法的延迟metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))// 假设 Pod 已经在给定节点上运行,这样子就不用等它实际绑定就可以执行后续的操作了assumedPodInfo := podInfo.DeepCopy()assumedPod := assumedPodInfo.Pod// 假设操作,设置 Pod 的 NodeName 为调度结果推荐的宿主机err = sched.assume(assumedPod, scheduleResult.SuggestedHost)if err != nil {// 如果假设操作失败,这可能是重试逻辑中的一个 BUG// 报告错误以便重新调度 Podreturn ScheduleResult{nominatingInfo: clearNominatedNode},assumedPodInfo,framework.AsStatus(err)}// 运行预留插件的 Reserve 方法if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {// 如果预留失败,触发取消预留以清理与预留 Pod 相关的资源fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")}return ScheduleResult{nominatingInfo: clearNominatedNode},assumedPodInfo,sts}// 运行 "permit" 插件runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {// 如果许可检查失败,触发取消预留以清理与预留 Pod 相关的资源fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")}return ScheduleResult{nominatingInfo: clearNominatedNode},assumedPodInfo,runPermitStatus}// 成功调度周期结束后,查看是否有必要设置一些pod为可调度的状态if len(podsToActivate.Map) != 0 {sched.SchedulingQueue.Activate(podsToActivate.Map)// 激活后清空条目podsToActivate.Map = make(map[string]*v1.Pod)}// 返回调度结果return scheduleResult, assumedPodInfo, nil
}

主要流程包括:

  1. 尝试调度 Pod,并处理可能出现的错误。
  2. 如果调度失败,根据错误类型执行不同的逻辑,如处理节点不可用或 Pod 不适应任何节点的情况。
  3. 如果调度成功,记录调度算法的延迟,并提前假设 Pod 已经在推荐的节点上运行。
  4. 运行预留插件的 Reserve 方法,并处理预留成功或失败的情况。
  5. 运行抢占插件,并根据结果进行相应的处理。
  6. 如果有待转为active的 Pods,执行激活操作。
  7. 返回调度结果。

一般调度

这里最关键的是SchedulePod函数,在pkg/scheduler/schedule_one.go:334

// schedulePod tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError with reasons.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})defer trace.LogIfLong(100 * time.Millisecond)if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {return result, err}trace.Step("Snapshotting scheduler cache and node infos done")if sched.nodeInfoSnapshot.NumNodes() == 0 {return result, ErrNoNodesAvailable}feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)if err != nil {return result, err}trace.Step("Computing predicates done")if len(feasibleNodes) == 0 {return result, &framework.FitError{Pod:         pod,NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),Diagnosis:   diagnosis,}}// When only one node after predicate, just use it.if len(feasibleNodes) == 1 {return ScheduleResult{SuggestedHost:  feasibleNodes[0].Name,EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),FeasibleNodes:  1,}, nil}priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)if err != nil {return result, err}host, err := selectHost(priorityList)trace.Step("Prioritizing done")return ScheduleResult{SuggestedHost:  host,EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),FeasibleNodes:  len(feasibleNodes),}, err
}

在这里我们就能具体的看到predicates筛选过程和Prioritizing打分过程,整体的逻辑也比较简单,首先是筛选出合适的node,如果只有一个node了,那么就直接返回这个node,如果有多个就进行打分,然后选择评分最高的node返回回去。

筛选过程

然后我们查看predicates筛选过程,其代码在pkg/scheduler/schedule_one.go:387中,如下,补充了一些注释

// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {// 初始化诊断信息,用于记录调度过程中的详细信息diagnosis := framework.Diagnosis{NodeToStatusMap:      make(framework.NodeToStatusMap),UnschedulablePlugins: sets.NewString(),}// 获取所有节点的信息allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()if err != nil {return nil, diagnosis, err}// 运行 "prefilter" 插件preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)if !s.IsSuccess() {if !s.IsUnschedulable() {return nil, diagnosis, s.AsError()}// 如果 PreFilter 插件返回的状态是不可调度的,记录相关信息msg := s.Message()diagnosis.PreFilterMsg = msgklog.V(5).InfoS("Status after running PreFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)// 如果有插件失败,记录失败的插件名称if s.FailedPlugin() != "" {diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())}return nil, diagnosis, nil}// 如果 Pod 已经被提名到一个节点上(可能由于之前的抢占操作),// 这个节点很可能是唯一一个合适的节点,所以首先评估这个节点if len(pod.Status.NominatedNodeName) > 0 {feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)if err != nil {klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)}// 如果提名的节点通过了所有的过滤,调度器可以决定将这个节点分配给 Podif len(feasibleNodes) != 0 {return feasibleNodes, diagnosis, nil}}// 根据 PreFilter 插件的结果,可能需要过滤掉一些节点nodes := allNodesif !preRes.AllNodes() {nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))for n := range preRes.NodeNames {nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)if err != nil {return nil, diagnosis, err}nodes = append(nodes, nInfo)}}// 寻找通过过滤的节点feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)// 无论是否发生错误,都尝试更新下一次开始搜索节点的索引processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)if err != nil {return nil, diagnosis, err}// 检查过滤扩展器以找到更多通过过滤的节点feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)if err != nil {return nil, diagnosis, err}// 返回所有通过过滤的节点return feasibleNodes, diagnosis, nil
}

这部分首先运行preFilter插件首先进行一些轻量级的检查,然后再运行filter插件进行正式筛选,然后在运行filter拓展插件。

这里我们主要关注filter插件的运行,查看其对应的findNodesThatPassFilters函数,在pkg/scheduler/schedule_one.go:475中,如下,补充了部分注释

// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (sched *Scheduler) findNodesThatPassFilters(ctx context.Context, // 调度上下文fwk framework.Framework, // 调度框架state *framework.CycleState, // 调度周期状态pod *v1.Pod, // 待调度的 Poddiagnosis framework.Diagnosis, // 调度诊断信息nodes []*framework.NodeInfo) ([]*v1.Node, error) { // 所有节点信息numAllNodes := len(nodes) // 所有节点的数量// 计算应该找到的可行节点数量numNodesToFind := sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes))// 创建一个足够大的列表来存储通过过滤的节点,以避免在运行时增长该列表feasibleNodes := make([]*v1.Node, numNodesToFind)// 如果框架没有过滤插件,直接使用所有节点if !fwk.HasFilterPlugins() {for i := range feasibleNodes {// 从上一个调度周期停止的地方开始检查节点feasibleNodes[i] = nodes[(sched.nextStartNodeIndex+i)%numAllNodes].Node()}return feasibleNodes, nil}// 用于并行处理时的错误通道errCh := parallelize.NewErrorChannel()var statusesLock sync.Mutex // 用于保护对诊断信息的并发访问var feasibleNodesLen int32 // 通过过滤的节点数量ctx, cancel := context.WithCancel(ctx) // 创建一个可取消的上下文defer cancel()// 检查每个节点是否通过过滤checkNode := func(i int) {nodeInfo := nodes[(sched.nextStartNodeIndex+i)%numAllNodes] // 获取节点信息status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo) // 运行过滤插件if status.Code() == framework.Error {errCh.SendErrorWithCancel(status.AsError(), cancel) // 发送错误并可能取消整个操作return}if status.IsSuccess() {// 如果节点通过过滤,将其添加到可行节点列表中length := atomic.AddInt32(&feasibleNodesLen, 1)if length > numNodesToFind {cancel() // 如果找到的节点超过了预定数量,取消剩余的检查atomic.AddInt32(&feasibleNodesLen, -1)} else {feasibleNodes[length-1] = nodeInfo.Node()}} else {// 如果节点没有通过过滤,记录其状态statusesLock.Lock()diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = statusdiagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())statusesLock.Unlock()}}// 记录开始检查节点的时间beginCheckNode := time.Now()statusCode := framework.Successdefer func() {// 记录 Filter 扩展点的延迟metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))}()// 并行检查所有节点,直到找到预定数量的可行节点或检查完所有节点fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, metrics.Filter)// 截断可行节点列表到实际找到的节点数量feasibleNodes = feasibleNodes[:feasibleNodesLen]if err := errCh.ReceiveError(); err != nil {statusCode = framework.Errorreturn feasibleNodes, err}return feasibleNodes, nil
}

注意到这里首先计算了需要筛选的node的数量,这主要是为了在大规模场景下降低筛选的数量,查看其对应的函数,在pkg/scheduler/schedule_one.go:548中,如下,补充了部分注释。

// numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
// its search for more feasible nodes.
func (sched *Scheduler) numFeasibleNodesToFind(percentageOfNodesToScore *int32, numAllNodes int32) (numNodes int32) {if numAllNodes < minFeasibleNodesToFind {// 如果所有节点的数量小于预设的最小可行节点数,则返回所有节点的数量return numAllNodes}// 使用框架(profile)中设置的百分比,如果没有设置,则使用全局的百分比var percentage int32if percentageOfNodesToScore != nil {percentage = *percentageOfNodesToScore} else {percentage = sched.percentageOfNodesToScore}if percentage == 0 {// 如果没有提供百分比,则使用默认的计算方式percentage = int32(50) - numAllNodes/125if percentage < minFeasibleNodesPercentageToFind {// 确保百分比不低于预设的最小值percentage = minFeasibleNodesPercentageToFind}}// 计算基于总节点数和百分比的节点数numNodes = numAllNodes * percentage / 100if numNodes < minFeasibleNodesToFind {// 如果计算出的节点数小于最小可行节点数,则返回最小值return minFeasibleNodesToFind}// 返回计算出的可行节点数return numNodes
}

然后定义了内部的checkNode函数,其输入是要检查的node 的id相对于sched.nextStartNodeIndex的偏移。注意这里使用了k8s内部定义的并行函数fwk.Parallelizer().Until,其定义如下,在pkg/scheduler/framework/parallelize/parallelism.go:56staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:46中:

// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
// A given operation will be a label that is recorded in the goroutine metric.
func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc, operation string) {goroutinesMetric := metrics.Goroutines.WithLabelValues(operation)withMetrics := func(piece int) {goroutinesMetric.Inc()doWorkPiece(piece)goroutinesMetric.Dec()}workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, withMetrics, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism)))
}
// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) {if pieces == 0 {return}o := options{}for _, opt := range opts {opt(&o)}chunkSize := o.chunkSizeif chunkSize < 1 {chunkSize = 1}chunks := ceilDiv(pieces, chunkSize)toProcess := make(chan int, chunks)for i := 0; i < chunks; i++ {toProcess <- i}close(toProcess)var stop <-chan struct{}if ctx != nil {stop = ctx.Done()}if chunks < workers {workers = chunks}wg := sync.WaitGroup{}wg.Add(workers)for i := 0; i < workers; i++ {go func() {defer utilruntime.HandleCrash()defer wg.Done()for chunk := range toProcess {start := chunk * chunkSizeend := start + chunkSizeif end > pieces {end = pieces}for p := start; p < end; p++ {select {case <-stop:returndefault:doWorkPiece(p)}}}}()}wg.Wait()
}

checkNode函数内部检查对应的node是否能通过所有filter插件的过滤(RunFilterPluginsWithNominatedPods)如果通过就将筛选过的node数量+1,并记录相关的值,同时还会检查是否已经筛选到了足够的node,如果足够了,那么就发送取消信号,停止并行进程,不再继续筛选。

对于每个node进行筛选的函数RunFilterPluginsWithNominatedPodspkg/scheduler/framework/runtime/framework.go:816中,如下

func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, // 调度上下文state *framework.CycleState, // 当前周期状态pod *v1.Pod, // 待调度的 Podinfo *framework.NodeInfo, // 节点信息
) *framework.Status {var status *framework.StatuspodsAdded := false// We run filters twice in some cases. If the node has greater or equal priority// nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.// If all filters succeed in this pass, we run them again when these// nominated pods are not added. This second pass is necessary because some// filters such as inter-pod affinity may not pass without the nominated pods.// If there are no nominated pods for the node or if the first run of the// filters fail, we don't run the second pass.// We consider only equal or higher priority pods in the first pass, because// those are the current "pod" must yield to them and not take a space opened// for running them. It is ok if the current "pod" take resources freed for// lower priority pods.// Requiring that the new pod is schedulable in both circumstances ensures that// we are making a conservative decision: filters like resources and inter-pod// anti-affinity are more likely to fail when the nominated pods are treated// as running, while filters like pod affinity are more likely to fail when// the nominated pods are treated as not running. We can't just assume the// nominated pods are running because they are not running right now and in fact,// they may end up getting scheduled to a different node.// 我们可能需要两次运行过滤插件。如果节点上有优先级更高或相等的被提名的 Pods,// 我们会在这些 Pods 被添加到 PreFilter 状态和 nodeInfo 时运行它们。// 如果所有过滤插件在这一轮通过,我们会在这些被提名的 Pods 没有被添加的情况下再次运行它们。// 第二轮运行是必要的,因为一些过滤插件(如 Pod 亲和性)可能在没有被提名的 Pods 的情况下无法通过。// 如果节点没有被提名的 Pods 或者第一轮过滤插件失败,我们不会进行第二轮。// 我们只考虑第一轮中优先级相等或更高的 Pods,因为当前的 "pod" 必须为它们让路,而不是占用为它们运行而开放的空间。// 如果当前的 "pod" 占用了为低优先级 Pods 释放的资源,这是可以的。// 要求新的 Pod 在这两种情况下都是可调度的,确保我们做出的是保守的决定:// 像资源和 Pod 反亲和性这样的过滤器在将被提名的 Pods 视为运行时更有可能失败,// 而像 Pod 亲和性这样的过滤器在将被提名的 Pods 视为未运行时更有可能失败。// 我们不能仅仅假设被提名的 Pods 正在运行,因为它们现在并没有运行,事实上,// 它们最终可能会被调度到一个不同的节点上。for i := 0; i < 2; i++ {stateToUse := statenodeInfoToUse := infoif i == 0 {// 第一轮:添加被提名的 Pods 到周期状态和节点信息var err errorpodsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)if err != nil {return framework.AsStatus(err)}} else if !podsAdded || !status.IsSuccess() {break}// 运行过滤插件status = f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)if !status.IsSuccess() && !status.IsUnschedulable() {return status}}return status
}

注意到这里执行了两遍筛选,主要是考虑到这个node上面可能存在一些预计要被调度过来的pod,在第一轮中会假设这些pod真的会被调度过来,然后查看是否满足pod筛选需求,在第二列会假设这些pod最后没有被调度过来,然后检查是否满足pod的筛选需求。因为在第一轮中可能会存在反亲和性要求,导致无法通过筛选,在第二轮中可能会存在亲和性要求,导致无法通过筛选,这是一种很保守的筛选方式。

利用各个插件进行筛选的函数(RunFilterPlugins)在pkg/scheduler/framework/runtime/framework.go:725中,如下

// RunFilterPlugins runs the set of configured Filter plugins for pod on
// the given node. If any of these plugins doesn't return "Success", the
// given node is not suitable for running pod.
// Meanwhile, the failure message and status are set for the given node.
func (f *frameworkImpl) RunFilterPlugins(ctx context.Context,state *framework.CycleState,pod *v1.Pod,nodeInfo *framework.NodeInfo,
) *framework.Status {for _, pl := range f.filterPlugins {if state.SkipFilterPlugins.Has(pl.Name()) {continue}metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.Filter, f.profileName).Inc()if status := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo); !status.IsSuccess() {if !status.IsUnschedulable() {// Filter plugins are not supposed to return any status other than// Success or Unschedulable.status = framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), status.AsError()))}status.SetFailedPlugin(pl.Name())return status}}return nil
}

这里的逻辑很简单,就是遍历各个筛选的插件,依次检查是否符合要求。

可以继续看runFilterPlugin这运行一个筛选插件进行检查的函数,在pkg/scheduler/framework/runtime/framework.go:750中。

func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {if !state.ShouldRecordPluginMetrics() {return pl.Filter(ctx, state, pod, nodeInfo)}startTime := time.Now()status := pl.Filter(ctx, state, pod, nodeInfo)f.metricsRecorder.ObservePluginDurationAsync(metrics.Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))return status
}

主要也就是调用插件的Filter函数,具体插件的介绍后面再补充。

打分过程

打分的函数prioritizeNodespkg/scheduler/schedule_one.go 中,如下,补充了部分注释

func prioritizeNodes(ctx context.Context,extenders []framework.Extender,fwk framework.Framework,state *framework.CycleState,pod *v1.Pod,nodes []*v1.Node,
) ([]framework.NodePluginScores, error) {// 如果没有提供优先级配置,则所有节点的分数都设为 1。// 这是为了在所需的格式中生成优先级列表if len(extenders) == 0 && !fwk.HasScorePlugins() {result := make([]framework.NodePluginScores, 0, len(nodes))for i := range nodes {result = append(result, framework.NodePluginScores{Name:       nodes[i].Name,TotalScore: 1,})}return result, nil}// 运行 PreScore 插件。preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)if !preScoreStatus.IsSuccess() {return nil, preScoreStatus.AsError()}// 运行 Score 插件。nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)if !scoreStatus.IsSuccess() {return nil, scoreStatus.AsError()}// 如果启用了详细日志记录,记录每个插件对每个节点的打分klogV := klog.V(10)if klogV.Enabled() {for _, nodeScore := range nodesScores {for _, pluginScore := range nodeScore.Scores {klogV.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", pluginScore.Name, "node", nodeScore.Name, "score", pluginScore.Score)}}}// 如果有扩展器并且有节点,运行扩展器if len(extenders) != 0 && nodes != nil {allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))var mu sync.Mutexvar wg sync.WaitGroup// 并发运行每个扩展器的优先级函数for i := range extenders {if !extenders[i].IsInterested(pod) {continue}wg.Add(1)go func(extIndex int) {defer wg.Done()metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()defer func() {metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()}()prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)if err != nil {klog.V(5).InfoS("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())return}mu.Lock()defer mu.Unlock()for i := range *prioritizedList {nodename := (*prioritizedList)[i].Hostscore := (*prioritizedList)[i].ScoreklogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score)// 将扩展器的分数转换为调度器使用的分数范围finalscore := score * weight * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)if allNodeExtendersScores[nodename] == nil {allNodeExtendersScores[nodename] = &framework.NodePluginScores{Name:   nodename,Scores: make([]framework.PluginScore, 0, len(extenders)),}}allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{Name:  extenders[extIndex].Name(),Score: finalscore,})allNodeExtendersScores[nodename].TotalScore += finalscore}}(i)}wg.Wait() // 等待所有扩展器完成// 将扩展器的分数添加到节点分数中for i := range nodesScores {if score, ok := allNodeExtendersScores[nodes[i].Name]; ok {nodesScores[i].Scores = append(nodesScores[i].Scores, score.Scores...)nodesScores[i].TotalScore += score.TotalScore}}}// 记录每个节点的最终分数if klogV.Enabled() {for i := range nodesScores {klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", nodesScores[i].Name, "score", nodesScores[i].TotalScore)}}return nodesScores, nil
}

主要流程包括:

  1. 如果没有提供任何扩展器或打分插件,则为所有节点设置默认分数,并返回。
  2. 运行 PreScore 插件,为打分阶段做准备。
  3. 运行 Score 插件,获取每个节点的分数。
  4. 如果有扩展器并且有节点,则并发运行每个扩展器的优先级函数,获取扩展器为节点分配的分数。
  5. 将扩展器的分数转换为调度器使用的分数范围,并添加到节点分数中。
  6. 记录每个节点的最终分数。

这里补充一下其记录节点分数的结构体NodePluginScores,在文件pkg/scheduler/framework/interface.go:55中,其定义如下:

// NodePluginScores is a struct with node name and scores for that node.
type NodePluginScores struct {// Name is node name.Name string// Scores is scores from plugins and extenders.Scores []PluginScore// TotalScore is the total score in Scores.TotalScore int64
}// PluginScore is a struct with plugin/extender name and score.
type PluginScore struct {// Name is the name of plugin or extender.Name  stringScore int64
}

可以看到每个插件给node打分都是一个int64的类型,一个节点可能会被多个插件进行打分,最后再汇总。

再回到插件打分,这里我们主要关注关键的打分插件RunScorePlugins ,在pkg/scheduler/framework/runtime/framework.go:931中,如下,补充了部分注释

func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ns []framework.NodePluginScores, status *framework.Status) {startTime := time.Now()defer func() {// 记录打分扩展点的持续时间metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))}()allNodePluginScores := make([]framework.NodePluginScores, len(nodes))numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len()plugins := make([]framework.ScorePlugin, 0, numPlugins)pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins)// 为每个插件创建一个节点分数列表for _, pl := range f.scorePlugins {if state.SkipScorePlugins.Has(pl.Name()) {continue}plugins = append(plugins, pl)pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))}ctx, cancel := context.WithCancel(ctx)defer cancel()errCh := parallelize.NewErrorChannel()if len(plugins) > 0 {// 并行地为每个节点运行每个插件的 Score 方法f.Parallelizer().Until(ctx, len(nodes), func(index int) {nodeName := nodes[index].Namefor _, pl := range plugins {s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)if !status.IsSuccess() {err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())errCh.SendErrorWithCancel(err, cancel)return}pluginToNodeScores[pl.Name()][index] = framework.NodeScore{Name:  nodeName,Score: s,}}}, metrics.Score)if err := errCh.ReceiveError(); err != nil {return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))}}// 并行地为每个打分插件运行 NormalizeScore 方法f.Parallelizer().Until(ctx, len(plugins), func(index int) {pl := plugins[index]if pl.ScoreExtensions() == nil {return}nodeScoreList := pluginToNodeScores[pl.Name()]status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)if !status.IsSuccess() {err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())errCh.SendErrorWithCancel(err, cancel)return}}, metrics.Score)if err := errCh.ReceiveError(); err != nil {return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))}// 并行地为每个打分插件应用分数权重,并构建 allNodePluginScoresf.Parallelizer().Until(ctx, len(nodes), func(index int) {nodePluginScores := framework.NodePluginScores{Name:   nodes[index].Name,Scores: make([]framework.PluginScore, len(plugins)),}for i, pl := range plugins {weight := f.scorePluginWeight[pl.Name()]nodeScoreList := pluginToNodeScores[pl.Name()]score := nodeScoreList[index].Scoreif score > framework.MaxNodeScore || score < framework.MinNodeScore {err := fmt.Errorf("plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), score, framework.MinNodeScore, framework.MaxNodeScore)errCh.SendErrorWithCancel(err, cancel)return}weightedScore := score * int64(weight)nodePluginScores.Scores[i] = framework.PluginScore{Name:  pl.Name(),Score: weightedScore,}nodePluginScores.TotalScore += weightedScore}allNodePluginScores[index] = nodePluginScores}, metrics.Score)if err := errCh.ReceiveError(); err != nil {return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))}// 返回所有节点的插件分数return allNodePluginScores, nil
}

主要流程包括:

  1. 为每个插件创建一个节点分数列表。
  2. 使用并行处理为每个节点运行每个插件的 Score 方法。
  3. 为每个插件运行 NormalizeScore 方法,以标准化分数。
  4. 应用每个插件的分数权重,构建最终的节点分数。
  5. 返回各个节点的分数

查看插件打分的函数runScorePlugin,在pkg/scheduler/framework/runtime/framework.go:1025 中,如下。

func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {if !state.ShouldRecordPluginMetrics() {return pl.Score(ctx, state, pod, nodeName)}startTime := time.Now()s, status := pl.Score(ctx, state, pod, nodeName)f.metricsRecorder.ObservePluginDurationAsync(metrics.Score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))return s, status
}

可以看到主要是调用插件的Score方法。

一般调度的后期处理

PostFilter插件

schedulingCycle中可以看到如果上述的一般调度没有为Pod找到合适的node,并且错误不是没有合适的node,即ErrNoNodesAvailable 的话,就会检查是否存在有PostFilterPlugins,如果有就运行,即运行RunPostFilterPlugins函数,来进行相关的处理,例如释放一些资源,从而希望使得该pod在下一次调度时有机会成功调度,当然这被释放的资源也可能被其他不同的pod给占用了,但是这对系统是无害的,所以也不管。

RunPostFilterPlugins函数在pkg/scheduler/framework/runtime/framework.go:762中,如下所示

// RunPostFilterPlugins runs the set of configured PostFilter plugins until the first
// Success, Error or UnschedulableAndUnresolvable is met; otherwise continues to execute all plugins.
func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (_ *framework.PostFilterResult, status *framework.Status) {startTime := time.Now()defer func() {metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))}()// `result` records the last meaningful(non-noop) PostFilterResult.var result *framework.PostFilterResultvar reasons []stringvar failedPlugin stringfor _, pl := range f.postFilterPlugins {r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)if s.IsSuccess() {return r, s} else if s.Code() == framework.UnschedulableAndUnresolvable {return r, s.WithFailedPlugin(pl.Name())} else if !s.IsUnschedulable() {// Any status other than Success, Unschedulable or UnschedulableAndUnresolvable is Error.return nil, framework.AsStatus(s.AsError()).WithFailedPlugin(pl.Name())} else if r != nil && r.Mode() != framework.ModeNoop {result = r}reasons = append(reasons, s.Reasons()...)// Record the first failed plugin unless we proved that// the latter is more relevant.if len(failedPlugin) == 0 {failedPlugin = pl.Name()}}return result, framework.NewStatus(framework.Unschedulable, reasons...).WithFailedPlugin(failedPlugin)
}

可以看到他就是遍历了所有的postFilter插件,然后使用函数runPostFilterPlugin运行这些插件,其在pkg/scheduler/framework/runtime/framework.go:796

func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {if !state.ShouldRecordPluginMetrics() {return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)}startTime := time.Now()r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)f.metricsRecorder.ObservePluginDurationAsync(metrics.PostFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime))return r, s
}

Reserve插件

得到想要调度到的pod后,可能需要执行一些资源预留的操作,就需要定义在reserve插件中,该插件对应的调用函数为RunReservePluginsReserve,在pkg/scheduler/framework/runtime/framework.go:1144

// RunReservePluginsReserve runs the Reserve method in the set of configured
// reserve plugins. If any of these plugins returns an error, it does not
// continue running the remaining ones and returns the error. In such a case,
// the pod will not be scheduled and the caller will be expected to call
// RunReservePluginsUnreserve.
func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {startTime := time.Now()defer func() {metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))}()for _, pl := range f.reservePlugins {status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)if !status.IsSuccess() {err := status.AsError()klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))}}return nil
}

这里也是遍历所有的reserve插件,如果有任意一个插件失败了那么就失败了。单个插件的调用函数在pkg/scheduler/framework/runtime/framework.go:1160中,如下

func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framework.ReservePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {if !state.ShouldRecordPluginMetrics() {return pl.Reserve(ctx, state, pod, nodeName)}startTime := time.Now()status := pl.Reserve(ctx, state, pod, nodeName)f.metricsRecorder.ObservePluginDurationAsync(metrics.Reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))return status
}

Permit插件

找到了要调度的pod后还需要运行permit插件,该插件主要用来查看记录是否还需要等待一下其他操作,例如抢占某个pod的资源,那么就需要等待被抢占pod的资源释放掉。

该插件对应的函数RunPermitPluginspkg/scheduler/framework/runtime/framework.go:1200中,如下

// RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will create and add waiting pod
// to a map of currently waiting pods and return status with "Wait" code.
// Pod will remain waiting pod for the minimum duration returned by the permit plugins.
func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {startTime := time.Now() // 记录permit插件开始运行的时间defer func() {// 记录permit插件的运行时间和最终状态metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))}()pluginsWaitTime := make(map[string]time.Duration) // 存储每个插件的等待时间statusCode := framework.Success // 初始化状态码为成功for _, pl := range f.permitPlugins {// 运行当前permit插件status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)if !status.IsSuccess() {if status.IsUnschedulable() {// 如果插件返回不可调度的状态,则记录日志并返回该状态klog.V(4).InfoS("Pod rejected by permit plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())status.SetFailedPlugin(pl.Name()) // 设置失败的插件名称return status}if status.IsWait() {// 如果插件返回等待的状态,则记录等待时间,但不立即返回// 允许的最长等待时间由 maxTimeout 限制if timeout > maxTimeout {timeout = maxTimeout}pluginsWaitTime[pl.Name()] = timeoutstatusCode = framework.Wait // 更新状态码为等待} else {// 如果插件返回错误状态,则记录错误日志并返回错误状态err := status.AsError()klog.ErrorS(err, "Failed running Permit plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithFailedPlugin(pl.Name())}}}if statusCode == framework.Wait {// 如果任何插件返回等待状态,则创建并添加等待中的 Pod 到映射中,并返回等待状态waitingPod := newWaitingPod(pod, pluginsWaitTime)f.waitingPods.add(waitingPod)msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)klog.V(4).InfoS("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod))return framework.NewStatus(framework.Wait, msg)}// 如果所有插件都成功或返回等待,且没有插件拒绝 Pod,则返回 nil 表示没有错误return nil
}

主要流程包括:

  1. 记录开始运行许可插件的时间。
  2. 使用 defer 语句确保无论函数如何结束,都记录许可插件的运行时间和状态。
  3. 遍历所有的permit插件。
  4. 运行当前插件,并将结果状态保存到 status
  5. 检查状态:
    • 如果状态是成功的,则继续运行下一个插件。
    • 如果状态是不可调度的,则记录日志并返回该状态。
    • 如果状态是等待的,则记录等待时间,并更新状态码为等待,然后继续运行下一个插件。
    • 如果状态是错误,则记录错误日志,并返回错误状态。
  6. 如果任何插件返回等待状态,则创建等待中的 Pod 并添加到映射中,然后返回等待状态。
  7. 如果所有插件都成功或返回等待,且没有插件拒绝 Pod,则返回 nil

这篇关于【K8s源码分析(三)】-K8s调度器调度周期介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1045717

相关文章

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}

90、k8s之secret+configMap

一、secret配置管理 配置管理: 加密配置:保存密码,token,其他敏感信息的k8s资源 应用配置:我们需要定制化的给应用进行配置,我们需要把定制好的配置文件同步到pod当中容器 1.1、加密配置: secret: [root@master01 ~]# kubectl get secrets ##查看加密配置[root@master01 ~]# kubectl get se

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get