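Overview
Abstract: This article summarizes what the kubelet does and how it works and then, building on that background, reads through the kubelet source code to analyze the component's startup flow.
Main text
What the kubelet does
Here is a brief summary of the kubelet's responsibilities.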
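- Node management
  - Node registration
  - Node status updates
- Container management (pod lifecycle management)
  - Watching the apiserver for container events
  - Creating and deleting containers (CRI)
  - Creating and deleting container networks (CNI)
  - Container status monitoring
  - Container eviction
- Monitoring
  - cadvisor
  - healthz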
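How the kubelet works
(Image from the internet; please contact the author in case of infringement.)
As the kubelet internal component diagram shows, the kubelet is made up of many internal components:
- Kubelet API: the authenticated API on port 10250, the cAdvisor API on port 4194, the read-only API on port 10255, and the health check API on port 10248
- syncLoop: receives Pod updates from the API or the manifest directory and sends them to podWorkers; it relies heavily on channels to handle asynchronous requests
- Auxiliary managers, such as cAdvisor, PLEG, and Volume Manager, which handle the work outside syncLoop
- CRI: the container runtime interface, responsible for communicating with the container runtime shim
- Container runtimes, such as dockershim and rkt
- Network plugins: CNI and kubenet are currently supported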
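kubelet startup parameters
/usr/local/bin/kubelet
  --address=<node-name>
  # Port the kubelet server listens on (the port the apiserver uses to reach the kubelet)
  --port=10250
  --healthz-bind-address=0.0.0.0
  # Port of the read-only API
  --read-only-port=10255
  # Hostname used when registering the node
  --hostname_override=<node-name>
  # Important: path of the kubeletconfig configuration file
  --config=/home/kube/kubernetes/conf/config.yaml
  # Image used for the pause container
  --pod-infra-container-image=mirrors.myoas.com/nebula-docker/seg/pod/pause:3.1
  # Root directory where the kubelet stores its data and files on the node
  --root-dir=/home/kube/kubernetes/lib/kubelet
  # Maximum number of pods each host may run
  --max-pods=60
  # Log verbosity; 4 logs everything, including debug information
  --v=4
  # Use the CNI network plugin
  --network-plugin=cni
  --cni-conf-dir=/etc/cni/net.d
  --cni-bin-dir=/opt/cni/bin
  # Maximum interval between syncs of running containers with their configuration (default 1m; set to 5s here)
  --sync-frequency=5s
  # Path of the kubeconfig used to authenticate node registration
  --kubeconfig=/home/kube/kubernetes/conf/kubelet.kubeconfig
  # Directory holding the certificate files
  --cert-dir=/home/kube/ssl/pkc
  # Resources reserved for the system
  --system-reserved=cpu=2000m,memory=20000Mi
  # Allow the kubelet to run on a host that has swap enabled
  --fail-swap-on=false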
Ports the kubelet listens on
By default the kubelet listens on three ports: 10250, 10255, and 10248 (in some Kubernetes versions the kubelet also exposes cAdvisor on port 4194).
- 10250: the kubelet server port. The apiserver talks to the kubelet over this port, and node resources and status can be retrieved through it.
- 10255: serves pod, node, metric, and cadvisor information. The API is exposed read-only, and requests to this port need no authentication or authorization.
root@ubuntu:~# curl http://10.234.12.78:10255/stats/summary
{
  "node": {
    "nodeName": "10.234.12.78",
    "systemContainers": [
      {
        "name": "pods",
        "startTime": "2024-09-07T06:00:48Z",
        "cpu": {
          "time": "2024-09-07T11:49:09Z",
          "usageNanoCores": 12571621,
          "usageCoreNanoSeconds": 261530338504
        },
// output omitted
}
root@ubuntu:~# curl http://10.234.12.77:10255/metrics | head -10
# HELP apiserver_audit_event_total [ALPHA] Counter of audit events generated and sent to the audit backend.
# TYPE apiserver_audit_event_total counter
apiserver_audit_event_total 0
# HELP apiserver_audit_requests_rejected_total [ALPHA] Counter of apiserver requests rejected due to an error in audit logging backend.
# TYPE apiserver_audit_requests_rejected_total counter
apiserver_audit_requests_rejected_total 0
# HELP apiserver_client_certificate_expiration_seconds [ALPHA] Distribution of the remaining lifetime on the certificate used to authenticate a request.
# TYPE apiserver_client_certificate_expiration_seconds histogram
apiserver_client_certificate_expiration_seconds_bucket{le="0"} 0
apiserver_client_certificate_expiration_seconds_bucket{le="1800"} 0
// output omitted
root@ubuntu:~# curl http://10.234.12.77:4194/metrics |head -10
# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="511ec9ef",cadvisorVersion="v0.33.0",dockerVersion="18.09.7",kernelVersion="4.15.0-147-generic",osVersion="Alpine Linux v3.8"} 1
# HELP container_cpu_cfs_periods_total Number of elapsed enforcement period intervals.
# TYPE container_cpu_cfs_periods_total counter
container_cpu_cfs_periods_total{container_label_annotation_io_kubernetes_container_hash="",container_label_annotation_io_kubernetes_container_ports="",container_label_annotation_io_kubernetes_container_restartCount="",container_label_annotation_io_kubernetes_container_terminationMessagePath="",container_label_annotation_io_kubernetes_container_terminationMessagePolicy="",container_label_annotation_io_kubernetes_pod_terminationGracePeriod="",container_label_annotation_kubernetes_io_config_seen="",container_label_annotation_kubernetes_io_config_source="",container_label_app="",container_label_controller_revision_hash="",container_label_io_kubernetes_container_logpath="",container_label_io_kubernetes_container_name="",container_label_io_kubernetes_docker_type="",container_label_io_kubernetes_pod_name="",container_label_io_kubernetes_pod_namespace="",container_label_io_kubernetes_pod_uid="",container_label_io_kubernetes_sandbox_id="",container_label_pod_template_generation="",id="/kubepods/burstable/podc387ad42-a56c-4e29-bc78-0264f44614b7",image="",name=""} 212064 1725710124084
// output omitted
- 10248: a request to this port tells you whether the kubelet is working properly. The listening address and port are set with the kubelet startup flags --healthz-port and --healthz-bind-address.
root@ubuntu:~# curl http://10.234.12.78:10248/healthz
ok
Querying node summary information
- From inside the cluster, the kubelet's port 10255 can be accessed directly
curl http://<node-name>:10255/stats/summary
- Query the kubelet health status
curl http://<node-name>:10248/healthz
- Query the kubelet metrics
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics
- Query cadvisor
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics/cadvisor
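The healthz query above can also be issued from a program. The following is a minimal Go sketch, not kubelet code: the names checkHealthz and isHealthy, the two-second timeout, and the 127.0.0.1 address are assumptions made for illustration. It treats the kubelet as healthy only when the endpoint answers 200 with the body "ok", as in the curl output shown earlier.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// isHealthy applies the rule used in this sketch: HTTP 200 and a body of "ok".
func isHealthy(status int, body string) bool {
	return status == http.StatusOK && strings.TrimSpace(body) == "ok"
}

// checkHealthz sends a GET to a kubelet-style /healthz URL.
// The helper name and timeout handling are illustrative, not part of the kubelet.
func checkHealthz(url string, timeout time.Duration) (bool, error) {
	client := &http.Client{Timeout: timeout}
	resp, err := client.Get(url)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return false, err
	}
	return isHealthy(resp.StatusCode, string(body)), nil
}

func main() {
	// Assumed address: replace 127.0.0.1 with the node you want to probe.
	healthy, err := checkHealthz("http://127.0.0.1:10248/healthz", 2*time.Second)
	if err != nil {
		fmt.Println("healthz probe failed:", err)
		return
	}
	fmt.Println("kubelet healthy:", healthy)
}
```

Keeping the decision in the small isHealthy function makes the rule easy to check without a running kubelet.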
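Pod creation flow
(Image from the internet; please contact the author in case of infringement.)
The diagram above shows a typical pod creation flow. Through syncLoop the kubelet learns from the apiserver that a pod has been scheduled onto this node. It then works with dockershim to prepare the image, set up CNI, and create the containers. Once the containers are created, the kubelet reports their status back to the apiserver.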
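kubelet startup source code walkthrough
Note: the analysis is based on the Kubernetes v1.18.0 source.
With the basics of the kubelet covered, we now turn to the source code of the startup flow. The entry point is k8s.io/kubernetes/cmd/kubelet/kubelet.go.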
kubeletconfig
Before reading the source, let's look at kubeletconfig, the configuration the kubelet runs with. It holds the key parameters of the kubelet process.
Viewing the kubeletconfig contents
- Method 1: direct access
root@ubuntu:~# curl -X GET https://10.234.12.78:10250/configz -k | jq .
- Method 2: indirect access through kubectl proxy
root@ubuntu:~# kubectl proxy
Starting to serve on 127.0.0.1:8001
Then view the kubeletconfig contents:
curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .
The kubeletconfig contents look like this:
root@ubuntu:~# curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .
{
  "kubeletconfig": {
    "staticPodPath": "/etc/kubernetes/manifests",
    "syncFrequency": "5s",
    "fileCheckFrequency": "20s",
    "httpCheckFrequency": "20s",
    "address": "<node-name>",
    "port": 10250,
    "tlsCertFile": "/home/kube/ssl/pkc/kubelet.crt",
    "tlsPrivateKeyFile": "/home/kube/ssl/pkc/kubelet.key",
    "rotateCertificates": true,
    "authentication": {
      "x509": {"clientCAFile": "/home/kube/ssl/pkc/ca.crt"},
      "webhook": {"enabled": true, "cacheTTL": "2m0s"},
      "anonymous": {"enabled": true}
    },
    "authorization": {
      "mode": "Webhook",
      "webhook": {"cacheAuthorizedTTL": "5m0s", "cacheUnauthorizedTTL": "30s"}
    },
    "registryPullQPS": 5,
    "registryBurst": 10,
    "eventRecordQPS": 5,
    "eventBurst": 10,
    "enableDebuggingHandlers": true,
    "healthzPort": 10248,
    "healthzBindAddress": "0.0.0.0",
    "oomScoreAdj": -999,
    "clusterDomain": "cluster.local",
    "clusterDNS": ["10.96.0.10"],
    "streamingConnectionIdleTimeout": "4h0m0s",
    "nodeStatusUpdateFrequency": "10s",
    "nodeStatusReportFrequency": "1m0s",
    "nodeLeaseDurationSeconds": 40,
    "imageMinimumGCAge": "2m0s",
    "imageGCHighThresholdPercent": 85,
    "imageGCLowThresholdPercent": 80,
    "volumeStatsAggPeriod": "1m0s",
    "cgroupsPerQOS": true,
    "cgroupDriver": "cgroupfs",
    "cpuManagerPolicy": "none",
    "cpuManagerReconcilePeriod": "10s",
    "topologyManagerPolicy": "none",
    "runtimeRequestTimeout": "2m0s",
    "hairpinMode": "promiscuous-bridge",
    "maxPods": 60,
    "podPidsLimit": -1,
    "resolvConf": "/etc/resolv.conf",
    "cpuCFSQuota": true,
    "cpuCFSQuotaPeriod": "100ms",
    "maxOpenFiles": 1000000,
    "contentType": "application/vnd.kubernetes.protobuf",
    "kubeAPIQPS": 5,
    "kubeAPIBurst": 10,
    "serializeImagePulls": true,
    "evictionHard": {
      "imagefs.available": "15%",
      "memory.available": "100Mi",
      "nodefs.available": "10%",
      "nodefs.inodesFree": "5%"
    },
    "evictionPressureTransitionPeriod": "5m0s",
    "enableControllerAttachDetach": true,
    "makeIPTablesUtilChains": true,
    "iptablesMasqueradeBit": 14,
    "iptablesDropBit": 15,
    "failSwapOn": false,
    "containerLogMaxSize": "10Mi",
    "containerLogMaxFiles": 5,
    "configMapAndSecretChangeDetectionStrategy": "Watch",
    "systemReserved": {"cpu": "2000m", "memory": "20000Mi"},
    "enforceNodeAllocatable": ["pods"]
  }
}
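To consume the /configz response from a program rather than through jq, the JSON can be decoded into a small struct. The sketch below is an illustration only: the type names kubeletConfigSubset and configzResponse and the helper parseConfigz are hypothetical, and the struct covers just a handful of the fields shown above instead of the full KubeletConfiguration type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kubeletConfigSubset mirrors a few of the fields in the configz output.
// It is a hypothetical subset, not the upstream KubeletConfiguration type.
type kubeletConfigSubset struct {
	StaticPodPath string            `json:"staticPodPath"`
	SyncFrequency string            `json:"syncFrequency"`
	Port          int               `json:"port"`
	HealthzPort   int               `json:"healthzPort"`
	MaxPods       int               `json:"maxPods"`
	FailSwapOn    bool              `json:"failSwapOn"`
	EvictionHard  map[string]string `json:"evictionHard"`
}

// configzResponse matches the top-level {"kubeletconfig": {...}} wrapper.
type configzResponse struct {
	KubeletConfig kubeletConfigSubset `json:"kubeletconfig"`
}

// parseConfigz decodes a /configz response body; unknown fields are ignored.
func parseConfigz(data []byte) (kubeletConfigSubset, error) {
	var resp configzResponse
	if err := json.Unmarshal(data, &resp); err != nil {
		return kubeletConfigSubset{}, err
	}
	return resp.KubeletConfig, nil
}

func main() {
	// A shortened copy of the output shown above.
	sample := []byte(`{"kubeletconfig": {"staticPodPath": "/etc/kubernetes/manifests",
		"syncFrequency": "5s", "port": 10250, "healthzPort": 10248, "maxPods": 60,
		"failSwapOn": false, "evictionHard": {"memory.available": "100Mi"}}}`)

	cfg, err := parseConfigz(sample)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("port=%d healthzPort=%d maxPods=%d memory.available=%s\n",
		cfg.Port, cfg.HealthzPort, cfg.MaxPods, cfg.EvictionHard["memory.available"])
}
```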
main
The kubelet's entry point, main(), lives in kubernetes/cmd/kubelet/kubelet.go.
Like the other Kubernetes components, the kubelet uses the cobra framework to parse command-line arguments.
func main() {
	rand.Seed(time.Now().UnixNano())
	// Build the kubelet command with app.NewKubeletCommand()
	command := app.NewKubeletCommand()
	logs.InitLogs()
	defer logs.FlushLogs()
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
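NewKubeletCommand
NewKubeletCommand does the following:
- Reads parameters from the command-line flags and from kubeletconfig
- Validates the flags and the configuration file
- Initializes the default featureGate configuration. The available feature gates are listed on the feature-gates page of the official Kubernetes documentation
- Builds a KubeletServer from the command-line flags and the configuration file
- Uses the KubeletServer to build kubeletDeps, which holds the dependencies the kubelet needs in order to run
- Passes the featureGate, KubeletServer, and kubeletDeps created above to Run() to start the kubelet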
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	// Flag-backed options; they are filled in when the command line is parsed below
	kubeletFlags := options.NewKubeletFlags()
	// Default KubeletConfiguration; the file named by --config=xxx is loaded further below
	kubeletConfig, err := options.NewKubeletConfiguration()
	// programmer error
	if err != nil {
		klog.Fatal(err)
	}

	cmd := &cobra.Command{
		Use: componentKubelet,
		Long: `The kubelet is the primary "node agent" that runs on each
node. It can register the node with the apiserver using one of: the hostname; `,
		DisableFlagParsing: true,
		Run: func(cmd *cobra.Command, args []string) {
			// initial flag parse, since we disable cobra's flag parsing
			if err := cleanFlagSet.Parse(args); err != nil {
				cmd.Usage()
				klog.Fatal(err)
			}

			// check if there are non-flag arguments in the command line
			cmds := cleanFlagSet.Args()
			if len(cmds) > 0 {
				cmd.Usage()
				klog.Fatalf("unknown command: %s", cmds[0])
			}

			// short-circuit on help
			help, err := cleanFlagSet.GetBool("help")
			if err != nil {
				klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
			}
			if help {
				cmd.Help()
				return
			}

			// short-circuit on verflag
			verflag.PrintAndExitIfRequested()
			utilflag.PrintFlags(cleanFlagSet)

			// set feature gates from initial flags-based config
			if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
				klog.Fatal(err)
			}

			// validate the initial KubeletFlags
			if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
				klog.Fatal(err)
			}

			// With a remote container runtime, --pod-infra-container-image (the pause image) is ignored by the kubelet, so only warn
			if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
				klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
			}

			// load kubelet config file, if provided
			if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
				kubeletConfig, err = loadConfigFile(configFile)
				if err != nil {
					klog.Fatal(err)
				}
				// We must enforce flag precedence by re-parsing the command line into the new object.
				// This is necessary to preserve backwards-compatibility across binary upgrades.
				// See issue #56171 for more details.
				if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
					klog.Fatal(err)
				}
				// update feature gates based on new config
				if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
					klog.Fatal(err)
				}
			}

			// We always validate the local configuration (command line + config file).
			// This is the default "last-known-good" config for dynamic config, and must always remain valid.
			if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
				klog.Fatal(err)
			}

			// use dynamic kubelet config, if enabled
			var kubeletConfigController *dynamickubeletconfig.Controller
			if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
				var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
				dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
					func(kc *kubeletconfiginternal.KubeletConfiguration) error {
						return kubeletConfigFlagPrecedence(kc, args)
					})
				if err != nil {
					klog.Fatal(err)
				}
				// If we should just use our existing, local config, the controller will return a nil config
				if dynamicKubeletConfig != nil {
					kubeletConfig = dynamicKubeletConfig
					// Note: flag precedence was already enforced in the controller, prior to validation,
					// by our above transform function. Now we simply update feature gates from the new config.
					if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
						klog.Fatal(err)
					}
				}
			}

			// construct a KubeletServer from kubeletFlags and kubeletConfig
			kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}

			// use kubeletServer to construct the default KubeletDeps
			// (kubeletDeps holds what the kubelet depends on at startup)
			kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
			if err != nil {
				klog.Fatal(err)
			}

			// add the kubelet config controller to kubeletDeps
			kubeletDeps.KubeletConfigController = kubeletConfigController

			// set up stopCh here in order to be reused by kubelet and docker shim
			stopCh := genericapiserver.SetupSignalHandler()

			// start the experimental docker shim, if enabled
			if kubeletServer.KubeletFlags.ExperimentalDockershim {
				if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
					klog.Fatal(err)
				}
				return
			}

			// run the kubelet
			klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
			if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
				klog.Fatal(err)
			}
		},
	}

	// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
	kubeletFlags.AddFlags(cleanFlagSet)
	options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
	options.AddGlobalFlags(cleanFlagSet)
	cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))

	// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
	const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
	cmd.SetUsageFunc(func(cmd *cobra.Command) error {
		fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
		return nil
	})
	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
	})

	return cmd
}
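The comment about kubeletConfigFlagPrecedence in the code above describes re-parsing the command line on top of the object loaded from the config file, so that explicit flags win over file values. The sketch below illustrates that idea with the Go standard library flag package instead of pflag and cobra. The config struct and the helpers bindFlags and resolve are hypothetical names, and the two fields are just examples; this is not how the upstream function is written.

```go
package main

import (
	"flag"
	"fmt"
)

// config is a hypothetical two-field stand-in for KubeletConfiguration.
type config struct {
	MaxPods int
	Port    int
}

// bindFlags registers flags whose targets and defaults are the current fields
// of cfg, so parsing only overwrites fields that appear on the command line.
func bindFlags(fs *flag.FlagSet, cfg *config) {
	fs.IntVar(&cfg.MaxPods, "max-pods", cfg.MaxPods, "maximum number of pods per node")
	fs.IntVar(&cfg.Port, "port", cfg.Port, "kubelet server port")
}

// resolve takes the values loaded from a config file and re-applies the
// command-line arguments on top of them, giving flags precedence.
func resolve(fileCfg config, args []string) (config, error) {
	fs := flag.NewFlagSet("kubelet-sketch", flag.ContinueOnError)
	bindFlags(fs, &fileCfg)
	if err := fs.Parse(args); err != nil {
		return config{}, err
	}
	return fileCfg, nil
}

func main() {
	// Pretend these values came from the file passed with --config.
	fromFile := config{MaxPods: 110, Port: 10250}

	effective, err := resolve(fromFile, []string{"--max-pods=60"})
	if err != nil {
		fmt.Println("flag parsing failed:", err)
		return
	}
	// max-pods comes from the flag, port stays at the file value.
	fmt.Printf("maxPods=%d port=%d\n", effective.MaxPods, effective.Port)
}
```

Binding each flag's default to the field's current value is what keeps unset flags from clobbering the file values.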
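Run
Run runs the specified KubeletServer with the given Dependencies, and it should never exit. The kubeDeps argument may be nil; in that case it is initialized from the settings on the KubeletServer. Otherwise the caller is assumed to have set up the Dependencies object, and no default one is generated.
As the source shows, Run performs some OS-specific initialization (it matters when the OS is Windows) and then goes straight into the run method.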
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())
	// OS-specific initialization; it does real work when the OS is Windows
	if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
		return fmt.Errorf("failed OS init: %v", err)
	}
	// Hand over to run for the rest of the startup
	if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
		return fmt.Errorf("failed to run Kubelet: %v", err)
	}
	return nil
}
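run
The run() function is long. In summary, it does the following:
- Sets the default FeatureGates for the kubelet. All kubelet FeatureGates can be listed through the command-line flags. In Kubernetes, FeatureGates in the Alpha stage are disabled by default when a component starts, while those in the Beta and GA stages are generally enabled by default;
- Validates the kubelet parameters;
- Registers the current configuration under the /configz URL of the HTTP server, so the kubeletconfig can be viewed with curl -X GET https://127.0.0.1:10250/configz -k;
- Checks whether the kubelet is starting in standalone mode. In this mode it does not interact with the apiserver; it is mainly used for debugging the kubelet;
- Initializes kubeDeps, which holds the kubelet's dependencies, mainly KubeClient, EventClient, HeartbeatClient, Auth, cadvisor, and ContainerManager;
- Builds the list of cgroup roots (cgroupRoots) that is handed to cAdvisor. It starts from the root set with --cgroup-root and then adds the kubelet's own cgroup (kubeletCgroup), the container runtime's cgroup (runtimeCgroup), and SystemCgroups if it is set;
- Checks whether the kubelet was started as root;
- Sets the OOM score of the process. The default is -999 and the valid range is [-1000, 1000]; the lower the score, the less likely the process is to be killed;
- Calls the RunKubelet method;
- Checks whether dynamic kubelet configuration is enabled;
- Starts the healthz HTTP server, on port 10248 by default;
- If the kubelet was started by systemd, notifies systemd that it has started.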
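The feature-gate step at the top of this list can be pictured as a map of defaults that is overridden from a name-to-bool map. The following is a simplified sketch under stated assumptions: the featureGate type here is a hypothetical miniature, the gate names ExampleAlphaFeature and ExampleBetaFeature are made up, and rejecting unknown names is a choice of this sketch rather than a statement about the upstream implementation behind utilfeature.DefaultMutableFeatureGate.

```go
package main

import "fmt"

// featureGate is a hypothetical miniature of a feature-gate registry.
type featureGate struct {
	enabled map[string]bool
}

// newFeatureGate copies the defaults so later overrides do not touch them.
func newFeatureGate(defaults map[string]bool) *featureGate {
	fg := &featureGate{enabled: make(map[string]bool, len(defaults))}
	for name, on := range defaults {
		fg.enabled[name] = on
	}
	return fg
}

// SetFromMap applies overrides. In this sketch an unknown gate name is an
// error, and nothing is changed unless every name is known.
func (fg *featureGate) SetFromMap(overrides map[string]bool) error {
	for name := range overrides {
		if _, known := fg.enabled[name]; !known {
			return fmt.Errorf("unrecognized feature gate: %s", name)
		}
	}
	for name, on := range overrides {
		fg.enabled[name] = on
	}
	return nil
}

// Enabled reports the current state of a gate; unknown gates read as false.
func (fg *featureGate) Enabled(name string) bool {
	return fg.enabled[name]
}

func main() {
	// Made-up gates: the alpha one defaults to off, the beta one to on.
	fg := newFeatureGate(map[string]bool{
		"ExampleAlphaFeature": false,
		"ExampleBetaFeature":  true,
	})

	// Equivalent in spirit to a featureGates map in the config file.
	if err := fg.SetFromMap(map[string]bool{"ExampleAlphaFeature": true}); err != nil {
		fmt.Println("invalid feature gates:", err)
		return
	}
	fmt.Println("alpha enabled:", fg.Enabled("ExampleAlphaFeature"))
	fmt.Println("beta enabled:", fg.Enabled("ExampleBetaFeature"))
}
```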
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
	// Set global feature gates based on the value on the initial KubeletServer
	err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
	if err != nil {
		return err
	}
	// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
	if err := options.ValidateKubeletServer(s); err != nil {
		return err
	}

	// Obtain Kubelet Lock File
	if s.ExitOnLockContention && s.LockFilePath == "" {
		return errors.New("cannot exit on lock file contention: no lock file specified")
	}
	done := make(chan struct{})
	if s.LockFilePath != "" {
		klog.Infof("acquiring file lock on %q", s.LockFilePath)
		if err := flock.Acquire(s.LockFilePath); err != nil {
			return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
		}
		if s.ExitOnLockContention {
			klog.Infof("watching for inotify events for: %v", s.LockFilePath)
			if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
				return err
			}
		}
	}

	// Register current configuration with /configz endpoint
	// (viewable with: curl -X GET https://<nodename>:10250/configz -k)
	err = initConfigz(&s.KubeletConfiguration)
	if err != nil {
		klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
	}

	if len(s.ShowHiddenMetricsForVersion) > 0 {
		metrics.SetShowHidden()
	}

	// About to get clients and such, detect standaloneMode
	standaloneMode := true
	if len(s.KubeConfig) > 0 {
		standaloneMode = false
	}

	// Initialize kubeDeps
	if kubeDeps == nil {
		kubeDeps, err = UnsecuredDependencies(s, featureGate)
		if err != nil {
			return err
		}
	}

	if kubeDeps.Cloud == nil {
		if !cloudprovider.IsExternal(s.CloudProvider) {
			cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
			if err != nil {
				return err
			}
			if cloud == nil {
				klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
			} else {
				klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
			}
			kubeDeps.Cloud = cloud
		}
	}

	// Determine hostName and nodeName
	hostName, err := nodeutil.GetHostname(s.HostnameOverride)
	if err != nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
	if err != nil {
		return err
	}

	// if in standalone mode, indicate as much by setting all clients to nil
	switch {
	case standaloneMode:
		kubeDeps.KubeClient = nil
		kubeDeps.EventClient = nil
		kubeDeps.HeartbeatClient = nil
		klog.Warningf("standalone mode, no API client")

	// Initialize KubeClient, EventClient and HeartbeatClient
	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
		clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
		if err != nil {
			return err
		}
		if closeAllConns == nil {
			return errors.New("closeAllConns must be a valid function other than nil")
		}
		kubeDeps.OnHeartbeatFailure = closeAllConns

		kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet client: %v", err)
		}

		// make a separate client for events
		eventClientConfig := *clientConfig
		eventClientConfig.QPS = float32(s.EventRecordQPS)
		eventClientConfig.Burst = int(s.EventBurst)
		kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet event client: %v", err)
		}

		// make a separate client for heartbeat with throttling disabled and a timeout attached
		heartbeatClientConfig := *clientConfig
		heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
		// The timeout is the minimum of the lease duration and status update frequency
		leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
		if heartbeatClientConfig.Timeout > leaseTimeout {
			heartbeatClientConfig.Timeout = leaseTimeout
		}

		heartbeatClientConfig.QPS = float32(-1)
		kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
		}
	}

	// Initialize the auth module
	if kubeDeps.Auth == nil {
		auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
		if err != nil {
			return err
		}
		kubeDeps.Auth = auth
		runAuthenticatorCAReload(stopCh)
	}

	// Collect the cgroup roots that are passed to cAdvisor below
	// (Linux uses cgroups to limit the resources containers may use).
	// The list starts with the node allocatable root derived from --cgroup-root
	// and the cgroup driver (systemd is one kind of cgroup driver), then adds
	// the kubelet's own cgroup, the container runtime's cgroup and, if set,
	// SystemCgroups.
	var cgroupRoots []string
	cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
	kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
	if err != nil {
		klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
	} else if kubeletCgroup != "" {
		cgroupRoots = append(cgroupRoots, kubeletCgroup)
	}

	runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
	if err != nil {
		klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
	} else if runtimeCgroup != "" {
		// RuntimeCgroups is optional, so ignore if it isn't specified
		cgroupRoots = append(cgroupRoots, runtimeCgroup)
	}

	if s.SystemCgroups != "" {
		// SystemCgroups is optional, so ignore if it isn't specified
		cgroupRoots = append(cgroupRoots, s.SystemCgroups)
	}

	// Set up cAdvisor
	if kubeDeps.CAdvisorInterface == nil {
		imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
		kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
		if err != nil {
			return err
		}
	}

	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)

	// Initialize the ContainerManager
	if kubeDeps.ContainerManager == nil {
		if s.CgroupsPerQOS && s.CgroupRoot == "" {
			klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
			s.CgroupRoot = "/"
		}

		var reservedSystemCPUs cpuset.CPUSet
		var errParse error
		if s.ReservedSystemCPUs != "" {
			reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
			if errParse != nil {
				// invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
				klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
				return errParse
			}
			// is it safe do use CAdvisor here ??
			machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
			if err != nil {
				// if can't use CAdvisor here, fall back to non-explicit cpu list behavor
				klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
				reservedSystemCPUs = cpuset.NewCPUSet()
			} else {
				reservedList := reservedSystemCPUs.ToSlice()
				first := reservedList[0]
				last := reservedList[len(reservedList)-1]
				if first < 0 || last >= machineInfo.NumCores {
					// the specified cpuset is outside of the range of what the machine has
					klog.Infof("Invalid cpuset specified by --reserved-cpus")
					return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
				}
			}
		} else {
			reservedSystemCPUs = cpuset.NewCPUSet()
		}

		if reservedSystemCPUs.Size() > 0 {
			// at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
			klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
			if s.KubeReserved != nil {
				delete(s.KubeReserved, "cpu")
			}
			if s.SystemReserved == nil {
				s.SystemReserved = make(map[string]string)
			}
			s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
			klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
		}
		kubeReserved, err := parseResourceList(s.KubeReserved)
		if err != nil {
			return err
		}
		systemReserved, err := parseResourceList(s.SystemReserved)
		if err != nil {
			return err
		}
		var hardEvictionThresholds []evictionapi.Threshold
		// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
		if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
			hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
			if err != nil {
				return err
			}
		}
		experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
		if err != nil {
			return err
		}

		devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			cm.NodeConfig{
				RuntimeCgroupsName:    s.RuntimeCgroups,
				SystemCgroupsName:     s.SystemCgroups,
				KubeletCgroupsName:    s.KubeletCgroups,
				ContainerRuntime:      s.ContainerRuntime,
				CgroupsPerQOS:         s.CgroupsPerQOS,
				CgroupRoot:            s.CgroupRoot,
				CgroupDriver:          s.CgroupDriver,
				KubeletRootDir:        s.RootDirectory,
				ProtectKernelDefaults: s.ProtectKernelDefaults,
				NodeAllocatableConfig: cm.NodeAllocatableConfig{
					KubeReservedCgroupName:   s.KubeReservedCgroup,
					SystemReservedCgroupName: s.SystemReservedCgroup,
					EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
					KubeReserved:             kubeReserved,
					SystemReserved:           systemReserved,
					ReservedSystemCPUs:       reservedSystemCPUs,
					HardEvictionThresholds:   hardEvictionThresholds,
				},
				QOSReserved:                           *experimentalQOSReserved,
				ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
				ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
				ExperimentalPodPidsLimit:              s.PodPidsLimit,
				EnforceCPULimits:                      s.CPUCFSQuota,
				CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
				ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
			},
			s.FailSwapOn,
			devicePluginEnabled,
			kubeDeps.Recorder)

		if err != nil {
			return err
		}
	}

	// Check whether the kubelet was started with root privileges
	if err := checkPermissions(); err != nil {
		klog.Error(err)
	}

	utilruntime.ReallyCrash = s.ReallyCrashForTesting

	// TODO(vmarmol): Do this through container config.
	// Apply the OOMScoreAdj value
	oomAdjuster := kubeDeps.OOMAdjuster
	if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
		klog.Warning(err)
	}

	err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
		kubeDeps, &s.ContainerRuntimeOptions,
		s.ContainerRuntime,
		s.RuntimeCgroups,
		s.RemoteRuntimeEndpoint,
		s.RemoteImageEndpoint,
		s.NonMasqueradeCIDR)
	if err != nil {
		return err
	}

	// Call RunKubelet to carry out the rest of the startup
	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}

	// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
		kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
		if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
			return err
		}
	}

	// Start the healthz health check HTTP endpoint
	if s.HealthzPort > 0 {
		mux := http.NewServeMux()
		healthz.InstallHandler(mux)
		go wait.Until(func() {
			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
			if err != nil {
				klog.Errorf("Starting healthz server failed: %v", err)
			}
		}, 5*time.Second, wait.NeverStop)
	}

	if s.RunOnce {
		return nil
	}

	// If systemd is used, notify it that we have started
	go daemon.SdNotify(false, "READY=1")

	select {
	case <-done:
		break
	case <-stopCh:
		break
	}

	return nil
}
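RunKubelet
The run function calls RunKubelet to carry out the rest of the startup. RunKubelet does the following:
- Enables privileged mode by default
- Calls createAndInitKubelet to initialize the kubelet component
- Checks kubeDeps.PodConfig. kubeDeps.PodConfig holds the pod source configuration, and its Updates() channel is what delivers pod changes to the kubelet, so startup fails if it is nil
- Sets MaxOpenFiles, which can be tuned with the kubelet startup flag --max-open-files
- Calls startKubelet to start the components inside the kubelet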
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	if err != nil {
		return err
	}
	// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}
	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)

	// 1. Privileged mode is allowed by default
	capabilities.Initialize(capabilities.Capabilities{
		AllowPrivileged: true,
	})

	credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
	klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)

	if kubeDeps.OSInterface == nil {
		kubeDeps.OSInterface = kubecontainer.RealOS{}
	}

	// 2. Call createAndInitKubelet
	k, err := createAndInitKubelet(.....)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	// NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
	// 3. Check kubeDeps.PodConfig, whose Updates() channel feeds pod changes to the kubelet
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil")
	}
	podCfg := kubeDeps.PodConfig

	// 4. Set MaxOpenFiles
	rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))

	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		klog.Info("Started kubelet as runonce")
	} else {
		// 5. Call startKubelet
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
		klog.Info("Started kubelet")
	}
	return nil
}
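createAndInitKubelet
createAndInitKubelet calls three methods to finish initializing the kubelet:
- kubelet.NewMainKubelet creates a new kubelet instance from the supplied configuration. It takes inputs such as the node name and the configuration object, along with a number of other options, initializes the kubelet's subsystems and dependencies, and returns the kubelet instance (as a kubelet.Bootstrap);
- k.BirthCry() sends an event to the apiserver announcing that the kubelet has started;
- k.StartGarbageCollection() starts garbage collection, which reclaims containers and images.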
func createAndInitKubelet(....) (k kubelet.Bootstrap, err error) {
	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations

	// Instantiate the kubelet object and initialize all the modules it depends on
	k, err = kubelet.NewMainKubelet(....)
	if err != nil {
		return nil, err
	}

	// Send an event to the apiserver announcing that the kubelet has started
	k.BirthCry()

	// Start garbage collection for containers and images
	k.StartGarbageCollection()

	return k, nil
}
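startKubelet
startKubelet does the following:
- Calls k.Run() to start all kubelet modules and the main loop
- Starts the kubelet server; the default listening port is 10250
- Starts the read-only HTTP service; the default port is 10255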
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
	// start the kubelet
	// Start all kubelet modules and the main loop.
	go k.Run(podCfg.Updates())

	// start the kubelet server
	// The default listening port is 10250.
	if enableServer {
		go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
	}

	// Read-only status API; the default port is 10255.
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
	}

	// Start the pod resources server when the KubeletPodResources feature gate is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}
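The conditional startup decisions in startKubelet can be summarized as a small pure function. The sketch below only lists what would be launched for a given configuration; `serverConfig` and `plannedServices` are hypothetical names for this example, and no real server is started.

```go
package main

import "fmt"

// serverConfig is a hypothetical subset of the settings startKubelet consults.
type serverConfig struct {
	EnableServer        bool
	Port                int
	ReadOnlyPort        int
	PodResourcesEnabled bool
}

// plannedServices lists, in order, what would be launched for a given config.
func plannedServices(cfg serverConfig) []string {
	services := []string{"main loop"} // the main loop is always started
	if cfg.EnableServer {
		services = append(services, fmt.Sprintf("authenticated server :%d", cfg.Port))
	}
	if cfg.ReadOnlyPort > 0 {
		// A read-only port of 0 disables the read-only server.
		services = append(services, fmt.Sprintf("read-only server :%d", cfg.ReadOnlyPort))
	}
	if cfg.PodResourcesEnabled {
		services = append(services, "pod resources server")
	}
	return services
}

func main() {
	cfg := serverConfig{EnableServer: true, Port: 10250, ReadOnlyPort: 10255}
	for _, s := range plannedServices(cfg) {
		fmt.Println(s)
	}
}
```

In the real function each of these is launched with `go`, so startKubelet returns immediately and the services run concurrently.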
Kubelet.Run
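Run is the core method that starts the kubelet. It starts the modules the kubelet depends on as well as the main loop. Its main steps are:
- 1. Register the logServer;
- 2. Decide whether to start the cloud provider sync manager;
- 3. Call kl.initializeModules to start the modules that do not depend on the container runtime;
- 4. Start the volume manager;
- 5. Run kl.syncNodeStatus periodically to sync the Node status;
- 6. Call kl.fastStatusUpdateOnce to update the container runtime start time and perform the first status sync;
- 7. Start the NodeLease controller (steps 5 to 7 run only when an API server client is configured);
- 8. Run kl.updateRuntimeUp periodically to update the runtime status;
- 9. Call kl.initNetworkUtil to set up the iptables utility rules when makeIPTablesUtilChains is enabled;
- 10. Run kl.podKiller periodically in a goroutine that kills pods that were not properly handled by the pod workers;
- 11. Start the statusManager;
- 12. Start the probeManager;
- 13. Start the runtimeClassManager;
- 14. Start the pleg;
- 15. Call kl.syncLoop to watch for pod changes.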
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// Configure the logServer.
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	// Check kubeClient.
	if kl.kubeClient == nil {
		klog.Warning("No api server defined - no node status update will be sent.")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	// initializeModules() starts imageManager, serverCertificateManager, oomWatcher and resourceAnalyzer.
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.Fatal(err)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		// Update the container runtime start time and perform the first status sync.
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		// nodeLease is a node health-check (heartbeat) mechanism.
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	// Periodically update the runtime status. If the runtime (for example docker)
	// status check fails, the kubelet is reported as not ready.
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

	// Start component sync loops.
	// Start the statusManager and probeManager.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	// Call kl.syncLoop to watch for pod changes.
	kl.syncLoop(updates, kl)
}
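Several of the steps in Run use the "call a function periodically until told to stop" pattern via wait.Until. The sketch below approximates that pattern with the standard library only; `until` and `demoPeriod` are hypothetical names for this example, and this is not the apimachinery implementation.

```go
package main

import (
	"fmt"
	"time"
)

// demoPeriod is an arbitrary short period used only by this example.
const demoPeriod = time.Millisecond

// until calls f, waits for period, and repeats until stop is closed.
func until(f func(), period time.Duration, stop <-chan struct{}) {
	for {
		// Check for a stop request before each run.
		select {
		case <-stop:
			return
		default:
		}

		f()

		// Wait for the next period, or return early when stopped.
		timer := time.NewTimer(period)
		select {
		case <-stop:
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

func main() {
	stop := make(chan struct{})
	runs := 0
	until(func() {
		runs++
		fmt.Println("sync node status, run", runs)
		if runs == 3 {
			close(stop) // ask the loop to stop after three runs
		}
	}, demoPeriod, stop)
}
```

The kubelet passes wait.NeverStop as the stop channel, so its periodic loops run for the lifetime of the process; the example closes the channel only so that it terminates.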
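The nodeLease mechanism is one of the ways the kubelet interacts with the control plane. Kubernetes introduced it as a node health-check (heartbeat) mechanism. The kubelet periodically renews its lease with the control plane to prove that it is still running. If the kubelet stops renewing the lease, the control plane treats the node as offline and updates the node's status accordingly.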
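The lease idea can be sketched as a record with a renewal time and a duration: the node side renews it, and an observer treats it as expired once it has gone unrenewed for longer than the duration. The `nodeLease` type below is a hypothetical simplification for illustration, not the Kubernetes Lease API object, and the numbers are example values.

```go
package main

import "fmt"

// nodeLease is a hypothetical, simplified lease record. Times are Unix
// seconds so that the example stays deterministic.
type nodeLease struct {
	HolderIdentity       string
	LeaseDurationSeconds int64
	RenewTime            int64
}

// renew is what the node side does periodically.
func (l *nodeLease) renew(now int64) { l.RenewTime = now }

// expired is what an observer checks: has the lease gone unrenewed for
// longer than its duration?
func (l *nodeLease) expired(now int64) bool {
	return now-l.RenewTime > l.LeaseDurationSeconds
}

func main() {
	lease := &nodeLease{HolderIdentity: "node-1", LeaseDurationSeconds: 40, RenewTime: 100}
	fmt.Println(lease.expired(130)) // renewed 30s ago, still within the 40s duration
	fmt.Println(lease.expired(141)) // 41s without renewal
	lease.renew(141)
	fmt.Println(lease.expired(150)) // fresh again after renewal
}
```

Because renewal only touches a small record, a heartbeat of this kind is cheap to send often.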
initializeModules
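The initializeModules function initializes the kubelet's subsystems and functional modules so that the kubelet can run and manage containerized workloads. Specifically, it does the following:
- 1. Creates the kubelet's working directories under the path given by --root-dir
- 2. Creates ContainerLogsDir
- 3. Starts the imageManager
- 4. Starts the certificate manager
- 5. Starts the oomWatcher
- 6. Starts the resource analyzer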
func (kl *Kubelet) initializeModules() error {
	metrics.Register(
		kl.runtimeCache,
		collectors.NewVolumeStatsCollector(kl),
		collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
	)
	metrics.SetNodeName(kl.nodeName)
	servermetrics.Register()

	// 1. Create the data directories under the path given by --root-dir.
	if err := kl.setupDataDirs(); err != nil {
		return err
	}

	// 2. Create ContainerLogsDir; a failure here is only logged.
	if _, err := os.Stat(ContainerLogsDir); err != nil {
		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
			klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
		}
	}

	// 3. Start the imageManager.
	kl.imageManager.Start()

	// 4. Start the certificate manager.
	if kl.serverCertificateManager != nil {
		kl.serverCertificateManager.Start()
	}

	// 5. Start the oomWatcher.
	if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
		return fmt.Errorf("failed to start OOM watcher %v", err)
	}

	// 6. Start the resource analyzer.
	kl.resourceAnalyzer.Start()

	return nil
}
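The shape of initializeModules, starting modules in a fixed order and returning early on the first hard failure, can be sketched generically. In the sketch below `module`, `startAll`, and `errBoom` are hypothetical names for illustration; the real function calls each manager directly rather than iterating over a slice, and not every real step can fail.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a placeholder failure used by the example.
var errBoom = errors.New("boom")

// module is a hypothetical named startup step.
type module struct {
	name  string
	start func() error
}

// startAll runs each module's start function in order and stops at the
// first failure, reporting which modules had already started.
func startAll(mods []module) ([]string, error) {
	var started []string
	for _, m := range mods {
		if err := m.start(); err != nil {
			return started, fmt.Errorf("failed to start %s: %w", m.name, err)
		}
		started = append(started, m.name)
	}
	return started, nil
}

func main() {
	ok := func() error { return nil }
	mods := []module{
		{"data dirs", ok},
		{"image manager", ok},
		{"oom watcher", func() error { return errBoom }},
		{"resource analyzer", ok},
	}
	started, err := startAll(mods)
	fmt.Println(started, err)
}
```

This mirrors how a failure in setupDataDirs or the OOM watcher aborts initialization, after which Run logs a fatal error.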
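Kubelet.syncLoop
syncLoop is the main loop that processes pod changes. It watches for changes from three channels (file, apiserver, and http) and merges them into one stream. When it sees a new change, it runs a sync of the desired state against the running state. If the configuration does not change, it re-syncs the last known desired state every sync-frequency seconds.
Kubelet.syncLoop runs a for{} loop that calls Kubelet.syncLoopIteration to handle the events arriving on the channels it watches.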
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.Info("Starting kubelet main sync loop.")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}

	// Call syncLoopIteration in a loop.
	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.Errorf("skipping pod synchronization - %v", err)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
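The backoff arithmetic in syncLoop is easy to check in isolation. The sketch below reuses the same constants and the same math.Min expression; `nextBackoff` and `backoffSchedule` are hypothetical helper names for this example, and `max` is renamed to `maxBackoff`.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Same values as the constants in syncLoop; max is renamed to maxBackoff here.
const (
	base       = 100 * time.Millisecond
	maxBackoff = 5 * time.Second
	factor     = 2
)

// nextBackoff doubles the current delay and caps it at maxBackoff.
func nextBackoff(current time.Duration) time.Duration {
	return time.Duration(math.Min(float64(maxBackoff), factor*float64(current)))
}

// backoffSchedule returns the first n delays, starting from base.
func backoffSchedule(n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	d := base
	for i := 0; i < n; i++ {
		delays = append(delays, d)
		d = nextBackoff(d)
	}
	return delays
}

func main() {
	// The delays double from 100ms until they reach the 5s cap.
	fmt.Println(backoffSchedule(8))
}
```

While the runtime keeps reporting errors, the sleep therefore doubles from 100ms through 3.2s and then stays at 5s; the first healthy iteration resets it to 100ms.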
Kubelet.syncLoopIteration
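All of the kubelet's pod synchronization logic lives in syncLoopIteration. syncLoopIteration listens on the following channels at the same time and handles each kind of event differently.
- configCh: pod updates from the file, http, and apiserver sources
- syncCh: a ticker channel; every second it syncs the pods that are waiting for a sync
- housekeepingCh: the housekeeping ticker channel, used for pod cleanup
- plegCh: events from the kubelet's pleg submodule, which periodically queries the container runtime for the state of all containers
- livenessManager.Updates: liveness probe results; when a probe reports a failure, the kubelet syncs the pod and acts according to the pod's restartPolicy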
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	// Updates from the file, http, and apiserver sources.
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.RESTORE:
			klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
			// These are pods restored from the checkpoint. Treat them as new
			// pods.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		}

		if u.Op != kubetypes.RESTORE {
			// If the update type is RESTORE, it means that the update is from
			// the pod checkpoints and may be incomplete. Do not mark the
			// source as ready.

			// Mark the source ready after receiving at least one update from the
			// source. Once all the sources are marked ready, various cleanup
			// routines will start reclaiming resources. It is important that this
			// takes place only after kubelet calls the update handler to process
			// the update to ensure the internal pod cache is up-to-date.
			kl.sourcesReady.AddSource(u.Source)
		}
	// Events from the kubelet's pleg submodule, which periodically queries the
	// container runtime for the state of all containers.
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	// Fires every second to sync the pods that are waiting for a sync.
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
		handler.HandlePodSyncs(podsToSync)
	// A liveness probe reported a failure; the kubelet syncs the pod and acts
	// according to the pod's restartPolicy.
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			// The liveness manager detected a failure; sync the pod.

			// We should not use the pod from livenessManager, because it is never updated after
			// initialization.
			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
			if !ok {
				// If the pod no longer exists, ignore the update.
				klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
				break
			}
			klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	// Housekeeping ticker; performs pod cleanup.
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			klog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				klog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}
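The core of syncLoopIteration is a single select over several channels that handles exactly one event per call and returns false only when the config channel is closed. A much-reduced sketch of that pattern follows; `podUpdate` and `iteration` are hypothetical names for this example, and the handlers are replaced by appending to a slice.

```go
package main

import (
	"fmt"
	"strings"
)

// podUpdate is a hypothetical, simplified stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Op   string
	Pods []string
}

// iteration waits for exactly one event, records how it was handled, and
// reports whether the surrounding loop should keep running.
func iteration(configCh <-chan podUpdate, syncCh, housekeepingCh <-chan struct{}, handled *[]string) bool {
	select {
	case u, open := <-configCh:
		if !open {
			// A closed config channel ends the loop.
			return false
		}
		*handled = append(*handled, u.Op+": "+strings.Join(u.Pods, ","))
	case <-syncCh:
		*handled = append(*handled, "sync")
	case <-housekeepingCh:
		*handled = append(*handled, "housekeeping")
	}
	return true
}

func main() {
	configCh := make(chan podUpdate, 1)
	syncCh := make(chan struct{}, 1)
	housekeepingCh := make(chan struct{}, 1)
	var handled []string

	configCh <- podUpdate{Op: "ADD", Pods: []string{"nginx"}}
	iteration(configCh, syncCh, housekeepingCh, &handled)

	syncCh <- struct{}{}
	iteration(configCh, syncCh, housekeepingCh, &handled)

	close(configCh)
	keepGoing := iteration(configCh, syncCh, housekeepingCh, &handled)
	fmt.Println(handled, keepGoing)
}
```

When several channels are ready at once, Go's select picks one of them at random, so no single event source has strict priority within an iteration.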
Diagram of how the kubelet works:
(Image sourced from the internet; please contact the author in case of infringement.)
The call relationships in the kubelet startup logic are summarized below:
                                                                                  |--> NewMainKubelet
                                                                                  |
                                                      |--> createAndInitKubelet --|--> BirthCry
                                                      |                           |
                                    |--> RunKubelet --|                           |--> StartGarbageCollection
                                    |                 |
                                    |                 |--> startKubelet --> k.Run --> kl.syncLoop --> kl.syncLoopIteration
                                    |
NewKubeletCommand --> Run --> run --|--> http.ListenAndServe
                                    |
                                    |--> daemon.SdNotify
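Conclusion
This article introduced the kubelet's role and how it works, and then walked through its startup flow in the source code. The analysis shows that the startup flow has many stages and that the kubelet contains many modules. Mastering the kubelet's internals requires analyzing each of those modules separately and in detail.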
References
https://feisky.gitbooks.io/kubernetes/content/components/kubelet.html
https://juejin.cn/post/6844903694618607623
https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kubelet_init.html