kubelet组件的启动流程源码分析

2024-09-09 05:28

本文主要是介绍kubelet组件的启动流程源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

概述

摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。

正文

kubelet的作用

这里对kubelet的作用做一个简单总结。

  • 节点管理

    • 节点的注册

    • 节点状态更新

  • 容器管理(pod生命周期管理)

    • 监听apiserver的容器事件

    • 容器的创建、删除(CRI)

    • 容器的网络的创建与删除(CNI)

    • 容器状态监控

    • 容器的驱逐

  • 监控

    • cadvisor
    • healthz

kubelet的原理

在这里插入图片描述
(图片来源于网络,如有侵权请联系作者)

如下 kubelet 内部组件结构图所示,Kubelet 由许多内部组件构成

  • Kubelet API,包括 10250 端口的认证 API、4194 端口的 cAdvisor API、10255 端口的只读 API 以及 10248 端口的健康检查 API
  • syncLoop:从 API 或者 manifest 目录接收 Pod 更新,发送到 podWorkers 处理,大量使用 channel 处理来处理异步请求
  • 辅助的 manager,如 cAdvisor、PLEG、Volume Manager 等,处理 syncLoop 以外的其他工作
  • CRI:容器执行引擎接口,负责与 container runtime shim 通信
  • 容器执行引擎,如 dockershim、rkt 等
  • 网络插件,目前支持 CNI 和 kubenet

kubelet的启动参数


/usr/local/bin/kubelet--address=<node-name># 指定 kubelet 与 apiserver 通信的端口--port=10250--healthz-bind-address=0.0.0.0# 指定 只读api 的端口--read-only-port=10255# 注册 node 节点使用的hostname--hostname_override=<node-name># 重要!!! 指定kubeletconfig配置文件的路径--config=/home/kube/kubernetes/conf/config.yaml# 指定 pause 容器的image下载路径--pod-infra-container-image=mirrors.myoas.com/nebula-docker/seg/pod/pause:3.1# 指定 kubelet 在节点上存储数据和文件的根目录。--root-dir=/home/kube/kubernetes/lib/kubelet# 每个宿主最大能创建多少个POD--max-pods=60# 指定日志输出记录级别,4表示记录含有调试信息的所有信息--v=4# 指定cni插件--network-plugin=cni--cni-conf-dir=/etc/cni/net.d--cni-bin-dir=/opt/cni/bin# 指定了 kubelet 从 Kubernetes API Server 同步更新的时间间隔(以秒为单位),默认是60秒--sync-frequency=5s# 指定kubeconfig的路径,用于节点注册的认证--kubeconfig=/home/kube/kubernetes/conf/kubelet.kubeconfig# 存放认证文件的目录--cert-dir=/home/kube/ssl/pkc# 设置系统保留资源--system-reserved=cpu=2000m,memory=20000Mi# 支持宿主节点使用swap--fail-swap-on=false

kubelet监听的端口

