kubelet source code study (1): how kubelet works and the kubelet startup process

2023-12-25 02:36


This article studies the source code of Kubernetes v1.22.4.

1、How kubelet works

1)、kubelet's core work

The core of kubelet's work is a control loop, the SyncLoop (the big circle in the figure). The events that drive this control loop include: pod update events, pod lifecycle changes, the execution period configured by kubelet itself, and periodic cleanup events.

kubelet also maintains many other sub control loops (the small circles in the figure), named xxxManager. For example: probeManager periodically monitors the health of the containers in a pod via livenessProbe, readinessProbe and startupProbe; statusManager maintains status information and updates the pod status to the APIServer; containerRefManager manages container references and is used to report events such as container creation and failure.

2)、CRI and the container runtime

When kubelet drives the underlying container runtime, it does not call the Docker API directly; instead it works indirectly through a set of gRPC interfaces called CRI (Container Runtime Interface).

A CRI shim responds to CRI requests and acts as the shim between kubelet and the container project. The CRI shim implements every interface defined by CRI and translates concrete CRI requests into requests or operations against the backing container project.

Each container project can implement its own CRI shim and handle CRI requests itself. In this way Kubernetes gets a unified container abstraction layer, and any lower-level container runtime can plug into Kubernetes freely.

When Docker is used, dockershim handles the CRI requests and assembles them into Docker API requests that are sent to the Docker daemon.

The CRI interface can be divided into two groups:

  • RuntimeService: mainly container-related operations, such as creating, starting and deleting containers, and running exec commands
  • ImageManagerService: mainly image-related operations, such as pulling and deleting images

The core methods of the CRI interface are shown in the figure below:
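
Since the figure is not reproduced here, the following Go-style sketch shows roughly how the two services are grouped. The method names come from the CRI API, but this is an abridged illustration with placeholder types, not the complete interface of any specific Kubernetes version:

package cri

import "time"

// Placeholder types standing in for the real CRI protobuf messages.
type (
    PodSandboxConfig struct{}
    ContainerConfig  struct{}
    ImageSpec        struct{}
    AuthConfig       struct{}
    Image            struct{}
    ImageFilter      struct{}
)

// RuntimeService: sandbox- and container-related operations.
type RuntimeService interface {
    // Sandbox (pod-level) operations
    RunPodSandbox(config *PodSandboxConfig, runtimeHandler string) (string, error)
    StopPodSandbox(podSandboxID string) error
    RemovePodSandbox(podSandboxID string) error

    // Container operations
    CreateContainer(podSandboxID string, config *ContainerConfig, sandboxConfig *PodSandboxConfig) (string, error)
    StartContainer(containerID string) error
    StopContainer(containerID string, timeout int64) error
    RemoveContainer(containerID string) error

    // Streaming operations, e.g. kubectl exec
    ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout, stderr []byte, err error)
}

// ImageManagerService: image-related operations.
type ImageManagerService interface {
    ListImages(filter *ImageFilter) ([]*Image, error)
    PullImage(image *ImageSpec, auth *AuthConfig, podSandboxConfig *PodSandboxConfig) (string, error)
    RemoveImage(image *ImageSpec) error
}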

An important principle in the design of CRI is that the interface itself cares only about containers, not about pods; there is no interface in the CRI design that directly creates or starts a pod.

PodSandboxManager contains the RunPodSandbox method. The PodSandbox here does not correspond to the Pod API object in Kubernetes; it only extracts the part of the Pod that is relevant to the container runtime, such as Hostname, DnsConfig and CgroupParent. In other words, PodSandbox describes the fields Kubernetes needs in order to map the Pod concept onto the container runtime level; it is a subset of the Pod object.
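
As a rough illustration of that "subset of the Pod object", the sketch below lists some of the fields carried by the CRI PodSandboxConfig message. The field names follow the CRI API, but the structs here are simplified stand-ins written for this article, not the generated protobuf types:

package crisandbox

// Simplified stand-in for the CRI PodSandboxConfig message: only fields that
// kubelet extracts from the Pod API object for the runtime are shown.
type PodSandboxConfig struct {
    Metadata     *PodSandboxMetadata // name, namespace, uid, attempt
    Hostname     string              // pod-level hostname
    LogDirectory string              // directory for the containers' log files
    DnsConfig    *DNSConfig          // servers, searches, options from the Pod spec
    PortMappings []*PortMapping      // host/container port mappings
    Labels       map[string]string
    Annotations  map[string]string
    Linux        *LinuxPodSandboxConfig // e.g. CgroupParent
}

type PodSandboxMetadata struct {
    Name      string
    Namespace string
    Uid       string
    Attempt   uint32
}

type DNSConfig struct {
    Servers  []string
    Searches []string
    Options  []string
}

type PortMapping struct {
    Protocol      int32
    ContainerPort int32
    HostPort      int32
    HostIp        string
}

type LinuxPodSandboxConfig struct {
    CgroupParent string
}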

For example, suppose kubectl run creates a pod named foo that contains two containers, A and B. With Docker as the runtime, dockershim first creates an infra container (the pause container) named foo to hold the pod's network namespace.

2、The kubelet startup process

The Run() method in pkg/kubelet/kubelet.go starts kubelet's modules. The code is as follows:

// pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        klog.InfoS("No API server defined - no node status update will be sent")
    }

    // Start the cloud provider sync manager
    if kl.cloudResourceSyncManager != nil {
        go kl.cloudResourceSyncManager.Run(wait.NeverStop)
    }

    // Start the modules that do not depend on the container runtime
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        klog.ErrorS(err, "Failed to initialize internal modules")
        os.Exit(1)
    }

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.
        // Periodically sync the node status
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
        // Update the pod CIDR and runtime state, and perform the first node status sync
        go kl.fastStatusUpdateOnce()

        // start syncing lease
        // Start the node lease mechanism
        go kl.nodeLeaseController.Run(wait.NeverStop)
    }
    // Periodically update the runtime state
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Set up iptables util rules
    if kl.makeIPTablesUtilChains {
        kl.initNetworkUtil()
    }

    // Start component sync loops.
    // Start the statusManager
    kl.statusManager.Start()

    // Start syncing RuntimeClasses if enabled.
    // Start the runtimeClassManager
    if kl.runtimeClassManager != nil {
        kl.runtimeClassManager.Start(wait.NeverStop)
    }

    // Start the pod lifecycle event generator.
    // Start the PLEG, which periodically refreshes the state of all containers from the container runtime
    kl.pleg.Start()

    // Start the kubelet event loop, which keeps watching external inputs and performing the corresponding pod operations
    kl.syncLoop(updates, kl)
}

The main logic of the Run() method is as follows:

  1. Call kl.initializeModules() to start the modules that do not depend on the container runtime
  2. Start the volume manager
  3. Periodically sync the node status
  4. Call kl.fastStatusUpdateOnce() to update the pod CIDR and the runtime state and perform the first node status sync
  5. Start the node lease mechanism to sync node leases
  6. Periodically run kl.updateRuntimeUp() to update the runtime state
  7. Start the statusManager and the runtimeClassManager
  8. Call kl.pleg.Start() to start the PLEG, which periodically refreshes the state of all containers from the container runtime
  9. Call kl.syncLoop() to start the kubelet event loop, which keeps watching external inputs and performing the corresponding pod operations
1)、The initializeModules() method
// pkg/kubelet/kubelet.go
func (kl *Kubelet) initializeModules() error {
    // Prometheus metrics.
    metrics.Register(
        collectors.NewVolumeStatsCollector(kl),
        collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
    )
    metrics.SetNodeName(kl.nodeName)
    servermetrics.Register()

    // Setup filesystem directories.
    if err := kl.setupDataDirs(); err != nil {
        return err
    }

    // If the container logs directory does not exist, create it.
    if _, err := os.Stat(ContainerLogsDir); err != nil {
        if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
            return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
        }
    }

    // Start the image manager.
    kl.imageManager.Start()

    // Start the certificate manager if it was enabled.
    if kl.serverCertificateManager != nil {
        kl.serverCertificateManager.Start()
    }

    // Start out of memory watcher.
    if kl.oomWatcher != nil {
        if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
            return fmt.Errorf("failed to start OOM watcher: %w", err)
        }
    }

    // Start resource analyzer, which flushes volume stats into the cache
    kl.resourceAnalyzer.Start()

    return nil
}

The main logic of the initializeModules() method is as follows:

  1. Start the imageManager (which is actually the realImageGCManager)
  2. Start the certificateManager, which handles certificates
  3. Start the oomWatcher
  4. Start the resource analyzer, which flushes volume stats into the cache

The code of the kl.imageManager.Start() method is as follows:

// pkg/kubelet/images/image_gc_manager.go
func (im *realImageGCManager) Start() {
    go wait.Until(func() {
        // Initial detection make detected time "unknown" in the past.
        var ts time.Time
        if im.initialized {
            ts = time.Now()
        }
        // Detect all images and garbage-collect the ones that are no longer in use
        _, err := im.detectImages(ts)
        if err != nil {
            klog.InfoS("Failed to monitor images", "err", err)
        } else {
            im.initialized = true
        }
    }, 5*time.Minute, wait.NeverStop)

    // Start a goroutine periodically updates image cache.
    go wait.Until(func() {
        // Call the CRI to get the latest list of images
        images, err := im.runtime.ListImages()
        if err != nil {
            klog.InfoS("Failed to update image list", "err", err)
        } else {
            im.imageCache.set(images)
        }
    }, 30*time.Second, wait.NeverStop)
}

The Start() method of realImageGCManager launches two goroutines:

  1. Periodically call detectImages(), which finds all images currently in use and deletes the images that are no longer used
  2. Periodically fetch the latest image list and call imageCache.set() to update the image cache
2)、The fastStatusUpdateOnce() method
// pkg/kubelet/kubelet.go
func (kl *Kubelet) fastStatusUpdateOnce() {
    for {
        time.Sleep(100 * time.Millisecond)
        node, err := kl.GetNode()
        if err != nil {
            klog.ErrorS(err, "Error getting node")
            continue
        }
        if len(node.Spec.PodCIDRs) != 0 {
            podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
            if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
                klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
                continue
            }
            // Update the runtime state
            kl.updateRuntimeUp()
            // Sync the node status
            kl.syncNodeStatus()
            return
        }
    }
}

fastStatusUpdateOnce() runs a loop that tries to update the pod CIDR immediately. Once the pod CIDR has been updated, it updates the runtime state and syncs the node status. The method returns after one successful node status sync and is executed only during kubelet startup.

The code of the kl.updateRuntimeUp() method is as follows:

// pkg/kubelet/kubelet.go
// On the first run this initializes the runtime-dependent modules, then updates the runtimeState
func (kl *Kubelet) updateRuntimeUp() {
    kl.updateRuntimeMux.Lock()
    defer kl.updateRuntimeMux.Unlock()

    // Get the container runtime status
    s, err := kl.containerRuntime.Status()
    if err != nil {
        klog.ErrorS(err, "Container runtime sanity check failed")
        return
    }
    if s == nil {
        klog.ErrorS(nil, "Container runtime status is nil")
        return
    }
    // Periodically log the whole runtime status for debugging.
    klog.V(4).InfoS("Container runtime status", "status", s)
    networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
    // Check whether the network and the runtime are ready
    if networkReady == nil || !networkReady.Status {
        klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
        kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
    } else {
        // Set nil if the container runtime network is ready.
        kl.runtimeState.setNetworkState(nil)
    }
    // information in RuntimeReady condition will be propagated to NodeReady condition.
    // Get the runtime state
    runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
    // If RuntimeReady is not set or is false, report an error.
    if runtimeReady == nil || !runtimeReady.Status {
        klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
        kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
        return
    }
    kl.runtimeState.setRuntimeState(nil)
    // Initialize the runtime-dependent modules via kl.initializeRuntimeDependentModules (runs once)
    kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
    kl.runtimeState.setRuntimeSync(kl.clock.Now())
}

The updateRuntimeUp() method fetches the container runtime status and, based on the returned status, checks whether the network and the runtime are ready. It then calls kl.initializeRuntimeDependentModules() to initialize the runtime-dependent modules, which starts cadvisor, containerManager, evictionManager, containerLogManager, pluginManager and shutdownManager. Finally it records the runtime sync time.
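
The oneTimeInitializer used above is a sync.Once, which is what guarantees that initializeRuntimeDependentModules() runs only once even though updateRuntimeUp() is invoked every 5 seconds. A tiny self-contained illustration of that pattern (the function names below are stand-ins for this article):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    initRuntimeDependentModules := func() {
        fmt.Println("initializing runtime-dependent modules (runs only once)")
    }

    // updateRuntimeUp is called periodically, but the initialization runs exactly once.
    for i := 0; i < 3; i++ {
        once.Do(initRuntimeDependentModules)
        fmt.Println("runtime state refreshed, iteration", i)
    }
}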

3)、The syncLoop() method
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    klog.InfoS("Starting kubelet main sync loop")
    // The syncTicker wakes up kubelet to checks if there are any pod workers
    // that need to be sync'd. A one-second period is sufficient because the
    // sync interval is defaulted to 10s.
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    const (
        base   = 100 * time.Millisecond
        max    = 5 * time.Second
        factor = 2
    )
    duration := base
    // Responsible for checking limits in resolv.conf
    // The limits do not have anything to do with individual pods
    // Since this is called in syncLoop, we don't need to call it anywhere else
    if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
        kl.dnsConfigurer.CheckLimitsForResolvConf()
    }

    for {
        if err := kl.runtimeState.runtimeErrors(); err != nil {
            klog.ErrorS(err, "Skipping pod synchronization")
            // exponential backoff
            time.Sleep(duration)
            duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
            continue
        }
        // reset backoff if we have a success
        duration = base

        kl.syncLoopMonitor.Store(kl.clock.Now())
        // Call kl.syncLoopIteration
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

The syncLoop() method keeps calling syncLoopIteration() in a loop.

The channels involved in syncLoopIteration() are covered in detail later; here we only look at the handling logic inside syncLoopIteration().

1)configCh

// pkg/kubelet/kubelet.go
// This method watches multiple channels; when any channel has data it hands the data
// to the handler, and the handler dispatches work via dispatchWork
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // This channel aggregates pod information from three sources (file, http, apiServer).
        // Whenever a pod from any source is created/updated/deleted, the updated pod
        // information and the corresponding operation show up on this channel.
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            klog.ErrorS(nil, "Kubelet does not support snapshot update")
        default:
            klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
        }

        kl.sourcesReady.AddSource(u.Source)
    ...
    }
    return true
}

configCh is the channel from which configuration events are read. This module watches pod changes from three different sources (file, http and APIServer) at the same time; as soon as a pod from any source changes (create/update/delete), the updated pod information and the corresponding operation appear on this channel.

2)plegCh

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ...
    case e := <-plegCh:
        if e.Type == pleg.ContainerStarted {
            // record the most recent time we observed a container start for this pod.
            // this lets us selectively invalidate the runtimeCache when processing a delete for this pod
            // to make sure we don't miss handling graceful termination for containers we reported as having started.
            kl.lastContainerStartedTime.Add(e.ID, time.Now())
        }
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    ...
    }
    return true
}

When kl.pleg.Start() is running, relist is called once per second; it generates PodLifecycleEvents from the latest PodStatus and stores them into plegCh.

syncLoop() calls kl.pleg.Watch() to obtain plegCh and passes it to syncLoopIteration(), which consumes the data from plegCh; the handler then dispatches work via dispatchWork.

3)syncCh

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ...
    case <-syncCh:
        // Sync pods waiting for sync
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            break
        }
        klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", format.Pods(podsToSync))
        // Sync the latest saved pod state
        handler.HandlePodSyncs(podsToSync)
    ...
    }
    return true
}

syncCh is a ticker channel created inside syncLoop(); a value is written to it every second, and this branch syncs all pods that are waiting to be synced.

4)kl.livenessManager.Updates() / kl.readinessManager.Updates() / kl.startupManager.Updates()

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ...
    case update := <-kl.livenessManager.Updates():
        // If a liveness probe failed, the pod's status needs to be updated
        if update.Result == proberesults.Failure {
            handleProbeSync(kl, update, handler, "liveness", "unhealthy")
        }
    case update := <-kl.readinessManager.Updates():
        // When the readiness state changes, update the container and pod status
        ready := update.Result == proberesults.Success
        kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

        status := ""
        if ready {
            status = "ready"
        }
        handleProbeSync(kl, update, handler, "readiness", status)
    case update := <-kl.startupManager.Updates():
        // When the startup state changes, update the container and pod status
        started := update.Result == proberesults.Success
        kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

        status := "unhealthy"
        if started {
            status = "started"
        }
        handleProbeSync(kl, update, handler, "startup", status)
    ...
    }
    return true
}

5)housekeepingCh

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ...
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
            // If the sources aren't ready or volume manager has not yet synced the states,
            // skip housekeeping, as we may accidentally delete pods from unready sources.
            klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
        } else {
            start := time.Now()
            klog.V(4).InfoS("SyncLoop (housekeeping)")
            // Perform cleanup work: terminate pod workers, delete unwanted pods,
            // remove volumes and pod directories
            if err := handler.HandlePodCleanups(); err != nil {
                klog.ErrorS(err, "Failed cleaning pods")
            }
            duration := time.Since(start)
            if duration > housekeepingWarningDuration {
                klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
            }
            klog.V(4).InfoS("SyncLoop (housekeeping) end")
        }
    }
    return true
}

housekeepingCh is also created by syncLoop(); every two seconds it triggers cleanup work, including terminating pod workers, deleting unwanted pods, and removing volumes and pod directories.

syncLoopIteration() watches the following channels and handles the events accordingly (a small self-contained sketch of this select pattern follows the list):

  • configCh: watches pod configuration events from file, HTTP and the APIServer
  • plegCh: the PLEG submodule calls relist once per second, generates PodLifecycleEvents from the latest PodStatus and stores them into plegCh
  • syncCh: a ticker channel; every second the latest saved pod state is synced
  • kl.livenessManager.Updates(): if a liveness probe fails, the pod's status needs to be updated
  • kl.readinessManager.Updates(): when the readiness state changes, the container and pod status are updated
  • kl.startupManager.Updates(): when the startup state changes, the container and pod status are updated
  • housekeepingCh: triggers pod cleanup work every two seconds
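
Putting the branches above together, the overall shape of syncLoopIteration() is a single select over all of these channels. The self-contained toy below only mimics that shape; the channel types and handling are simplified stand-ins written for this article, not kubelet code:

package main

import (
    "fmt"
    "time"
)

// Toy event loop that mirrors the shape of syncLoopIteration: one select over
// several channels, where each branch handles a different kind of event.
func loopIteration(configCh <-chan string, plegCh <-chan string, syncCh, housekeepingCh <-chan time.Time) bool {
    select {
    case u, open := <-configCh:
        if !open {
            return false // config channel closed: exit the loop
        }
        fmt.Println("config event:", u) // e.g. ADD/UPDATE/DELETE of a pod
    case e := <-plegCh:
        fmt.Println("pleg event:", e) // container lifecycle change observed by relist
    case <-syncCh:
        fmt.Println("periodic sync") // sync pods waiting in the work queue
    case <-housekeepingCh:
        fmt.Println("housekeeping") // clean up pod workers, volumes, directories
    }
    return true
}

func main() {
    configCh := make(chan string, 1)
    plegCh := make(chan string, 1)
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(2 * time.Second)
    defer housekeepingTicker.Stop()

    configCh <- "ADD pod foo"
    plegCh <- "ContainerStarted"

    for i := 0; i < 4; i++ {
        if !loopIteration(configCh, plegCh, syncTicker.C, housekeepingTicker.C) {
            break
        }
    }
}
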
4)、Summary

The kubelet startup process is shown in the figure below.

3、Channels involved in the syncLoopIteration() method

1)、configCh

The call flow related to configCh is shown in the figure above. The handling of configCh inside syncLoopIteration() was covered earlier; here we look at how kubelet watches the APIServer and writes pod changes into configCh.

// pkg/kubelet/kubelet.go
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
    manifestURLHeader := make(http.Header)
    if len(kubeCfg.StaticPodURLHeader) > 0 {
        for k, v := range kubeCfg.StaticPodURLHeader {
            for i := range v {
                manifestURLHeader.Add(k, v[i])
            }
        }
    }

    // source of all configuration
    // Initialize config.PodConfig
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // Register the three pod sources: file, http and apiServer
    // define file config source
    if kubeCfg.StaticPodPath != "" {
        klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
        config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
    }

    // define url config source
    if kubeCfg.StaticPodURL != "" {
        klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
        config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    }

    if kubeDeps.KubeClient != nil {
        klog.InfoS("Adding apiserver pod source")
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
    }
    return cfg, nil
}

makePodSourceConfig() first initializes config.PodConfig and then registers three data sources: file, http and APIServer. Calling cfg.Channel() creates the corresponding channel for each source.

1)NewSourceApiserver()

Let's first look at the part that watches the APIServer. The code of NewSourceApiserver() is as follows:

// pkg/kubelet/config/apiserver.go
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
    // Create a ListWatch that watches pod changes for the current node
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))

    // The Reflector responsible for watching pods at the apiserver should be run only after
    // the node sync with the apiserver has completed.
    klog.InfoS("Waiting for node sync before watching apiserver pods")
    go func() {
        for {
            if nodeHasSynced() {
                klog.V(4).InfoS("node sync completed")
                break
            }
            time.Sleep(WaitForAPIServerSyncPeriod)
            klog.V(4).InfoS("node sync has not completed yet")
        }
        klog.InfoS("Watching apiserver")
        // Once the node sync has completed, call newSourceApiserverFromLW
        newSourceApiserverFromLW(lw, updates)
    }()
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        // When pod changes for this node are observed from the apiserver, write them into
        // the channel; the listen() method later receives values from this channel
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    // Use the client-go API to create the reflector
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)
}

newSourceApiserverFromLW() uses the client-go API to create a Reflector; when pod changes for the current node are observed from the APIServer, they are written into the channel.

2)cfg.Channel()

Calling cfg.Channel() in makePodSourceConfig() creates the corresponding channel:

// pkg/kubelet/config/config.go
// Register a dedicated channel for each source
func (c *PodConfig) Channel(source string) chan<- interface{} {
    c.sourcesLock.Lock()
    defer c.sourcesLock.Unlock()
    c.sources.Insert(source)
    // Delegate to c.mux.Channel
    return c.mux.Channel(source)
}

// pkg/util/config/config.go
func (m *Mux) Channel(source string) chan interface{} {
    if len(source) == 0 {
        panic("Channel given an empty name")
    }
    m.sourceLock.Lock()
    defer m.sourceLock.Unlock()
    channel, exists := m.sources[source]
    if exists {
        return channel
    }
    newChannel := make(chan interface{})
    m.sources[source] = newChannel
    // Start a goroutine to listen for new data on this channel.
    // The channel created here is eventually passed into the send function defined in
    // newSourceApiserverFromLW; when pod changes for this node are observed from the
    // apiserver they are written into this channel, and the listen function keeps
    // receiving from it
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        // Call the Merge method
        m.merger.Merge(source, update)
    }
}

The channel created in Channel() is eventually passed into the send() function defined in newSourceApiserverFromLW(); when pod changes for the current node are observed from the APIServer they are written into this channel. The listen() method keeps receiving from this channel and calls Merge() to process the received data.

// pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    seenBefore := s.sourcesSeen.Has(source)
    // Classify the pod changes by type
    adds, updates, deletes, removes, reconciles := s.merge(source, change)
    firstSet := !seenBefore && s.sourcesSeen.Has(source)

    // deliver update notifications
    switch s.mode {
    case PodConfigNotificationIncremental:
        // Finally push the pod changes into configCh
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        }
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
        if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
            // Send an empty update when first seeing the source and there are
            // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
            // the source is ready.
            s.updates <- *adds
        }
        // Only add reconcile support here, because kubelet doesn't support Snapshot update now.
        if len(reconciles.Pods) > 0 {
            s.updates <- *reconciles
        }
    ...
    }
    return nil
}

Merge() classifies the pod changes by type and finally pushes the pod change information into configCh. kl.syncLoopIteration() watches configCh and hands the data to the handler, which dispatches work via dispatchWork.
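
dispatchWork() itself is not listed in this article; conceptually it hands the pod update to the podWorkers, which keep one goroutine per pod UID (managePodLoop) fed through a per-pod channel, so updates for the same pod are processed serially. The toy below, written for this article, only illustrates that per-pod-worker pattern and is not kubelet's actual podWorkers implementation:

package main

import (
    "fmt"
    "sync"
    "time"
)

type podUpdate struct {
    uid string
    op  string
}

// toyPodWorkers mimics the idea behind kubelet's podWorkers: one goroutine per
// pod UID, fed through a per-pod channel, so updates for the same pod are
// handled serially while different pods are handled concurrently.
type toyPodWorkers struct {
    mu      sync.Mutex
    updates map[string]chan podUpdate
    wg      sync.WaitGroup
}

func (w *toyPodWorkers) UpdatePod(u podUpdate) {
    w.mu.Lock()
    ch, ok := w.updates[u.uid]
    if !ok {
        ch = make(chan podUpdate, 16)
        w.updates[u.uid] = ch
        w.wg.Add(1)
        // start the per-pod worker goroutine (kubelet's managePodLoop plays this role)
        go func() {
            defer w.wg.Done()
            for upd := range ch {
                fmt.Printf("syncing pod %s: %s\n", upd.uid, upd.op)
                time.Sleep(10 * time.Millisecond) // stand-in for syncPod
            }
        }()
    }
    w.mu.Unlock()
    ch <- u
}

func main() {
    w := &toyPodWorkers{updates: map[string]chan podUpdate{}}
    w.UpdatePod(podUpdate{uid: "pod-a", op: "ADD"})
    w.UpdatePod(podUpdate{uid: "pod-b", op: "ADD"})
    w.UpdatePod(podUpdate{uid: "pod-a", op: "UPDATE"})

    time.Sleep(100 * time.Millisecond)
    w.mu.Lock()
    for _, ch := range w.updates {
        close(ch)
    }
    w.mu.Unlock()
    w.wg.Wait()
}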

The flow of data into configCh is shown in the figure below:


2)、plegCh

The code that initializes and runs the PLEG is as follows:

// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) Start() {
    go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

The Start() method launches a goroutine that executes the relist() method once per second.

// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) relist() {
    klog.V(5).InfoS("GenericPLEG: Relisting")

    if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
        metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
    }

    timestamp := g.clock.Now()
    defer func() {
        metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
    }()

    // Get all the pods.
    // Ask the runtime for all pods and containers on the current node (ultimately via the CRI)
    podList, err := g.runtime.GetPods(true)
    if err != nil {
        klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
        return
    }

    g.updateRelistTime(timestamp)

    pods := kubecontainer.Pods(podList)
    // update running pod and container count
    updateRunningPodAndContainerMetrics(pods)
    g.podRecords.setCurrent(pods)

    // Compare the old and the current pods, and generate events.
    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
    for pid := range g.podRecords {
        oldPod := g.podRecords.getOld(pid)
        pod := g.podRecords.getCurrent(pid)
        // Get all containers in the old and the new pod.
        allContainers := getContainersFromPods(oldPod, pod)
        for _, container := range allContainers {
            // Check whether the container changed; if so, generate PodLifecycleEvents
            events := computeEvents(oldPod, pod, &container.ID)
            for _, e := range events {
                updateEvents(eventsByPodID, e)
            }
        }
    }

    var needsReinspection map[types.UID]*kubecontainer.Pod
    if g.cacheEnabled() {
        needsReinspection = make(map[types.UID]*kubecontainer.Pod)
    }

    // If there are events associated with a pod, we should update the
    // podCache.
    // Iterate over every pod that produced events
    for pid, events := range eventsByPodID {
        pod := g.podRecords.getCurrent(pid)
        if g.cacheEnabled() {
            // updateCache() will inspect the pod and update the cache. If an
            // error occurs during the inspection, we want PLEG to retry again
            // in the next relist. To achieve this, we do not update the
            // associated podRecord of the pod, so that the change will be
            // detect again in the next relist.
            // TODO: If many pods changed during the same relist period,
            // inspecting the pod and getting the PodStatus to update the cache
            // serially may take a while. We should be aware of this and
            // parallelize if needed.
            if err := g.updateCache(pod, pid); err != nil {
                // Rely on updateCache calling GetPodStatus to log the actual error.
                klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

                // make sure we try to reinspect the pod during the next relisting
                needsReinspection[pid] = pod

                continue
            } else {
                // this pod was in the list to reinspect and we did so because it had events, so remove it
                // from the list (we don't want the reinspection code below to inspect it a second time in
                // this relist execution)
                delete(g.podsToReinspect, pid)
            }
        }
        // Update the internal storage and send out the events.
        g.podRecords.update(pid)

        // Map from containerId to exit code; used as a temporary cache for lookup
        containerExitCode := make(map[string]int)

        // Iterate over all events of this pod
        for i := range events {
            // Filter out events that are not reliable and no other components use yet.
            if events[i].Type == ContainerChanged {
                continue
            }
            select {
            // Push the event into kubelet's plegCh
            case g.eventChannel <- events[i]:
            default:
                metrics.PLEGDiscardEvents.Inc()
                klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
            }
            // Log exit code of containers when they finished in a particular event
            if events[i].Type == ContainerDied {
                // Fill up containerExitCode map for ContainerDied event when first time appeared
                if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
                    // Get updated podStatus
                    status, err := g.cache.Get(pod.ID)
                    if err == nil {
                        for _, containerStatus := range status.ContainerStatuses {
                            containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
                        }
                    }
                }
                if containerID, ok := events[i].Data.(string); ok {
                    if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
                        klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
                    }
                }
            }
        }
    }

    if g.cacheEnabled() {
        // reinspect any pods that failed inspection during the previous relist
        if len(g.podsToReinspect) > 0 {
            klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
            for pid, pod := range g.podsToReinspect {
                if err := g.updateCache(pod, pid); err != nil {
                    // Rely on updateCache calling GetPodStatus to log the actual error.
                    klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
                    needsReinspection[pid] = pod
                }
            }
        }

        // Update the cache timestamp.  This needs to happen *after*
        // all pods have been properly updated in the cache.
        g.cache.UpdateTime(timestamp)
    }

    // make sure we retain the list of pods that need reinspecting the next time relist is called
    g.podsToReinspect = needsReinspection
}

The main logic of the relist() method is as follows:

  1. Call the runtime to get information about all pods and containers on the current node (ultimately through the CRI)
  2. Iterate over all pods and check whether any container has changed; if so, generate PodLifecycleEvents (a simplified sketch of this comparison follows the list)
  3. For each pod that produced events, iterate over all of its events and push them into kubelet's plegCh
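
Step 2 boils down to comparing a container's state from the previous relist with its current state and emitting lifecycle events for the transition. The sketch below is a simplified, self-contained version of that comparison; the real logic lives in computeEvents()/generateEvents() in pkg/kubelet/pleg/generic.go and covers more cases:

package main

import "fmt"

type containerState string

const (
    stateNonExistent containerState = "non-existent"
    stateRunning     containerState = "running"
    stateExited      containerState = "exited"
    stateUnknown     containerState = "unknown"
)

type lifecycleEventType string

const (
    containerStarted lifecycleEventType = "ContainerStarted"
    containerDied    lifecycleEventType = "ContainerDied"
    containerRemoved lifecycleEventType = "ContainerRemoved"
    containerChanged lifecycleEventType = "ContainerChanged"
)

// generateEvents is a simplified version of the PLEG's state comparison: given a
// container's state in the previous relist and in the current relist, emit the
// lifecycle events that describe the transition.
func generateEvents(old, current containerState) []lifecycleEventType {
    if old == current {
        return nil
    }
    switch current {
    case stateRunning:
        return []lifecycleEventType{containerStarted}
    case stateExited:
        return []lifecycleEventType{containerDied}
    case stateNonExistent:
        if old == stateExited {
            // already reported as died; now it is gone
            return []lifecycleEventType{containerRemoved}
        }
        return []lifecycleEventType{containerDied, containerRemoved}
    default:
        return []lifecycleEventType{containerChanged}
    }
}

func main() {
    fmt.Println(generateEvents(stateNonExistent, stateRunning)) // [ContainerStarted]
    fmt.Println(generateEvents(stateRunning, stateExited))      // [ContainerDied]
    fmt.Println(generateEvents(stateExited, stateNonExistent))  // [ContainerRemoved]
}
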
3)、syncCh

All pods that enter syncLoopIteration() eventually end up in managePodLoop(); when a sync completes, completeWork() re-enqueues the pod into the workQueue:

// pkg/kubelet/pod_workers.go
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
    // Requeue the last update if the last sync returned error.
    switch {
    case syncErr == nil:
        // No error; requeue at the regular resync interval.
        p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
    case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
        // Network is not ready; back off for short period of time and retry as network might be ready soon.
        p.workQueue.Enqueue(pod.UID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
    default:
        // Error occurred during the sync; back off and then retry.
        p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
    }
    p.completeWorkQueueNext(pod.UID)
}

syncCh is a timed task created inside syncLoop(); every second, getPodsToSync() is called to fetch the pods waiting in the workQueue and sync them.
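
The workQueue behaves like a "due after a delay" queue: Enqueue records a pod UID together with the earliest time it should be synced again, and GetWork returns the UIDs that are due. Below is a minimal self-contained sketch of that idea; toyWorkQueue is a stand-in written for this article, not kubelet's actual work queue implementation:

package main

import (
    "fmt"
    "time"
)

// toyWorkQueue mimics the idea behind kubelet's work queue: Enqueue stores a UID
// with the earliest time it becomes due again, GetWork returns (and removes)
// every UID that is due now.
type toyWorkQueue struct {
    due map[string]time.Time
}

func (q *toyWorkQueue) Enqueue(uid string, delay time.Duration) {
    q.due[uid] = time.Now().Add(delay)
}

func (q *toyWorkQueue) GetWork() []string {
    now := time.Now()
    var ready []string
    for uid, t := range q.due {
        if !t.After(now) {
            ready = append(ready, uid)
            delete(q.due, uid)
        }
    }
    return ready
}

func main() {
    q := &toyWorkQueue{due: map[string]time.Time{}}
    q.Enqueue("pod-a", 0)                   // due immediately
    q.Enqueue("pod-b", 50*time.Millisecond) // due a bit later

    fmt.Println("first tick: ", q.GetWork()) // [pod-a]
    time.Sleep(60 * time.Millisecond)
    fmt.Println("second tick:", q.GetWork()) // [pod-b]
}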

4)、Summary

The core kubelet flow is shown in the figure below.


References:

11.深入k8s:kubelet工作原理及其初始化源码分析

45 | 幕后英雄:SIG-Node与CRI

46 | 解读 CRI 与 容器运行时

kubelet启动&&创建Pod源码分析

kubelet源码分析 syncLoopIteration(一) configCh

kubelet源码分析 syncLoopIteration(二) plegCh、syncCh、relist

kubelet源码分析 housekeeping 定时清理
