kubelet组件的启动流程源码分析

2024-09-09 05:28

本文主要是介绍kubelet组件的启动流程源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

概述

摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。

正文

kubelet的作用

这里对kubelet的作用做一个简单总结。

  • 节点管理

    • 节点的注册

    • 节点状态更新

  • 容器管理(pod生命周期管理)

    • 监听apiserver的容器事件

    • 容器的创建、删除(CRI)

    • 容器的网络的创建与删除(CNI)

    • 容器状态监控

    • 容器的驱逐

  • 监控

    • cadvisor
    • healthz

kubelet的原理

在这里插入图片描述
(图片来源于网络,如有侵权请联系作者)

如下 kubelet 内部组件结构图所示,Kubelet 由许多内部组件构成

  • Kubelet API,包括 10250 端口的认证 API、4194 端口的 cAdvisor API、10255 端口的只读 API 以及 10248 端口的健康检查 API
  • syncLoop:从 API 或者 manifest 目录接收 Pod 更新,发送到 podWorkers 处理,大量使用 channel 处理来处理异步请求
  • 辅助的 manager,如 cAdvisor、PLEG、Volume Manager 等,处理 syncLoop 以外的其他工作
  • CRI:容器执行引擎接口,负责与 container runtime shim 通信
  • 容器执行引擎,如 dockershim、rkt 等
  • 网络插件,目前支持 CNI 和 kubenet

kubelet的启动参数


/usr/local/bin/kubelet--address=<node-name># 指定 kubelet 与 apiserver 通信的端口--port=10250--healthz-bind-address=0.0.0.0# 指定 只读api 的端口--read-only-port=10255# 注册 node 节点使用的hostname--hostname_override=<node-name># 重要!!! 指定kubeletconfig配置文件的路径--config=/home/kube/kubernetes/conf/config.yaml# 指定 pause 容器的image下载路径--pod-infra-container-image=mirrors.myoas.com/nebula-docker/seg/pod/pause:3.1# 指定 kubelet 在节点上存储数据和文件的根目录。--root-dir=/home/kube/kubernetes/lib/kubelet# 每个宿主最大能创建多少个POD--max-pods=60# 指定日志输出记录级别,4表示记录含有调试信息的所有信息--v=4# 指定cni插件--network-plugin=cni--cni-conf-dir=/etc/cni/net.d--cni-bin-dir=/opt/cni/bin# 指定了 kubelet 从 Kubernetes API Server 同步更新的时间间隔(以秒为单位),默认是60秒--sync-frequency=5s# 指定kubeconfig的路径,用于节点注册的认证--kubeconfig=/home/kube/kubernetes/conf/kubelet.kubeconfig# 存放认证文件的目录--cert-dir=/home/kube/ssl/pkc# 设置系统保留资源--system-reserved=cpu=2000m,memory=20000Mi# 支持宿主节点使用swap--fail-swap-on=false

kubelet监听的端口

