kubelet启动pod的源码分析

2024-05-10 17:58
文章标签 分析 源码 启动 pod kubelet

本文主要是介绍kubelet启动pod的源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

如果看过之前的blog,大家都肯定都已经知道kubelet是负责真正启动容器的干活的,其实k8s的代码里面最复杂的其实是kubelet,而不是无脑的大脑apiserver,它基本是依靠etcd完成很多工作。
那么kubelet怎么去启动pod,何时去启动pod呢?我先从kubelet启动的地方说起。
这里写图片描述
先看上面这张图,这张图详细的介绍了kubelet启动pod的三个来源,分别是apiserver、URL和本地文件,篇幅有限我们已最常用的apiserver为例,其实其他两个也是一样的,譬如本地本机就是notify manifest下面下的static pod文件,发现有变化则发动到change的管道。回到apiserver的这个源上面来pkg/kubelet/kubelet.go

if kubeDeps.KubeClient != nil {glog.Infof("Watching apiserver")config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))}

上面的代码是创建apiserver的watch。具体实现在下面
pkg/kubelet/config/apiserver.go

func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))newSourceApiserverFromLW(lw, updates)
}// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {send := func(objs []interface{}) {var pods []*v1.Podfor _, o := range objs {pods = append(pods, o.(*v1.Pod))}updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}}cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
}

上面的代码有两个地方需要注意,第一是watch哪些pod呢?答案是所以命名空间下面,和该主机绑定的pod。第二需要注意的是,这里把所以pod的变化都分装到了一个kubetypes.PodUpdate结构体里面,虽然叫做update但其实并并不只是update,Op: kubetypes.SET这个设置在后面还会用到,当然源自然是kubetypes.ApiserverSource。
那么将变化放到update管道之后,接下来怎么处理呢?就是上面图片所说的merge方法去处理。pkg/kubelet/config/config.go
这个merge方法的功能就是通过比较确定到底应该是创建pod还是删除pod等操作主要返回下面这几个值

    addPods := []*v1.Pod{}updatePods := []*v1.Pod{}deletePods := []*v1.Pod{}removePods := []*v1.Pod{}reconcilePods := []*v1.Pod{}

具体到apiserver看下面代码

case kubetypes.SET:glog.V(4).Infof("Setting pods for source %s", source)s.markSourceSet(source)// Clear the old map entries by just creating a new mapoldPods := podspods = make(map[types.UID]*v1.Pod)updatePodsFunc(update.Pods, oldPods, pods)for uid, existing := range oldPods {if _, found := pods[uid]; !found {// this is a deleteremovePods = append(removePods, existing)}}

这里的kubetypes.SET和上面的对应上去了。主要是通过updatePodsFunc去处理,

if existing, found := oldPods[ref.UID]; found {pods[ref.UID] = existingneedUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)if needUpdate {updatePods = append(updatePods, existing)} else if needReconcile {reconcilePods = append(reconcilePods, existing)} else if needGracefulDelete {deletePods = append(deletePods, existing)}continue}recordFirstSeenTime(ref)pods[ref.UID] = refaddPods = append(addPods, ref)

上面的代码很简单,如果发现本地的oldPods已经存在则更新(checkAndUpdatePod去判断具体更新的内容,如果状态不一致则needReconcile,如果DeletionTimestamp不为空则needGracefulDelete其它更新如label、annotation等变化则needUpdate),更新包括删除和同步,如果不存在则加入到addPods的切片中去。
这个这个merge方法就可以返回需要创建、删除或者更新的pods了。
上面图Merge方法,调用merge返回了所以需要操作的pod之后,放入到updates的channel里面

case PodConfigNotificationIncremental:if len(removes.Pods) > 0 {s.updates <- *removes}if len(adds.Pods) > 0 {s.updates <- *adds}if len(updates.Pods) > 0 {s.updates <- *updates}if len(deletes.Pods) > 0 {s.updates <- *deletes}if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 { // Send an empty update when first seeing the source and there are // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that // the source is ready.s.updates <- *adds} // Only add reconcile support here, because kubelet doesn't support Snapshot update now.if len(reconciles.Pods) > 0 {s.updates <- *reconciles}

上面的创建删除更新等都放到s.updates里面。这样就kubelet就把需要变化的内容放到s.updates管道里面。
当kubelet启动的时候cmd/kubelet/app/server.go

go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

上面的Updates()方法就是获取之前的s.updates。传给kubelet启动的Run方法里面pkg/kubelet/kubelet.go

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {}

那么kubelet里面就可以获取这写需要变化的pod了。然后通过kubelet.go里面syncLoopIteration这个方法去创建

kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)

这个方法有四个输入源,我们上面分析的update只是其中之一,下面的blog将会继续分析启动过程。

这篇关于kubelet启动pod的源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/977154

相关文章

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

Android里面的Service种类以及启动方式

《Android里面的Service种类以及启动方式》Android中的Service分为前台服务和后台服务,前台服务需要亮身份牌并显示通知,后台服务则有启动方式选择,包括startService和b... 目录一句话总结:一、Service 的两种类型:1. 前台服务(必须亮身份牌)2. 后台服务(偷偷干

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

Windows设置nginx启动端口的方法

《Windows设置nginx启动端口的方法》在服务器配置与开发过程中,nginx作为一款高效的HTTP和反向代理服务器,被广泛应用,而在Windows系统中,合理设置nginx的启动端口,是确保其正... 目录一、为什么要设置 nginx 启动端口二、设置步骤三、常见问题及解决一、为什么要设置 nginx

springboot启动流程过程

《springboot启动流程过程》SpringBoot简化了Spring框架的使用,通过创建`SpringApplication`对象,判断应用类型并设置初始化器和监听器,在`run`方法中,读取配... 目录springboot启动流程springboot程序启动入口1.创建SpringApplicat

树莓派启动python的实现方法

《树莓派启动python的实现方法》本文主要介绍了树莓派启动python的实现方法,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录一、RASPBerry系统设置二、使用sandroidsh连接上开发板Raspberry Pi三、运

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

SpringBoot项目启动后自动加载系统配置的多种实现方式

《SpringBoot项目启动后自动加载系统配置的多种实现方式》:本文主要介绍SpringBoot项目启动后自动加载系统配置的多种实现方式,并通过代码示例讲解的非常详细,对大家的学习或工作有一定的... 目录1. 使用 CommandLineRunner实现方式:2. 使用 ApplicationRunne