kubelet 默认监听三个端口,分别为 10250 、10255、10248(有些k8s版本的kubelet也包括cadvisor的4194端口)

  • 10250: kubelet server 与 apiserver 通信的端口,定期请求 apiserver 获取自己所应当处理的任务,通过该端口可以访问获取 node 资源以及状态。

  • 10255: 提供了 pod 、 node、metric和cadvisor 的信息,接口以只读形式暴露出去,访问该端口不需要认证和鉴权。

    root@ubuntu:~# curl http://10.234.12.78:10255/stats/summary
    {"node": {"nodeName": "10.234.12.78","systemContainers": [{"name": "pods","startTime": "2024-09-07T06:00:48Z","cpu": {"time": "2024-09-07T11:49:09Z","usageNanoCores": 12571621,"usageCoreNanoSeconds": 261530338504},// 输出略
    }
    
root@ubuntu:~# curl http://10.234.12.77:10255/metrics  |head -10% Total    % Received % Xferd  Average Speed   Time    Time     Time  CurrentDload  Upload   Total   Spent    Left  Speed0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0# HELP apiserver_audit_event_total [ALPHA] Counter of audit events generated and sent to the audit backend.
# TYPE apiserver_audit_event_total counter
apiserver_audit_event_total 0
# HELP apiserver_audit_requests_rejected_total [ALPHA] Counter of apiserver requests rejected due to an error in audit logging backend.
# TYPE apiserver_audit_requests_rejected_total counter
apiserver_audit_requests_rejected_total 0
# HELP apiserver_client_certificate_expiration_seconds [ALPHA] Distribution of the remaining lifetime on the certificate used to authenticate a request.
# TYPE apiserver_client_certificate_expiration_seconds histogram
apiserver_client_certificate_expiration_seconds_bucket{le="0"} 0
apiserver_client_certificate_expiration_seconds_bucket{le="1800"} 0
// 输出略
root@ubuntu:~# curl http://10.234.12.77:4194/metrics  |head -10
# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="511ec9ef",cadvisorVersion="v0.33.0",dockerVersion="18.09.7",kernelVersion="4.15.0-147-generic",osVersion="Alpine Linux v3.8"} 1
# HELP container_cpu_cfs_periods_total Number of elapsed enforcement period intervals.
# TYPE container_cpu_cfs_periods_total counter
container_cpu_cfs_periods_total{container_label_annotation_io_kubernetes_container_hash="",container_label_annotation_io_kubernetes_container_ports="",container_label_annotation_io_kubernetes_container_restartCount="",container_label_annotation_io_kubernetes_container_terminationMessagePath="",container_label_annotation_io_kubernetes_container_terminationMessagePolicy="",container_label_annotation_io_kubernetes_pod_terminationGracePeriod="",container_label_annotation_kubernetes_io_config_seen="",container_label_annotation_kubernetes_io_config_source="",container_label_app="",container_label_controller_revision_hash="",container_label_io_kubernetes_container_logpath="",container_label_io_kubernetes_container_name="",container_label_io_kubernetes_docker_type="",container_label_io_kubernetes_pod_name="",container_label_io_kubernetes_pod_namespace="",container_label_io_kubernetes_pod_uid="",container_label_io_kubernetes_sandbox_id="",container_label_pod_template_generation="",id="/kubepods/burstable/podc387ad42-a56c-4e29-bc78-0264f44614b7",image="",name=""} 212064 1725710124084
// 输出略
  • 10248: 通过访问该端口可以判断 kubelet 是否正常工作, 通过 kubelet 的启动参数 --healthz-port--healthz-bind-address 来指定监听的地址和端口。

    root@ubuntu:~# curl http://10.234.12.78:10248/healthz
    ok
    

查询 Node 汇总信息

  • 在集群内部可以直接访问 kubelet 的 10255 端口
curl http://<node-name>:10255/stats/summary
  • 查询kubelet健康状态
curl http://<node-name>:10248/healthz
  • 查询kubelet的metrics
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics
  • 查询cadvisor
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics/cadvisor

pod创建流程

在这里插入图片描述
(图片来源于网络,如有侵权请联系作者)

上图是一个典型的pod创建流程图,kubelet通过syncloop 监听到 apiserver 将 pod调度到本机上,之后kubelet通过与dockershim交互,创建container,准备cni,准备image.创建完成container后,kubelet将container的状态信息反馈给apiserver.

kubelet启动源码解析

说明:基于 kubernetes v1.18.0 源码分析

再对kubelet的基础知识有一定了解后,我们下面正式进入kubelet启动流程的源码分析。源码位于k8s.io/kubernetes/cmd/kubelet/kubelet.go

kubeletconfig

在进行源码分析之前,我们分析Kubelet使用的配置文件kubeletconfig,kubeletconfig包括了kubelet程序运行的重要参数信息。

查看kubeletconfig的内容

  • 方法一: 直接访问
root@ubuntu:~# curl -X GET https://10.234.12.78:10250/configz -k |jq

方法二:通过kubectl proxy 间接访问

root@ubuntu:~# kubectl proxy
Starting to serve on 127.0.0.1:8001查看kubeletconfig的内容curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .

kubeletconfig的内容如下


root@ubuntu:~# curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .% Total    % Received % Xferd  Average Speed   Time    Time     Time  CurrentDload  Upload   Total   Spent    Left  Speed
100  1985  100  1985    0     0   351k      0 --:--:-- --:--:-- --:--:--  387k
{"kubeletconfig": {"staticPodPath": "/etc/kubernetes/manifests","syncFrequency": "5s","fileCheckFrequency": "20s","httpCheckFrequency": "20s","address": "<node-name>","port": 10250,"tlsCertFile": "/home/kube/ssl/pkc/kubelet.crt","tlsPrivateKeyFile": "/home/kube/ssl/pkc/kubelet.key","rotateCertificates": true,"authentication": {"x509": {"clientCAFile": "/home/kube/ssl/pkc/ca.crt"},"webhook": {"enabled": true,"cacheTTL": "2m0s"},"anonymous": {"enabled": true}},"authorization": {"mode": "Webhook","webhook": {"cacheAuthorizedTTL": "5m0s","cacheUnauthorizedTTL": "30s"}},"registryPullQPS": 5,"registryBurst": 10,"eventRecordQPS": 5,"eventBurst": 10,"enableDebuggingHandlers": true,"healthzPort": 10248,"healthzBindAddress": "0.0.0.0","oomScoreAdj": -999,"clusterDomain": "cluster.local","clusterDNS": ["10.96.0.10"],"streamingConnectionIdleTimeout": "4h0m0s","nodeStatusUpdateFrequency": "10s","nodeStatusReportFrequency": "1m0s","nodeLeaseDurationSeconds": 40,"imageMinimumGCAge": "2m0s","imageGCHighThresholdPercent": 85,"imageGCLowThresholdPercent": 80,"volumeStatsAggPeriod": "1m0s","cgroupsPerQOS": true,"cgroupDriver": "cgroupfs","cpuManagerPolicy": "none","cpuManagerReconcilePeriod": "10s","topologyManagerPolicy": "none","runtimeRequestTimeout": "2m0s","hairpinMode": "promiscuous-bridge","maxPods": 60,"podPidsLimit": -1,"resolvConf": "/etc/resolv.conf","cpuCFSQuota": true,"cpuCFSQuotaPeriod": "100ms","maxOpenFiles": 1000000,"contentType": "application/vnd.kubernetes.protobuf","kubeAPIQPS": 5,"kubeAPIBurst": 10,"serializeImagePulls": true,"evictionHard": {"imagefs.available": "15%","memory.available": "100Mi","nodefs.available": "10%","nodefs.inodesFree": "5%"},"evictionPressureTransitionPeriod": "5m0s","enableControllerAttachDetach": true,"makeIPTablesUtilChains": true,"iptablesMasqueradeBit": 14,"iptablesDropBit": 15,"failSwapOn": false,"containerLogMaxSize": "10Mi","containerLogMaxFiles": 5,"configMapAndSecretChangeDetectionStrategy": "Watch","systemReserved": {"cpu": "2000m","memory": "20000Mi"},"enforceNodeAllocatable": ["pods"]}
}

main

kubelet组件的启动入口main(),在kubernetes/cmd/kubelet/kubelet.go

kubelet与kubernetes其他组件一样还是使用corba框架,进行命令行参数解析。

func main() {rand.Seed(time.Now().UnixNano())// 调用  app.NewKubeletCommand()command := app.NewKubeletCommand()logs.InitLogs()defer logs.FlushLogs()if err := command.Execute(); err != nil {os.Exit(1)}
}

NewKubeletCommand

NewKubeletCommand 的执行逻辑包括:

  1. 从命令行参数与kubeletconfig中,读取参数

  2. 参数校验,配置文件校验

  3. 初始化默认的 featureGate 配置。

    具体有哪些featuregate可以参考kubernete官方文档feature-gates

  4. 使用命令行参数和配置文件的参数,构建 KubeletServer

  5. KubeletServer来 构建 kubeletDeps,kubeletDeps 包含 kubelet 运行所必须的配置

  6. 将前面创建的 featureGate, KubeletServer,kubeletDeps传入Run()函数,进行启动kubelet


// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)// 从命令行参数中读取配置kubeletFlags := options.NewKubeletFlags()// 从 Kubeletconfig 中读取配置, Kubeletconfig配置文件由 --config=xxx 指定kubeletConfig, err := options.NewKubeletConfiguration()// programmer errorif err != nil {klog.Fatal(err)}cmd := &cobra.Command{Use: componentKubelet,Long: `The kubelet is the primary "node agent" that runs on each
node. It can register the node with the apiserver using one of: the hostname; `DisableFlagParsing: true,Run: func(cmd *cobra.Command, args []string) {// initial flag parse, since we disable cobra's flag parsing// Parse 解析命令行参数if err := cleanFlagSet.Parse(args); err != nil {cmd.Usage()klog.Fatal(err)}// check if there are non-flag arguments in the command linecmds := cleanFlagSet.Args()if len(cmds) > 0 {cmd.Usage()klog.Fatalf("unknown command: %s", cmds[0])}// short-circuit on helphelp, err := cleanFlagSet.GetBool("help")if err != nil {klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)}if help {cmd.Help()return}// short-circuit on verflagverflag.PrintAndExitIfRequested()utilflag.PrintFlags(cleanFlagSet)// set feature gates from initial flags-based config// 初始化 featureGate 配置if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {klog.Fatal(err)}// validate the initial KubeletFlags// 校验命令行参数if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {klog.Fatal(err)}// pause 容器下载的路径必须指定。--pod-infra-container-image== 指定的pause容器的image下载路径if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")}// load kubelet config file, if provided// 加载 kubeconfig 配置文件if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {kubeletConfig, err = loadConfigFile(configFile)if err != nil {klog.Fatal(err)}// We must enforce flag precedence by re-parsing the command line into the new object.// This is necessary to preserve backwards-compatibility across binary upgrades.// See issue #56171 for more details.if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {klog.Fatal(err)}// update feature gates based on new configif err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {klog.Fatal(err)}}// We always validate the local configuration (command line + config file).// This is the default "last-known-good" config for dynamic config, and must always remain valid.// 校验 kubeletconfig 配置文件if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {klog.Fatal(err)}// use dynamic kubelet config, if enabled// 处理动态配置 var kubeletConfigController *dynamickubeletconfig.Controllerif dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfigurationdynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,func(kc *kubeletconfiginternal.KubeletConfiguration) error {return kubeletConfigFlagPrecedence(kc, args)})if err != nil {klog.Fatal(err)}// If we should just use our existing, local config, the controller will return a nil configif dynamicKubeletConfig != nil {kubeletConfig = dynamicKubeletConfig// Note: flag precedence was already enforced in the controller, prior to validation,// by our above transform function. Now we simply update feature gates from the new config.if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {klog.Fatal(err)}}}// construct a KubeletServer from kubeletFlags and kubeletConfig// 用上面的命令行参数与kubeletconfig来构建一个 KubeletServer 对象kubeletServer := &options.KubeletServer{KubeletFlags:         *kubeletFlags,KubeletConfiguration: *kubeletConfig,}// use kubeletServer to construct the default KubeletDeps// 使用 kubeletServer 构建一个的 kubeletDeps, kubeletDeps是 kubelet启动所依赖的条件kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)if err != nil {klog.Fatal(err)}// add the kubelet config controller to kubeletDepskubeletDeps.KubeletConfigController = kubeletConfigController// set up stopCh here in order to be reused by kubelet and docker shimstopCh := genericapiserver.SetupSignalHandler()// start the experimental docker shim, if enabledif kubeletServer.KubeletFlags.ExperimentalDockershim {if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {klog.Fatal(err)}return}// run the kubelet// 启动 kubeletklog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {klog.Fatal(err)}},}// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flagskubeletFlags.AddFlags(cleanFlagSet)options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)options.AddGlobalFlags(cleanFlagSet)cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flagsconst usageFmt = "Usage:\n  %s\n\nFlags:\n%s"cmd.SetUsageFunc(func(cmd *cobra.Command) error {fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))return nil})cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))})return cmd
}

Run

Run使用给定的Dependencies,来运行指定的KubeletServer,而且它永远不应该退出。kubeDeps参数可以是nil,如果是nil的话,它将从KubeletServer上的设置初始化。否则,将假定调用者已经设置了Dependencies对象,并且不会生成默认对象。

从源码可以看到,Run对OS类型是windows时做一些处理,之后就马上进入run方法

// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {// To help debugging, immediately log versionklog.Infof("Version: %+v", version.Get())// 如果 OS 是 Windows 时,这做一些处理if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {return fmt.Errorf("failed OS init: %v", err)}// 调用run方法进一步处理if err := run(s, kubeDeps, featureGate, stopCh); err != nil {return fmt.Errorf("failed to run Kubelet: %v", err)}return nil
}

run

run() 函数很长,总结下它的做的工作包括:

  • 为 kubelet 设置默认的 FeatureGates,kubelet 所有的 FeatureGates 可以通过命令参数查看,k8s 中处于 Alpha 状态的 FeatureGates 在组件启动时默认关闭,处于 Beta 和 GA 状态的默认开启;
  • 校验 kubelet 的参数;
  • 将当前的配置文件注册到 http server /configz URL 中;这样就可以通过curl -X GET https://127.0.0.1:10250/configz -k查看kubeletconfig的配置信息
  • 检查 kubelet 启动模式是否为 standalone 模式,此模式下不会和 apiserver 交互,主要用于 kubelet 的调试;
  • 初始化 kubeDeps,kubeDeps 中包含 kubelet 的一些依赖,主要有 KubeClientEventClientHeartbeatClientAuthcadvisorContainerManager
  • 配置cgroupRoot目录。通过参数指定 --cgroup-root=。kubeletCgroup 用于指定 kubelet 进程自身的 cgroup 目录。runtimeCgroup 用于指定 kubelet 所管理的所有容器的 cgroup 目录。SystemCgroups 用于指定 kubelet 挂载 cgroup 文件系统时的根目录。
  • 检查是否以 root 用户启动;
  • 为进程设置 oom 分数,默认为 -999,分数范围为 [-1000, 1000],越小越不容易被 kill 掉;
  • 调用 RunKubelet 方法;
  • 检查 kubelet 是否启动了动态配置功能;
  • 启动 Healthz http server;默认是10248端口
  • 如果使用 systemd 启动,通知 systemd kubelet 已经启动;

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {// Set global feature gates based on the value on the initial KubeletServer// 设置 全局的 featureerr = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)if err != nil {return err}// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)// 对初始化的 KubeletServer 进行校验if err := options.ValidateKubeletServer(s); err != nil {return err}// Obtain Kubelet Lock Fileif s.ExitOnLockContention && s.LockFilePath == "" {return errors.New("cannot exit on lock file contention: no lock file specified")}done := make(chan struct{})if s.LockFilePath != "" {klog.Infof("acquiring file lock on %q", s.LockFilePath)if err := flock.Acquire(s.LockFilePath); err != nil {return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)}if s.ExitOnLockContention {klog.Infof("watching for inotify events for: %v", s.LockFilePath)if err := watchForLockfileContention(s.LockFilePath, done); err != nil {return err}}}// Register current configuration with /configz endpoint// 注册当前配置文到 /configz http访问点, 可以通过访问 ”curl -X GET https://<nodename>:10250/configz -k“err = initConfigz(&s.KubeletConfiguration)if err != nil {klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)}if len(s.ShowHiddenMetricsForVersion) > 0 {metrics.SetShowHidden()}// About to get clients and such, detect standaloneModestandaloneMode := trueif len(s.KubeConfig) > 0 {standaloneMode = false}// 初始化 kubeDepsif kubeDeps == nil {kubeDeps, err = UnsecuredDependencies(s, featureGate)if err != nil {return err}}if kubeDeps.Cloud == nil {if !cloudprovider.IsExternal(s.CloudProvider) {cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)if err != nil {return err}if cloud == nil {klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)} else {klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)}kubeDeps.Cloud = cloud}}// 初始 hostName, nodeNamehostName, err := nodeutil.GetHostname(s.HostnameOverride)if err != nil {return err}nodeName, err := getNodeName(kubeDeps.Cloud, hostName)if err != nil {return err}// if in standalone mode, indicate as much by setting all clients to nil// 如果是 standalone 模式,则设置 KubeClient ,EventClient,HeartbeatClient 为空switch {case standaloneMode:kubeDeps.KubeClient = nilkubeDeps.EventClient = nilkubeDeps.HeartbeatClient = nilklog.Warningf("standalone mode, no API client")// 初始化 kubeClient , EventClient, HearbeatClientcase kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)if err != nil {return err}if closeAllConns == nil {return errors.New("closeAllConns must be a valid function other than nil")}kubeDeps.OnHeartbeatFailure = closeAllConnskubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)if err != nil {return fmt.Errorf("failed to initialize kubelet client: %v", err)}// make a separate client for eventseventClientConfig := *clientConfigeventClientConfig.QPS = float32(s.EventRecordQPS)eventClientConfig.Burst = int(s.EventBurst)kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)if err != nil {return fmt.Errorf("failed to initialize kubelet event client: %v", err)}// make a separate client for heartbeat with throttling disabled and a timeout attachedheartbeatClientConfig := *clientConfigheartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration// The timeout is the minimum of the lease duration and status update frequencyleaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Secondif heartbeatClientConfig.Timeout > leaseTimeout {heartbeatClientConfig.Timeout = leaseTimeout}heartbeatClientConfig.QPS = float32(-1)kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)if err != nil {return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)}}// 出还是 auth 模块if kubeDeps.Auth == nil {auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)if err != nil {return err}kubeDeps.Auth = authrunAuthenticatorCAReload(stopCh)}// 这种 cgroupRoot (linux通过cgroup现在容器的资源使用,cgroupRoot就是 cgroup 对应的根目录,默认是/sys/fs/cgroup/systemd/ 目录)// kubeletCgroup 用于指定 kubelet 进程自身的 cgroup 目录。// runtimeCgroup 用于指定 kubelet 所管理的所有容器的 cgroup 目录。 --cgroup-root=/my/cgroup/path// SystemCgroups 用于指定 kubelet 挂载 cgroup 文件系统时的根目录。// 这些配置项允许对 kubelet 自身和所管理的容器在 cgroup 层面进行资源限制和隔离。它们可以根据需求进行自定义配置,以满足特定的部署要求。var cgroupRoots []string// 在 systemd 就是一个 cgroupDriver 的一种cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)if err != nil {klog.Warningf("failed to get the kubelet's cgroup: %v.  Kubelet system container metrics may be missing.", err)} else if kubeletCgroup != "" {cgroupRoots = append(cgroupRoots, kubeletCgroup)}runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)if err != nil {klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)} else if runtimeCgroup != "" {// RuntimeCgroups is optional, so ignore if it isn't specifiedcgroupRoots = append(cgroupRoots, runtimeCgroup)}if s.SystemCgroups != "" {// SystemCgroups is optional, so ignore if it isn't specifiedcgroupRoots = append(cgroupRoots, s.SystemCgroups)}// 配置 cavdisor if kubeDeps.CAdvisorInterface == nil {imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))if err != nil {return err}}// Setup event recorder if required.// 配置 事件记录器makeEventRecorder(kubeDeps, nodeName)// 初始化 ContainerManagerif kubeDeps.ContainerManager == nil {if s.CgroupsPerQOS && s.CgroupRoot == "" {klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")s.CgroupRoot = "/"}var reservedSystemCPUs cpuset.CPUSetvar errParse errorif s.ReservedSystemCPUs != "" {reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)if errParse != nil {// invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReservedklog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)return errParse}// is it safe do use CAdvisor here ??machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()if err != nil {// if can't use CAdvisor here, fall back to non-explicit cpu list behavorklog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")reservedSystemCPUs = cpuset.NewCPUSet()} else {reservedList := reservedSystemCPUs.ToSlice()first := reservedList[0]last := reservedList[len(reservedList)-1]if first < 0 || last >= machineInfo.NumCores {// the specified cpuset is outside of the range of what the machine hasklog.Infof("Invalid cpuset specified by --reserved-cpus")return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)}}} else {reservedSystemCPUs = cpuset.NewCPUSet()}if reservedSystemCPUs.Size() > 0 {// at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be okklog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)if s.KubeReserved != nil {delete(s.KubeReserved, "cpu")}if s.SystemReserved == nil {s.SystemReserved = make(map[string]string)}s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)}kubeReserved, err := parseResourceList(s.KubeReserved)if err != nil {return err}systemReserved, err := parseResourceList(s.SystemReserved)if err != nil {return err}var hardEvictionThresholds []evictionapi.Threshold// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)if err != nil {return err}}experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)if err != nil {return err}devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)kubeDeps.ContainerManager, err = cm.NewContainerManager(kubeDeps.Mounter,kubeDeps.CAdvisorInterface,cm.NodeConfig{RuntimeCgroupsName:    s.RuntimeCgroups,SystemCgroupsName:     s.SystemCgroups,KubeletCgroupsName:    s.KubeletCgroups,ContainerRuntime:      s.ContainerRuntime,CgroupsPerQOS:         s.CgroupsPerQOS,CgroupRoot:            s.CgroupRoot,CgroupDriver:          s.CgroupDriver,KubeletRootDir:        s.RootDirectory,ProtectKernelDefaults: s.ProtectKernelDefaults,NodeAllocatableConfig: cm.NodeAllocatableConfig{KubeReservedCgroupName:   s.KubeReservedCgroup,SystemReservedCgroupName: s.SystemReservedCgroup,EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),KubeReserved:             kubeReserved,SystemReserved:           systemReserved,ReservedSystemCPUs:       reservedSystemCPUs,HardEvictionThresholds:   hardEvictionThresholds,},QOSReserved:                           *experimentalQOSReserved,ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,ExperimentalPodPidsLimit:              s.PodPidsLimit,EnforceCPULimits:                      s.CPUCFSQuota,CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,},s.FailSwapOn,devicePluginEnabled,kubeDeps.Recorder)if err != nil {return err}}// 检查是否以 root 权限启动if err := checkPermissions(); err != nil {klog.Error(err)}utilruntime.ReallyCrash = s.ReallyCrashForTesting// TODO(vmarmol): Do this through container config.// 配置 OOMScoreAdj 分数oomAdjuster := kubeDeps.OOMAdjusterif err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {klog.Warning(err)}err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,kubeDeps, &s.ContainerRuntimeOptions,s.ContainerRuntime,s.RuntimeCgroups,s.RemoteRuntimeEndpoint,s.RemoteImageEndpoint,s.NonMasqueradeCIDR)if err != nil {return err}// 调用 RunKubelet 方法执行后续的启动操作if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {return err}// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loopsif utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {return err}}// 启动 healthz 监控检查的http端口if s.HealthzPort > 0 {mux := http.NewServeMux()healthz.InstallHandler(mux)go wait.Until(func() {err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)if err != nil {klog.Errorf("Starting healthz server failed: %v", err)}}, 5*time.Second, wait.NeverStop)}if s.RunOnce {return nil}// If systemd is used, notify it that we have started// 如果使用的是systemd,则向 systemd 发送启动ready 的信号go daemon.SdNotify(false, "READY=1")select {case <-done:breakcase <-stopCh:break}return nil
}

RunKubelet

run函数调用 RunKubelet 方法执行后续的启动操作,RunKubelet的工作包括:

  1. 设置默认启动特性模式
  2. 调用 createAndInitKubelet ,执行 kubelet 组件的初始化
  3. 检查 kubeDeps.PodConfig. kubeDeps.PodConfig 在 kubelet 中起到管理和存储单个 Pod 配置的作用,为 kubelet 提供了访问和操作 Pod 配置信息的便利性,以支持 Kubernetes 中的容器管理和调度任务
  4. 设置 MaxOpenFiles 。 可以通过kubelet启动参数进行调整。–max-open-files=
  5. 调用 startKubelet,启动 kubelet 中的组件
// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
//   1 Integration tests
//   2 Kubelet binary
//   3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)if err != nil {return err}// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nilnodeName, err := getNodeName(kubeDeps.Cloud, hostname)if err != nil {return err}// Setup event recorder if required.makeEventRecorder(kubeDeps, nodeName)// 1.默认使用特权模式capabilities.Initialize(capabilities.Capabilities{AllowPrivileged: true,})credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)if kubeDeps.OSInterface == nil {kubeDeps.OSInterface = kubecontainer.RealOS{}}// 2. 调用 createAndInitKubelet k, err := createAndInitKubelet(.....)if err != nil {return fmt.Errorf("failed to create kubelet: %v", err)}// NewMainKubelet should have set up a pod source config if one didn't exist// when the builder was run. This is just a precaution.// 3. 检查 kubeDeps.PodConfig// kubeDeps.PodConfig 在 kubelet 中起到管理和存储单个 Pod 配置的作用,为 kubelet 提供了访问和操作 Pod 配置信息的便利性,// 以支持 Kubernetes 中的容器管理和调度任务if kubeDeps.PodConfig == nil {return fmt.Errorf("failed to create kubelet, pod source config was nil")}podCfg := kubeDeps.PodConfig// 4. 设置 MaxOpenFiles rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))// process pods and exit.if runOnce {if _, err := k.RunOnce(podCfg.Updates()); err != nil {return fmt.Errorf("runonce failed: %v", err)}klog.Info("Started kubelet as runonce")} else {// 5. 调用 startKubeletstartKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)klog.Info("Started kubelet")}return nil
}

createAndInitKubelet

createAndInitKubelet 中主要调用了三个方法来完成 kubelet 的初始化:

  • kubelet.NewMainKubelet 函数使用提供的配置参数创建一个新的kubelet实例。它接收一些输入参数,例如节点名称、配置对象、主节点的网络地址等,以及其他一些选项。它会初始化kubelet的各种子系统和依赖项,并返回一个指向kubelet实例的指针
  • k.BirthCry() 向 apiserver 发送一条 kubelet 启动了的 event;
  • k.StartGarbageCollection() 启动 垃圾回收,回收 container 和 images;
func createAndInitKubelet(....) (k kubelet.Bootstrap, err error) {// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop// up into "per source" synchronizations// 实例化 kubelet 对象,并对 kubelet 依赖的所有模块进行初始化;k, err = kubelet.NewMainKubelet(....)if err != nil {return nil, err}// 向 apiserver 发送一条 kubelet 启动了的 event;k.BirthCry()// 启动 垃圾回收,回收 container 和 images;k.StartGarbageCollection()return k, nil
}

startKubelet

  1. k.Run((),启动 kubelet 中的所有模块以及主流程
  2. 启动 kubelet server,默认http监听端口是10250
  3. 启动只读状态http服务,默认是10255
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {// start the kubelet// 启动 kubelet 中的所有模块以及主流程go k.Run(podCfg.Updates())// start the kubelet server// 启动 kubelet server,默认http监听端口是10250if enableServer {go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)}// 只读状态api,默认是10255if kubeCfg.ReadOnlyPort > 0 {go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)}// 启动DefaultFeatureGate中对应http服务的相关端口if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {go k.ListenAndServePodResources()}
}

Kubelet.Run

Run 方法是启动 kubelet 的核心方法,其中会启动 kubelet 的依赖模块以及主循环逻辑,该方法的主要逻辑为:

  • 1、注册 logServer;
  • 2、判断是否需要启动 cloud provider sync manager;
  • 3、调用 kl.initializeModules 首先启动不依赖 container runtime 的一些模块;
  • 4、启动 volume manager
  • 5、执行 kl.syncNodeStatus 定时同步 Node 状态;
  • 6、调用 kl.fastStatusUpdateOnce 更新容器运行时启动时间以及执行首次状态同步;
  • 7、判断是否启用 NodeLease 机制;
  • 8、执行 kl.updateRuntimeUp 定时更新 Runtime 状态;
  • 9、执行 kl.syncNetworkUtil 定时同步 iptables 规则;
  • 10、执行 kl.podKiller 定时清理异常 pod,当 pod 没有被 podworker 正确处理的时候,启动一个goroutine 负责 kill 掉 pod;
  • 11、启动 statusManager
  • 12、启动 probeManager
  • 13、启动 runtimeClassManager
  • 14、启动 pleg
  • 15、调用 kl.syncLoop 监听 pod 变化;

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {// 配置 logServerif kl.logServer == nil {kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))}// 检查 kubeClientif kl.kubeClient == nil {klog.Warning("No api server defined - no node status update will be sent.")}// Start the cloud provider sync manager// 启动 cloudResourceSyncManager if kl.cloudResourceSyncManager != nil {go kl.cloudResourceSyncManager.Run(wait.NeverStop)}// 在initializeModules()中会启动 imageManager,serverCertificateManager,oomWatcher,resourceAnalyzerif err := kl.initializeModules(); err != nil {kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())klog.Fatal(err)}// Start volume manager// 启动 volumeManagergo kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)if kl.kubeClient != nil {// Start syncing node status immediately, this may set up things the runtime needs to run.// 同步 node 信息go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)// 调用 kl.fastStatusUpdateOnce 更新容器运行时启动时间以及执行首次状态同步go kl.fastStatusUpdateOnce()// start syncing lease// 启动 nodeLease控制器,nodeLease是一种节点健康检查机制go kl.nodeLeaseController.Run(wait.NeverStop)}// 执行 kl.updateRuntimeUp 定时更新 Runtime 状态,如果runtime (例如docker) 状态检查返回false,kubelet 将处于not ready状态go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)// Set up iptables util rules// 同步 iptables 规则if kl.makeIPTablesUtilChains {kl.initNetworkUtil()}// Start a goroutine responsible for killing pods (that are not properly// handled by pod workers).// 定时清理异常 podgo wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)// Start component sync loops.// 启动 statusManager、probeManager、runtimeClassManagerkl.statusManager.Start()kl.probeManager.Start()// Start syncing RuntimeClasses if enabled.if kl.runtimeClassManager != nil {kl.runtimeClassManager.Start(wait.NeverStop)}// Start the pod lifecycle event generator.// 启动 pleg 即Pod生命周期实际管理器kl.pleg.Start()// 调用 kl.syncLoop 监听 pod 变化kl.syncLoop(updates, kl)
}

nodeLease 机制是 kubelet 与控制平面交互的机制之一,用于保持节点的活性以及更新节点的状态。是Kubernetes 引入了节点健康检查机制。nodeLease 机制允许 kubelet 周期性地向控制平面报告自己的“租赁”,以证明它仍然在运行。如果 kubelet 停止发送此报告,则控制平面将认为该节点已离线,并在控制平面上更新该节点的状态为“离线”。

initializeModules

initializeModules函数负责初始化kubelet的各个子系统和功能模块,以便kubelet能够正常运行和管理容器化工作负载。具体而言,它的工作包括:

  • 1、创建kubelet工作的文件目录,由参数–root-dir指定

  • 2、创建 ContainerLogsDir

  • 3、启动 imageManager

  • 4、启动 certificate manager

  • 5、启动 oomWatcher.

  • 6、启动 resource analyzer

func (kl *Kubelet) initializeModules() error {metrics.Register(kl.runtimeCache,collectors.NewVolumeStatsCollector(kl),collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),)metrics.SetNodeName(kl.nodeName)servermetrics.Register()// 1、创建文件目录,由参数--root-dir指定if err := kl.setupDataDirs(); err != nil {return err}// 2、创建 ContainerLogsDirif _, err := os.Stat(ContainerLogsDir); err != nil {if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)}}// 3、启动 imageManagerkl.imageManager.Start()// 4、启动 certificate manager if kl.serverCertificateManager != nil {kl.serverCertificateManager.Start()}// 5、启动 oomWatcher.if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {return fmt.Errorf("failed to start OOM watcher %v", err)}// 6、启动 resource analyzerkl.resourceAnalyzer.Start()return nil
}

Kubelet.synloop

syncLoop是处理pod变化的主循环。它监视来自三个通道(file、apisserver和http)的更改,并创建它们的联合。如果发现到任何新更改,将针对期望状态和运行状态运行同步处理。如果配置没有变化,将每同步频率秒同步最后一个已知的所需状态。

Kubelet.synloop 启动for{}循环调用Kubelet.syncLoopIteration对应监听的通道的变化事件进行处理

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {klog.Info("Starting kubelet main sync loop.")// The syncTicker wakes up kubelet to checks if there are any pod workers// that need to be sync'd. A one-second period is sufficient because the// sync interval is defaulted to 10s.syncTicker := time.NewTicker(time.Second)defer syncTicker.Stop()housekeepingTicker := time.NewTicker(housekeepingPeriod)defer housekeepingTicker.Stop()plegCh := kl.pleg.Watch()const (base   = 100 * time.Millisecondmax    = 5 * time.Secondfactor = 2)duration := base// Responsible for checking limits in resolv.conf// The limits do not have anything to do with individual pods// Since this is called in syncLoop, we don't need to call it anywhere elseif kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {kl.dnsConfigurer.CheckLimitsForResolvConf()}// 循环处理 syncLoopIteration for {if err := kl.runtimeState.runtimeErrors(); err != nil {klog.Errorf("skipping pod synchronization - %v", err)// exponential backofftime.Sleep(duration)duration = time.Duration(math.Min(float64(max), factor*float64(duration)))continue}// reset backoff if we have a successduration = basekl.syncLoopMonitor.Store(kl.clock.Now())if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {break}kl.syncLoopMonitor.Store(kl.clock.Now())}
}

Kubelet.syncLoopIteration

kubelet 的 pods 同步逻辑都在 syncLoopIteration 这里. syncLoopIteration 同时监听下面的 chan, 根据事件做不同的处理.

  • configCh: 监听 file, http, apiserver 的事件更新
  • syncCh: 定时器管道, 每隔一秒去同步最新保存的 pod 状态
  • houseKeepingCh: housekeeping 事件的管道,做 pod 清理工作
  • plegCh: 该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态.
  • livenessManager.Updates: 健康检查发现某个 pod 不可用, kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {select {// 监听 file, http, apiserver 的事件更新case u, open := <-configCh:// Update from a config source; dispatch it to the right handler// callback.if !open {klog.Errorf("Update channel is closed. Exiting the sync loop.")return false}switch u.Op {case kubetypes.ADD:klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))// After restarting, kubelet will get all existing pods through// ADD as if they are new pods. These pods will then go through the// admission process and *may* be rejected. This can be resolved// once we have checkpointing.handler.HandlePodAdditions(u.Pods)case kubetypes.UPDATE:klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))handler.HandlePodUpdates(u.Pods)case kubetypes.REMOVE:klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))handler.HandlePodRemoves(u.Pods)case kubetypes.RECONCILE:klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))handler.HandlePodReconcile(u.Pods)case kubetypes.DELETE:klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))// DELETE is treated as a UPDATE because of graceful deletion.handler.HandlePodUpdates(u.Pods)case kubetypes.RESTORE:klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))// These are pods restored from the checkpoint. Treat them as new// pods.handler.HandlePodAdditions(u.Pods)case kubetypes.SET:// TODO: Do we want to support this?klog.Errorf("Kubelet does not support snapshot update")}if u.Op != kubetypes.RESTORE {// If the update type is RESTORE, it means that the update is from// the pod checkpoints and may be incomplete. Do not mark the// source as ready.// Mark the source ready after receiving at least one update from the// source. Once all the sources are marked ready, various cleanup// routines will start reclaiming resources. It is important that this// takes place only after kubelet calls the update handler to process// the update to ensure the internal pod cache is up-to-date.kl.sourcesReady.AddSource(u.Source)}// 该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态.case e := <-plegCh:if isSyncPodWorthy(e) {// PLEG event for a pod; sync it.if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)handler.HandlePodSyncs([]*v1.Pod{pod})} else {// If the pod no longer exists, ignore the event.klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)}}if e.Type == pleg.ContainerDied {if containerID, ok := e.Data.(string); ok {kl.cleanUpContainersInPod(e.ID, containerID)}}//  每隔一秒去同步最新保存的 pod 状态case <-syncCh:// Sync pods waiting for syncpodsToSync := kl.getPodsToSync()if len(podsToSync) == 0 {break}klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))handler.HandlePodSyncs(podsToSync)// 健康检查发现某个 pod 不可用, kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作case update := <-kl.livenessManager.Updates():if update.Result == proberesults.Failure {// The liveness manager detected a failure; sync the pod.// We should not use the pod from livenessManager, because it is never updated after// initialization.pod, ok := kl.podManager.GetPodByUID(update.PodUID)if !ok {// If the pod no longer exists, ignore the update.klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)break}klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))handler.HandlePodSyncs([]*v1.Pod{pod})}// housekeeping 事件的管道,做 pod 清理工作case <-housekeepingCh:if !kl.sourcesReady.AllReady() {// If the sources aren't ready or volume manager has not yet synced the states,// skip housekeeping, as we may accidentally delete pods from unready sources.klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")} else {klog.V(4).Infof("SyncLoop (housekeeping)")if err := handler.HandlePodCleanups(); err != nil {klog.Errorf("Failed cleaning pods: %v", err)}}}return true
}

kubelet工作原理示意图:

img
(图片来源于网络,如有侵权请联系作者)

此处总结一下 kubelet 启动逻辑中的调用关系如下所示:

                                                                                  |--> NewMainKubelet||--> createAndInitKubelet --|--> BirthCry|                           ||--> RunKubelet --|                           |--> StartGarbageCollection|                 ||                 |--> startKubelet --> k.Run --> kl.syncLoop --> kl.syncLoopIteration|
NewKubeletCommand --> Run --> run --|--> http.ListenAndServe||--> daemon.SdNotify

结论

本文主要介绍了 kublet的作用以及工作原理。之后从源码分析了kubelet 的启动流程,从分析过程中可以看到 kubelet 启动流程中的环节非常多,kubelet 中也包含了非常多的模块,如果要深入掌握kubelet的工作机制,则后面得对各个模板进行单独详细展开分析。

参考文档

https://feisky.gitbooks.io/kubernetes/content/components/kubelet.html

https://juejin.cn/post/6844903694618607623

https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kubelet_init.html

这篇关于kubelet组件的启动流程源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1150280

相关文章

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

SpringBoot项目启动后自动加载系统配置的多种实现方式

《SpringBoot项目启动后自动加载系统配置的多种实现方式》:本文主要介绍SpringBoot项目启动后自动加载系统配置的多种实现方式,并通过代码示例讲解的非常详细,对大家的学习或工作有一定的... 目录1. 使用 CommandLineRunner实现方式:2. 使用 ApplicationRunne

vue解决子组件样式覆盖问题scoped deep

《vue解决子组件样式覆盖问题scopeddeep》文章主要介绍了在Vue项目中处理全局样式和局部样式的方法,包括使用scoped属性和深度选择器(/deep/)来覆盖子组件的样式,作者建议所有组件... 目录前言scoped分析deep分析使用总结所有组件必须加scoped父组件覆盖子组件使用deep前言

Python实现NLP的完整流程介绍

《Python实现NLP的完整流程介绍》这篇文章主要为大家详细介绍了Python实现NLP的完整流程,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 编程安装和导入必要的库2. 文本数据准备3. 文本预处理3.1 小写化3.2 分词(Tokenizatio

基于Qt Qml实现时间轴组件

《基于QtQml实现时间轴组件》时间轴组件是现代用户界面中常见的元素,用于按时间顺序展示事件,本文主要为大家详细介绍了如何使用Qml实现一个简单的时间轴组件,需要的可以参考下... 目录写在前面效果图组件概述实现细节1. 组件结构2. 属性定义3. 数据模型4. 事件项的添加和排序5. 事件项的渲染如何使用

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专