kubelet 默认监听三个端口,分别为 10250 、10255、10248(有些k8s版本的kubelet也包括cadvisor的4194端口)

  • 10250: kubelet server 与 apiserver 通信的端口,定期请求 apiserver 获取自己所应当处理的任务,通过该端口可以访问获取 node 资源以及状态。

  • 10255: 提供了 pod 、 node、metric和cadvisor 的信息,接口以只读形式暴露出去,访问该端口不需要认证和鉴权。

    root@ubuntu:~# curl http://10.234.12.78:10255/stats/summary
    {"node": {"nodeName": "10.234.12.78","systemContainers": [{"name": "pods","startTime": "2024-09-07T06:00:48Z","cpu": {"time": "2024-09-07T11:49:09Z","usageNanoCores": 12571621,"usageCoreNanoSeconds": 261530338504},// 输出略
    }
    
root@ubuntu:~# curl http://10.234.12.77:10255/metrics  |head -10% Total    % Received % Xferd  Average Speed   Time    Time     Time  CurrentDload  Upload   Total   Spent    Left  Speed0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0# HELP apiserver_audit_event_total [ALPHA] Counter of audit events generated and sent to the audit backend.
# TYPE apiserver_audit_event_total counter
apiserver_audit_event_total 0
# HELP apiserver_audit_requests_rejected_total [ALPHA] Counter of apiserver requests rejected due to an error in audit logging backend.
# TYPE apiserver_audit_requests_rejected_total counter
apiserver_audit_requests_rejected_total 0
# HELP apiserver_client_certificate_expiration_seconds [ALPHA] Distribution of the remaining lifetime on the certificate used to authenticate a request.
# TYPE apiserver_client_certificate_expiration_seconds histogram
apiserver_client_certificate_expiration_seconds_bucket{le="0"} 0
apiserver_client_certificate_expiration_seconds_bucket{le="1800"} 0
// 输出略
root@ubuntu:~# curl http://10.234.12.77:4194/metrics  |head -10
# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="511ec9ef",cadvisorVersion="v0.33.0",dockerVersion="18.09.7",kernelVersion="4.15.0-147-generic",osVersion="Alpine Linux v3.8"} 1
# HELP container_cpu_cfs_periods_total Number of elapsed enforcement period intervals.
# TYPE container_cpu_cfs_periods_total counter
container_cpu_cfs_periods_total{container_label_annotation_io_kubernetes_container_hash="",container_label_annotation_io_kubernetes_container_ports="",container_label_annotation_io_kubernetes_container_restartCount="",container_label_annotation_io_kubernetes_container_terminationMessagePath="",container_label_annotation_io_kubernetes_container_terminationMessagePolicy="",container_label_annotation_io_kubernetes_pod_terminationGracePeriod="",container_label_annotation_kubernetes_io_config_seen="",container_label_annotation_kubernetes_io_config_source="",container_label_app="",container_label_controller_revision_hash="",container_label_io_kubernetes_container_logpath="",container_label_io_kubernetes_container_name="",container_label_io_kubernetes_docker_type="",container_label_io_kubernetes_pod_name="",container_label_io_kubernetes_pod_namespace="",container_label_io_kubernetes_pod_uid="",container_label_io_kubernetes_sandbox_id="",container_label_pod_template_generation="",id="/kubepods/burstable/podc387ad42-a56c-4e29-bc78-0264f44614b7",image="",name=""} 212064 1725710124084
// 输出略
  • 10248: 通过访问该端口可以判断 kubelet 是否正常工作, 通过 kubelet 的启动参数 --healthz-port--healthz-bind-address 来指定监听的地址和端口。

    root@ubuntu:~# curl http://10.234.12.78:10248/healthz
    ok
    

查询 Node 汇总信息

  • 在集群内部可以直接访问 kubelet 的 10255 端口
curl http://<node-name>:10255/stats/summary
  • 查询kubelet健康状态
curl http://<node-name>:10248/healthz
  • 查询kubelet的metrics
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics
  • 查询cadvisor
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics/cadvisor

pod创建流程

在这里插入图片描述
(图片来源于网络,如有侵权请联系作者)

上图是一个典型的pod创建流程图,kubelet通过syncloop 监听到 apiserver 将 pod调度到本机上,之后kubelet通过与dockershim交互,创建container,准备cni,准备image.创建完成container后,kubelet将container的状态信息反馈给apiserver.

kubelet启动源码解析

说明:基于 kubernetes v1.18.0 源码分析

再对kubelet的基础知识有一定了解后,我们下面正式进入kubelet启动流程的源码分析。源码位于k8s.io/kubernetes/cmd/kubelet/kubelet.go

kubeletconfig

在进行源码分析之前,我们分析Kubelet使用的配置文件kubeletconfig,kubeletconfig包括了kubelet程序运行的重要参数信息。

查看kubeletconfig的内容

  • 方法一: 直接访问
root@ubuntu:~# curl -X GET https://10.234.12.78:10250/configz -k |jq

方法二:通过kubectl proxy 间接访问

root@ubuntu:~# kubectl proxy
Starting to serve on 127.0.0.1:8001查看kubeletconfig的内容curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .

kubeletconfig的内容如下


root@ubuntu:~# curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .% Total    % Received % Xferd  Average Speed   Time    Time     Time  CurrentDload  Upload   Total   Spent    Left  Speed
100  1985  100  1985    0     0   351k      0 --:--:-- --:--:-- --:--:--  387k
{"kubeletconfig": {"staticPodPath": "/etc/kubernetes/manifests","syncFrequency": "5s","fileCheckFrequency": "20s","httpCheckFrequency": "20s","address": "<node-name>","port": 10250,"tlsCertFile": "/home/kube/ssl/pkc/kubelet.crt","tlsPrivateKeyFile": "/home/kube/ssl/pkc/kubelet.key","rotateCertificates": true,"authentication": {"x509": {"clientCAFile": "/home/kube/ssl/pkc/ca.crt"},"webhook": {"enabled": true,"cacheTTL": "2m0s"},"anonymous": {"enabled": true}},"authorization": {"mode": "Webhook","webhook": {"cacheAuthorizedTTL": "5m0s","cacheUnauthorizedTTL": "30s"}},"registryPullQPS": 5,"registryBurst": 10,"eventRecordQPS": 5,"eventBurst": 10,"enableDebuggingHandlers": true,"healthzPort": 10248,"healthzBindAddress": "0.0.0.0","oomScoreAdj": -999,"clusterDomain": "cluster.local","clusterDNS": ["10.96.0.10"],"streamingConnectionIdleTimeout": "4h0m0s","nodeStatusUpdateFrequency": "10s","nodeStatusReportFrequency": "1m0s","nodeLeaseDurationSeconds": 40,"imageMinimumGCAge": "2m0s","imageGCHighThresholdPercent": 85,"imageGCLowThresholdPercent": 80,"volumeStatsAggPeriod": "1m0s","cgroupsPerQOS": true,"cgroupDriver": "cgroupfs","cpuManagerPolicy": "none","cpuManagerReconcilePeriod": "10s","topologyManagerPolicy": "none","runtimeRequestTimeout": "2m0s","hairpinMode": "promiscuous-bridge","maxPods": 60,"podPidsLimit": -1,"resolvConf": "/etc/resolv.conf","cpuCFSQuota": true,"cpuCFSQuotaPeriod": "100ms","maxOpenFiles": 1000000,"contentType": "application/vnd.kubernetes.protobuf","kubeAPIQPS": 5,"kubeAPIBurst": 10,"serializeImagePulls": true,"evictionHard": {"imagefs.available": "15%","memory.available": "100Mi","nodefs.available": "10%","nodefs.inodesFree": "5%"},"evictionPressureTransitionPeriod": "5m0s","enableControllerAttachDetach": true,"makeIPTablesUtilChains": true,"iptablesMasqueradeBit": 14,"iptablesDropBit": 15,"failSwapOn": false,"containerLogMaxSize": "10Mi","containerLogMaxFiles": 5,"configMapAndSecretChangeDetectionStrategy": "Watch","systemReserved": {"cpu": "2000m","memory": "20000Mi"},"enforceNodeAllocatable": ["pods"]}
}

main

kubelet组件的启动入口main(),在kubernetes/cmd/kubelet/kubelet.go

kubelet与kubernetes其他组件一样还是使用corba框架,进行命令行参数解析。

func main() {rand.Seed(time.Now().UnixNano())// 调用  app.NewKubeletCommand()command := app.NewKubeletCommand()logs.InitLogs()defer logs.FlushLogs()if err := command.Execute(); err != nil {os.Exit(1)}
}

NewKubeletCommand

NewKubeletCommand 的执行逻辑包括:

  1. 从命令行参数与kubeletconfig中,读取参数

  2. 参数校验,配置文件校验

  3. 初始化默认的 featureGate 配置。

    具体有哪些featuregate可以参考kubernete官方文档feature-gates

  4. 使用命令行参数和配置文件的参数,构建 KubeletServer

  5. KubeletServer来 构建 kubeletDeps,kubeletDeps 包含 kubelet 运行所必须的配置

  6. 将前面创建的 featureGate, KubeletServer,kubeletDeps传入Run()函数,进行启动kubelet


// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)// 从命令行参数中读取配置kubeletFlags := options.NewKubeletFlags()// 从 Kubeletconfig 中读取配置, Kubeletconfig配置文件由 --config=xxx 指定kubeletConfig, err := options.NewKubeletConfiguration()// programmer errorif err != nil {klog.Fatal(err)}cmd := &cobra.Command{Use: componentKubelet,Long: `The kubelet is the primary "node agent" that runs on each
node. It can register the node with the apiserver using one of: the hostname; `DisableFlagParsing: true,Run: func(cmd *cobra.Command, args []string) {// initial flag parse, since we disable cobra's flag parsing// Parse 解析命令行参数if err := cleanFlagSet.Parse(args); err != nil {cmd.Usage()klog.Fatal(err)}// check if there are non-flag arguments in the command linecmds := cleanFlagSet.Args()if len(cmds) > 0 {cmd.Usage()klog.Fatalf("unknown command: %s", cmds[0])}// short-circuit on helphelp, err := cleanFlagSet.GetBool("help")if err != nil {klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)}if help {cmd.Help()return}// short-circuit on verflagverflag.PrintAndExitIfRequested()utilflag.PrintFlags(cleanFlagSet)// set feature gates from initial flags-based config// 初始化 featureGate 配置if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {klog.Fatal(err)}// validate the initial KubeletFlags// 校验命令行参数if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {klog.Fatal(err)}// pause 容器下载的路径必须指定。--pod-infra-container-image== 指定的pause容器的image下载路径if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")}// load kubelet config file, if provided// 加载 kubeconfig 配置文件if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {kubeletConfig, err = loadConfigFile(configFile)if err != nil {klog.Fatal(err)}// We must enforce flag precedence by re-parsing the command line into the new object.// This is necessary to preserve backwards-compatibility across binary upgrades.// See issue #56171 for more details.if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {klog.Fatal(err)}// update feature gates based on new configif err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {klog.Fatal(err)}}// We always validate the local configuration (command line + config file).// This is the default "last-known-good" config for dynamic config, and must always remain valid.// 校验 kubeletconfig 配置文件if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {klog.Fatal(err)}// use dynamic kubelet config, if enabled// 处理动态配置 var kubeletConfigController *dynamickubeletconfig.Controllerif dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfigurationdynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,func(kc *kubeletconfiginternal.KubeletConfiguration) error {return kubeletConfigFlagPrecedence(kc, args)})if err != nil {klog.Fatal(err)}// If we should just use our existing, local config, the controller will return a nil configif dynamicKubeletConfig != nil {kubeletConfig = dynamicKubeletConfig// Note: flag precedence was already enforced in the controller, prior to validation,// by our above transform function. Now we simply update feature gates from the new config.if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {klog.Fatal(err)}}}// construct a KubeletServer from kubeletFlags and kubeletConfig// 用上面的命令行参数与kubeletconfig来构建一个 KubeletServer 对象kubeletServer := &options.KubeletServer{KubeletFlags:         *kubeletFlags,KubeletConfiguration: *kubeletConfig,}// use kubeletServer to construct the default KubeletDeps// 使用 kubeletServer 构建一个的 kubeletDeps, kubeletDeps是 kubelet启动所依赖的条件kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)if err != nil {klog.Fatal(err)}// add the kubelet config controller to kubeletDepskubeletDeps.KubeletConfigController = kubeletConfigController// set up stopCh here in order to be reused by kubelet and docker shimstopCh := genericapiserver.SetupSignalHandler()// start the experimental docker shim, if enabledif kubeletServer.KubeletFlags.ExperimentalDockershim {if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {klog.Fatal(err)}return}// run the kubelet// 启动 kubeletklog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {klog.Fatal(err)}},}// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flagskubeletFlags.AddFlags(cleanFlagSet)options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)options.AddGlobalFlags(cleanFlagSet)cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flagsconst usageFmt = "Usage:\n  %s\n\nFlags:\n%s"cmd.SetUsageFunc(func(cmd *cobra.Command) error {fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))return nil})cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))})return cmd
}

Run

Run使用给定的Dependencies,来运行指定的KubeletServer,而且它永远不应该退出。kubeDeps参数可以是nil,如果是nil的话,它将从KubeletServer上的设置初始化。否则,将假定调用者已经设置了Dependencies对象,并且不会生成默认对象。

从源码可以看到,Run对OS类型是windows时做一些处理,之后就马上进入run方法

// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {// To help debugging, immediately log versionklog.Infof("Version: %+v", version.Get())// 如果 OS 是 Windows 时,这做一些处理if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {return fmt.Errorf("failed OS init: %v", err)}// 调用run方法进一步处理if err := run(s, kubeDeps, featureGate, stopCh); err != nil {return fmt.Errorf("failed to run Kubelet: %v", err)}return nil
}

run

run() 函数很长,总结下它的做的工作包括:

  • 为 kubelet 设置默认的 FeatureGates,kubelet 所有的 FeatureGates 可以通过命令参数查看,k8s 中处于 Alpha 状态的 FeatureGates 在组件启动时默认关闭,处于 Beta 和 GA 状态的默认开启;
  • 校验 kubelet 的参数;
  • 将当前的配置文件注册到 http server /configz URL 中;这样就可以通过curl -X GET https://127.0.0.1:10250/configz -k查看kubeletconfig的配置信息
  • 检查 kubelet 启动模式是否为 standalone 模式,此模式下不会和 apiserver 交互,主要用于 kubelet 的调试;
  • 初始化 kubeDeps,kubeDeps 中包含 kubelet 的一些依赖,主要有 KubeClientEventClientHeartbeatClientAuthcadvisorContainerManager
  • 配置cgroupRoot目录。通过参数指定 --cgroup-root=。kubeletCgroup 用于指定 kubelet 进程自身的 cgroup 目录。runtimeCgroup 用于指定 kubelet 所管理的所有容器的 cgroup 目录。SystemCgroups 用于指定 kubelet 挂载 cgroup 文件系统时的根目录。
  • 检查是否以 root 用户启动;
  • 为进程设置 oom 分数,默认为 -999,分数范围为 [-1000, 1000],越小越不容易被 kill 掉;
  • 调用 RunKubelet 方法;
  • 检查 kubelet 是否启动了动态配置功能;
  • 启动 Healthz http server;默认是10248端口
  • 如果使用 systemd 启动,通知 systemd kubelet 已经启动;

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {// Set global feature gates based on the value on the initial KubeletServer// 设置 全局的 featureerr = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)if err != nil {return err}// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)// 对初始化的 KubeletServer 进行校验if err := options.ValidateKubeletServer(s); err != nil {return err}// Obtain Kubelet Lock Fileif s.ExitOnLockContention && s.LockFilePath == "" {return errors.New("cannot exit on lock file contention: no lock file specified")}done := make(chan struct{})if s.LockFilePath != "" {klog.Infof("acquiring file lock on %q", s.LockFilePath)if err := flock.Acquire(s.LockFilePath); err != nil {return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)}if s.ExitOnLockContention {klog.Infof("watching for inotify events for: %v", s.LockFilePath)if err := watchForLockfileContention(s.LockFilePath, done); err != nil {return err}}}// Register current configuration with /configz endpoint// 注册当前配置文到 /configz http访问点, 可以通过访问 ”curl -X GET https://<nodename>:10250/configz -k“err = initConfigz(&s.KubeletConfiguration)if err != nil {klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)}if len(s.ShowHiddenMetricsForVersion) > 0 {metrics.SetShowHidden()}// About to get clients and such, detect standaloneModestandaloneMode := trueif len(s.KubeConfig) > 0 {standaloneMode = false}// 初始化 kubeDepsif kubeDeps == nil {kubeDeps, err = UnsecuredDependencies(s, featureGate)if err != nil {return err}}if kubeDeps.Cloud == nil {if !cloudprovider.IsExternal(s.CloudProvider) {cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)if err != nil {return err}if cloud == nil {klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)} else {klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)}kubeDeps.Cloud = cloud}}// 初始 hostName, nodeNamehostName, err := nodeutil.GetHostname(s.HostnameOverride)if err != nil {return err}nodeName, err := getNodeName(kubeDeps.Cloud, hostName)if err != nil {return err}// if in standalone mode, indicate as much by setting all clients to nil// 如果是 standalone 模式,则设置 KubeClient ,EventClient,HeartbeatClient 为空switch {case standaloneMode:kubeDeps.KubeClient = nilkubeDeps.EventClient = nilkubeDeps.HeartbeatClient = nilklog.Warningf("standalone mode, no API client")// 初始化 kubeClient , EventClient, HearbeatClientcase kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)if err != nil {return err}if closeAllConns == nil {return errors.New("closeAllConns must be a valid function other than nil")}kubeDeps.OnHeartbeatFailure = closeAllConnskubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)if err != nil {return fmt.Errorf("failed to initialize kubelet client: %v", err)}// make a separate client for eventseventClientConfig := *clientConfigeventClientConfig.QPS = float32(s.EventRecordQPS)eventClientConfig.Burst = int(s.EventBurst)kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)if err != nil {return fmt.Errorf("failed to initialize kubelet event client: %v", err)}// make a separate client for heartbeat with throttling disabled and a timeout attachedheartbeatClientConfig := *clientConfigheartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration// The timeout is the minimum of the lease duration and status update frequencyleaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Secondif heartbeatClientConfig.Timeout > leaseTimeout {heartbeatClientConfig.Timeout = leaseTimeout}heartbeatClientConfig.QPS = float32(-1)kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)if err != nil {return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)}}// 出还是 auth 模块if kubeDeps.Auth == nil {auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)if err != nil {return err}kubeDeps.Auth = authrunAuthenticatorCAReload(stopCh)}// 这种 cgroupRoot (linux通过cgroup现在容器的资源使用,cgroupRoot就是 cgroup 对应的根目录,默认是/sys/fs/cgroup/systemd/ 目录)// kubeletCgroup 用于指定 kubelet 进程自身的 cgroup 目录。// runtimeCgroup 用于指定 kubelet 所管理的所有容器的 cgroup 目录。 --cgroup-root=/my/cgroup/path// SystemCgroups 用于指定 kubelet 挂载 cgroup 文件系统时的根目录。// 这些配置项允许对 kubelet 自身和所管理的容器在 cgroup 层面进行资源限制和隔离。它们可以根据需求进行自定义配置,以满足特定的部署要求。var cgroupRoots []string// 在 systemd 就是一个 cgroupDriver 的一种cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)if err != nil {klog.Warningf("failed to get the kubelet's cgroup: %v.  Kubelet system container metrics may be missing.", err)} else if kubeletCgroup != "" {cgroupRoots = append(cgroupRoots, kubeletCgroup)}runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)if err != nil {klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)} else if runtimeCgroup != "" {// RuntimeCgroups is optional, so ignore if it isn't specifiedcgroupRoots = append(cgroupRoots, runtimeCgroup)}if s.SystemCgroups != "" {// SystemCgroups is optional, so ignore if it isn't specifiedcgroupRoots = append(cgroupRoots, s.SystemCgroups)}// 配置 cavdisor if kubeDeps.CAdvisorInterface == nil {imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))if err != nil {return err}}// Setup event recorder if required.// 配置 事件记录器makeEventRecorder(kubeDeps, nodeName)// 初始化 ContainerManagerif kubeDeps.ContainerManager == nil {if s.CgroupsPerQOS && s.CgroupRoot == "" {klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")s.CgroupRoot = "/"}var reservedSystemCPUs cpuset.CPUSetvar errParse errorif s.ReservedSystemCPUs != "" {reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)if errParse != nil {// invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReservedklog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)return errParse}// is it safe do use CAdvisor here ??machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()if err != nil {// if can't use CAdvisor here, fall back to non-explicit cpu list behavorklog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")reservedSystemCPUs = cpuset.NewCPUSet()} else {reservedList := reservedSystemCPUs.ToSlice()first := reservedList[0]last := reservedList[len(reservedList)-1]if first < 0 || last >= machineInfo.NumCores {// the specified cpuset is outside of the range of what the machine hasklog.Infof("Invalid cpuset specified by --reserved-cpus")return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)}}} else {reservedSystemCPUs = cpuset.NewCPUSet()}if reservedSystemCPUs.Size() > 0 {// at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be okklog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)if s.KubeReserved != nil {delete(s.KubeReserved, "cpu")}if s.SystemReserved == nil {s.SystemReserved = make(map[string]string)}s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)}kubeReserved, err := parseResourceList(s.KubeReserved)if err != nil {return err}systemReserved, err := parseResourceList(s.SystemReserved)if err != nil {return err}var hardEvictionThresholds []evictionapi.Threshold// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)if err != nil {return err}}experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)if err != nil {return err}devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)kubeDeps.ContainerManager, err = cm.NewContainerManager(kubeDeps.Mounter,kubeDeps.CAdvisorInterface,cm.NodeConfig{RuntimeCgroupsName:    s.RuntimeCgroups,SystemCgroupsName:     s.SystemCgroups,KubeletCgroupsName:    s.KubeletCgroups,ContainerRuntime:      s.ContainerRuntime,CgroupsPerQOS:         s.CgroupsPerQOS,CgroupRoot:            s.CgroupRoot,CgroupDriver:          s.CgroupDriver,KubeletRootDir:        s.RootDirectory,ProtectKernelDefaults: s.ProtectKernelDefaults,NodeAllocatableConfig: cm.NodeAllocatableConfig{KubeReservedCgroupName:   s.KubeReservedCgroup,SystemReservedCgroupName: s.SystemReservedCgroup,EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),KubeReserved:             kubeReserved,SystemReserved:           systemReserved,ReservedSystemCPUs:       reservedSystemCPUs,HardEvictionThresholds:   hardEvictionThresholds,},QOSReserved:                           *experimentalQOSReserved,ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,ExperimentalPodPidsLimit:              s.PodPidsLimit,EnforceCPULimits:                      s.CPUCFSQuota,CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,},s.FailSwapOn,devicePluginEnabled,kubeDeps.Recorder)if err != nil {return err}}// 检查是否以 root 权限启动if err := checkPermissions(); err != nil {klog.Error(err)}utilruntime.ReallyCrash = s.ReallyCrashForTesting// TODO(vmarmol): Do this through container config.// 配置 OOMScoreAdj 分数oomAdjuster := kubeDeps.OOMAdjusterif err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {klog.Warning(err)}err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,kubeDeps, &s.ContainerRuntimeOptions,s.ContainerRuntime,s.RuntimeCgroups,s.RemoteRuntimeEndpoint,s.RemoteImageEndpoint,s.NonMasqueradeCIDR)if err != nil {return err}// 调用 RunKubelet 方法执行后续的启动操作if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {return err}// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loopsif utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {return err}}// 启动 healthz 监控检查的http端口if s.HealthzPort > 0 {mux := http.NewServeMux()healthz.InstallHandler(mux)go wait.Until(func() {err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)if err != nil {klog.Errorf("Starting healthz server failed: %v", err)}}, 5*time.Second, wait.NeverStop)}if s.RunOnce {return nil}// If systemd is used, notify it that we have started// 如果使用的是systemd,则向 systemd 发送启动ready 的信号go daemon.SdNotify(false, "READY=1")select {case <-done:breakcase <-stopCh:break}return nil
}

RunKubelet

run函数调用 RunKubelet 方法执行后续的启动操作,RunKubelet的工作包括:

  1. 设置默认启动特性模式
  2. 调用 createAndInitKubelet ,执行 kubelet 组件的初始化
  3. 检查 kubeDeps.PodConfig. kubeDeps.PodConfig 在 kubelet 中起到管理和存储单个 Pod 配置的作用,为 kubelet 提供了访问和操作 Pod 配置信息的便利性,以支持 Kubernetes 中的容器管理和调度任务
  4. 设置 MaxOpenFiles 。 可以通过kubelet启动参数进行调整。–max-open-files=
  5. 调用 startKubelet,启动 kubelet 中的组件
// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
//   1 Integration tests
//   2 Kubelet binary
//   3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)if err != nil {return err}// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nilnodeName, err := getNodeName(kubeDeps.Cloud, hostname)if err != nil {return err}// Setup event recorder if required.makeEventRecorder(kubeDeps, nodeName)// 1.默认使用特权模式capabilities.Initialize(capabilities.Capabilities{AllowPrivileged: true,})credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)if kubeDeps.OSInterface == nil {kubeDeps.OSInterface = kubecontainer.RealOS{}}// 2. 调用 createAndInitKubelet k, err := createAndInitKubelet(.....)if err != nil {return fmt.Errorf("failed to create kubelet: %v", err)}// NewMainKubelet should have set up a pod source config if one didn't exist// when the builder was run. This is just a precaution.// 3. 检查 kubeDeps.PodConfig// kubeDeps.PodConfig 在 kubelet 中起到管理和存储单个 Pod 配置的作用,为 kubelet 提供了访问和操作 Pod 配置信息的便利性,// 以支持 Kubernetes 中的容器管理和调度任务if kubeDeps.PodConfig == nil {return fmt.Errorf("failed to create kubelet, pod source config was nil")}podCfg := kubeDeps.PodConfig// 4. 设置 MaxOpenFiles rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))// process pods and exit.if runOnce {if _, err := k.RunOnce(podCfg.Updates()); err != nil {return fmt.Errorf("runonce failed: %v", err)}klog.Info("Started kubelet as runonce")} else {// 5. 调用 startKubeletstartKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)klog.Info("Started kubelet")}return nil
}

createAndInitKubelet

createAndInitKubelet 中主要调用了三个方法来完成 kubelet 的初始化:

  • kubelet.NewMainKubelet 函数使用提供的配置参数创建一个新的kubelet实例。它接收一些输入参数,例如节点名称、配置对象、主节点的网络地址等,以及其他一些选项。它会初始化kubelet的各种子系统和依赖项,并返回一个指向kubelet实例的指针
  • k.BirthCry() 向 apiserver 发送一条 kubelet 启动了的 event;
  • k.StartGarbageCollection() 启动 垃圾回收,回收 container 和 images;
func createAndInitKubelet(....) (k kubelet.Bootstrap, err error) {// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop// up into "per source" synchronizations// 实例化 kubelet 对象,并对 kubelet 依赖的所有模块进行初始化;k, err = kubelet.NewMainKubelet(....)if err != nil {return nil, err}// 向 apiserver 发送一条 kubelet 启动了的 event;k.BirthCry()// 启动 垃圾回收,回收 container 和 images;k.StartGarbageCollection()return k, nil
}

startKubelet

  1. k.Run((),启动 kubelet 中的所有模块以及主流程
  2. 启动 kubelet server,默认http监听端口是10250
  3. 启动只读状态http服务,默认是10255
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {// start the kubelet// 启动 kubelet 中的所有模块以及主流程go k.Run(podCfg.Updates())// start the kubelet server// 启动 kubelet server,默认http监听端口是10250if enableServer {go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)}// 只读状态api,默认是10255if kubeCfg.ReadOnlyPort > 0 {go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)}// 启动DefaultFeatureGate中对应http服务的相关端口if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {go k.ListenAndServePodResources()}
}

Kubelet.Run

Run 方法是启动 kubelet 的核心方法,其中会启动 kubelet 的依赖模块以及主循环逻辑,该方法的主要逻辑为:

  • 1、注册 logServer;
  • 2、判断是否需要启动 cloud provider sync manager;
  • 3、调用 kl.initializeModules 首先启动不依赖 container runtime 的一些模块;
  • 4、启动 volume manager
  • 5、执行 kl.syncNodeStatus 定时同步 Node 状态;
  • 6、调用 kl.fastStatusUpdateOnce 更新容器运行时启动时间以及执行首次状态同步;
  • 7、判断是否启用 NodeLease 机制;
  • 8、执行 kl.updateRuntimeUp 定时更新 Runtime 状态;
  • 9、执行 kl.syncNetworkUtil 定时同步 iptables 规则;
  • 10、执行 kl.podKiller 定时清理异常 pod,当 pod 没有被 podworker 正确处理的时候,启动一个goroutine 负责 kill 掉 pod;
  • 11、启动 statusManager
  • 12、启动 probeManager
  • 13、启动 runtimeClassManager
  • 14、启动 pleg
  • 15、调用 kl.syncLoop 监听 pod 变化;

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {// 配置 logServerif kl.logServer == nil {kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))}// 检查 kubeClientif kl.kubeClient == nil {klog.Warning("No api server defined - no node status update will be sent.")}// Start the cloud provider sync manager// 启动 cloudResourceSyncManager if kl.cloudResourceSyncManager != nil {go kl.cloudResourceSyncManager.Run(wait.NeverStop)}// 在initializeModules()中会启动 imageManager,serverCertificateManager,oomWatcher,resourceAnalyzerif err := kl.initializeModules(); err != nil {kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())klog.Fatal(err)}// Start volume manager// 启动 volumeManagergo kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)if kl.kubeClient != nil {// Start syncing node status immediately, this may set up things the runtime needs to run.// 同步 node 信息go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)// 调用 kl.fastStatusUpdateOnce 更新容器运行时启动时间以及执行首次状态同步go kl.fastStatusUpdateOnce()// start syncing lease// 启动 nodeLease控制器,nodeLease是一种节点健康检查机制go kl.nodeLeaseController.Run(wait.NeverStop)}// 执行 kl.updateRuntimeUp 定时更新 Runtime 状态,如果runtime (例如docker) 状态检查返回false,kubelet 将处于not ready状态go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)// Set up iptables util rules// 同步 iptables 规则if kl.makeIPTablesUtilChains {kl.initNetworkUtil()}// Start a goroutine responsible for killing pods (that are not properly// handled by pod workers).// 定时清理异常 podgo wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)// Start component sync loops.// 启动 statusManager、probeManager、runtimeClassManagerkl.statusManager.Start()kl.probeManager.Start()// Start syncing RuntimeClasses if enabled.if kl.runtimeClassManager != nil {kl.runtimeClassManager.Start(wait.NeverStop)}// Start the pod lifecycle event generator.// 启动 pleg 即Pod生命周期实际管理器kl.pleg.Start()// 调用 kl.syncLoop 监听 pod 变化kl.syncLoop(updates, kl)
}

nodeLease 机制是 kubelet 与控制平面交互的机制之一,用于保持节点的活性以及更新节点的状态。是Kubernetes 引入了节点健康检查机制。nodeLease 机制允许 kubelet 周期性地向控制平面报告自己的“租赁”,以证明它仍然在运行。如果 kubelet 停止发送此报告,则控制平面将认为该节点已离线,并在控制平面上更新该节点的状态为“离线”。

initializeModules

initializeModules函数负责初始化kubelet的各个子系统和功能模块,以便kubelet能够正常运行和管理容器化工作负载。具体而言,它的工作包括:

  • 1、创建kubelet工作的文件目录,由参数–root-dir指定

  • 2、创建 ContainerLogsDir

  • 3、启动 imageManager

  • 4、启动 certificate manager

  • 5、启动 oomWatcher.

  • 6、启动 resource analyzer

func (kl *Kubelet) initializeModules() error {metrics.Register(kl.runtimeCache,collectors.NewVolumeStatsCollector(kl),collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),)metrics.SetNodeName(kl.nodeName)servermetrics.Register()// 1、创建文件目录,由参数--root-dir指定if err := kl.setupDataDirs(); err != nil {return err}// 2、创建 ContainerLogsDirif _, err := os.Stat(ContainerLogsDir); err != nil {if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)}}// 3、启动 imageManagerkl.imageManager.Start()// 4、启动 certificate manager if kl.serverCertificateManager != nil {kl.serverCertificateManager.Start()}// 5、启动 oomWatcher.if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {return fmt.Errorf("failed to start OOM watcher %v", err)}// 6、启动 resource analyzerkl.resourceAnalyzer.Start()return nil
}

Kubelet.synloop

syncLoop是处理pod变化的主循环。它监视来自三个通道(file、apisserver和http)的更改,并创建它们的联合。如果发现到任何新更改,将针对期望状态和运行状态运行同步处理。如果配置没有变化,将每同步频率秒同步最后一个已知的所需状态。

Kubelet.synloop 启动for{}循环调用Kubelet.syncLoopIteration对应监听的通道的变化事件进行处理

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {klog.Info("Starting kubelet main sync loop.")// The syncTicker wakes up kubelet to checks if there are any pod workers// that need to be sync'd. A one-second period is sufficient because the// sync interval is defaulted to 10s.syncTicker := time.NewTicker(time.Second)defer syncTicker.Stop()housekeepingTicker := time.NewTicker(housekeepingPeriod)defer housekeepingTicker.Stop()plegCh := kl.pleg.Watch()const (base   = 100 * time.Millisecondmax    = 5 * time.Secondfactor = 2)duration := base// Responsible for checking limits in resolv.conf// The limits do not have anything to do with individual pods// Since this is called in syncLoop, we don't need to call it anywhere elseif kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {kl.dnsConfigurer.CheckLimitsForResolvConf()}// 循环处理 syncLoopIteration for {if err := kl.runtimeState.runtimeErrors(); err != nil {klog.Errorf("skipping pod synchronization - %v", err)// exponential backofftime.Sleep(duration)duration = time.Duration(math.Min(float64(max), factor*float64(duration)))continue}// reset backoff if we have a successduration = basekl.syncLoopMonitor.Store(kl.clock.Now())if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {break}kl.syncLoopMonitor.Store(kl.clock.Now())}
}

Kubelet.syncLoopIteration

kubelet 的 pods 同步逻辑都在 syncLoopIteration 这里. syncLoopIteration 同时监听下面的 chan, 根据事件做不同的处理.

  • configCh: 监听 file, http, apiserver 的事件更新
  • syncCh: 定时器管道, 每隔一秒去同步最新保存的 pod 状态
  • houseKeepingCh: housekeeping 事件的管道,做 pod 清理工作
  • plegCh: 该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态.
  • livenessManager.Updates: 健康检查发现某个 pod 不可用, kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {select {// 监听 file, http, apiserver 的事件更新case u, open := <-configCh:// Update from a config source; dispatch it to the right handler// callback.if !open {klog.Errorf("Update channel is closed. Exiting the sync loop.")return false}switch u.Op {case kubetypes.ADD:klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))// After restarting, kubelet will get all existing pods through// ADD as if they are new pods. These pods will then go through the// admission process and *may* be rejected. This can be resolved// once we have checkpointing.handler.HandlePodAdditions(u.Pods)case kubetypes.UPDATE:klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))handler.HandlePodUpdates(u.Pods)case kubetypes.REMOVE:klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))handler.HandlePodRemoves(u.Pods)case kubetypes.RECONCILE:klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))handler.HandlePodReconcile(u.Pods)case kubetypes.DELETE:klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))// DELETE is treated as a UPDATE because of graceful deletion.handler.HandlePodUpdates(u.Pods)case kubetypes.RESTORE:klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))// These are pods restored from the checkpoint. Treat them as new// pods.handler.HandlePodAdditions(u.Pods)case kubetypes.SET:// TODO: Do we want to support this?klog.Errorf("Kubelet does not support snapshot update")}if u.Op != kubetypes.RESTORE {// If the update type is RESTORE, it means that the update is from// the pod checkpoints and may be incomplete. Do not mark the// source as ready.// Mark the source ready after receiving at least one update from the// source. Once all the sources are marked ready, various cleanup// routines will start reclaiming resources. It is important that this// takes place only after kubelet calls the update handler to process// the update to ensure the internal pod cache is up-to-date.kl.sourcesReady.AddSource(u.Source)}// 该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态.case e := <-plegCh:if isSyncPodWorthy(e) {// PLEG event for a pod; sync it.if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)handler.HandlePodSyncs([]*v1.Pod{pod})} else {// If the pod no longer exists, ignore the event.klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)}}if e.Type == pleg.ContainerDied {if containerID, ok := e.Data.(string); ok {kl.cleanUpContainersInPod(e.ID, containerID)}}//  每隔一秒去同步最新保存的 pod 状态case <-syncCh:// Sync pods waiting for syncpodsToSync := kl.getPodsToSync()if len(podsToSync) == 0 {break}klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))handler.HandlePodSyncs(podsToSync)// 健康检查发现某个 pod 不可用, kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作case update := <-kl.livenessManager.Updates():if update.Result == proberesults.Failure {// The liveness manager detected a failure; sync the pod.// We should not use the pod from livenessManager, because it is never updated after// initialization.pod, ok := kl.podManager.GetPodByUID(update.PodUID)if !ok {// If the pod no longer exists, ignore the update.klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)break}klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))handler.HandlePodSyncs([]*v1.Pod{pod})}// housekeeping 事件的管道,做 pod 清理工作case <-housekeepingCh:if !kl.sourcesReady.AllReady() {// If the sources aren't ready or volume manager has not yet synced the states,// skip housekeeping, as we may accidentally delete pods from unready sources.klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")} else {klog.V(4).Infof("SyncLoop (housekeeping)")if err := handler.HandlePodCleanups(); err != nil {klog.Errorf("Failed cleaning pods: %v", err)}}}return true
}

kubelet工作原理示意图:

img
(图片来源于网络,如有侵权请联系作者)

此处总结一下 kubelet 启动逻辑中的调用关系如下所示:

                                                                                  |--> NewMainKubelet||--> createAndInitKubelet --|--> BirthCry|                           ||--> RunKubelet --|                           |--> StartGarbageCollection|                 ||                 |--> startKubelet --> k.Run --> kl.syncLoop --> kl.syncLoopIteration|
NewKubeletCommand --> Run --> run --|--> http.ListenAndServe||--> daemon.SdNotify

结论

本文主要介绍了 kublet的作用以及工作原理。之后从源码分析了kubelet 的启动流程,从分析过程中可以看到 kubelet 启动流程中的环节非常多,kubelet 中也包含了非常多的模块,如果要深入掌握kubelet的工作机制,则后面得对各个模板进行单独详细展开分析。

参考文档

https://feisky.gitbooks.io/kubernetes/content/components/kubelet.html

https://juejin.cn/post/6844903694618607623

https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kubelet_init.html

这篇关于kubelet组件的启动流程源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1150280

相关文章

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

JS常用组件收集

收集了一些平时遇到的前端比较优秀的组件,方便以后开发的时候查找!!! 函数工具: Lodash 页面固定: stickUp、jQuery.Pin 轮播: unslider、swiper 开关: switch 复选框: icheck 气泡: grumble 隐藏元素: Headroom

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

如何在页面调用utility bar并传递参数至lwc组件

1.在app的utility item中添加lwc组件: 2.调用utility bar api的方式有两种: 方法一,通过lwc调用: import {LightningElement,api ,wire } from 'lwc';import { publish, MessageContext } from 'lightning/messageService';import Ca

springboot3打包成war包,用tomcat8启动

1、在pom中,将打包类型改为war <packaging>war</packaging> 2、pom中排除SpringBoot内置的Tomcat容器并添加Tomcat依赖,用于编译和测试,         *依赖时一定设置 scope 为 provided (相当于 tomcat 依赖只在本地运行和测试的时候有效,         打包的时候会排除这个依赖)<scope>provided

内核启动时减少log的方式

内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL