open yurt之yurt-tunel-server iptables规则源码分析

2023-11-08 00:18

本文主要是介绍open yurt之yurt-tunel-server iptables规则源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

yurt-tunel-server是open yurt开源的用于转发来自K8s API server的包含logs、exec和metrics等运维指令请求到边缘节点yurt-tunel-agent的反向代理。
image.png

源码分析

版本v0.4.1,入口函数中:

	if cfg.EnableIptables {
//新建IptablesManager对象iptablesMgr := iptables.NewIptablesManager(cfg.Client,  //KAS clientcfg.SharedInformerFactory.Core().V1().Nodes(),  //node informercfg.ListenAddrForMaster,  //https监听地址端口,默认端口是10250cfg.ListenInsecureAddrForMaster,   //http监听地址端口,默认“127.0.0.1:10255”cfg.IptablesSyncPeriod,   //同步更新iptalbes规则的时间间隔,默认60秒,最小15秒。)if iptablesMgr == nil {return fmt.Errorf("fail to create a new IptableManager")}wg.Add(1)go iptablesMgr.Run(stopCh, &wg)   //启动规则同步任务}
// 定时更新规则
func (im *iptablesManager) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {defer wg.Done()// 等待节点缓存完成同步,也就是从k8s里获取到所有节点信息并缓存起来if !cache.WaitForCacheSync(stopCh,im.nodeInformer.Informer().HasSynced) {klog.Error("sync node cache timeout")return}// 服务启动后首次更新iptables规则im.syncIptableSetting()ticker := time.NewTicker(time.Duration(im.syncPeriod) * time.Second)defer ticker.Stop()for {select {case <-stopCh:klog.Info("stop the iptablesManager")im.cleanupIptableSetting()returncase <-ticker.C:im.syncIptableSetting() //定时更新iptables规则}}
}
//更新iptables规则
func (im *iptablesManager) syncIptableSetting() {//从k8s里指定configMap对象存里提取出除10250和10255之外还需要进行DNAT到insecurePort的边缘侧端口数组dnatPorts, err := util.GetConfiguredDnatPorts(im.kubeClient, im.insecurePort)if err != nil {klog.Errorf("failed to sync iptables rules, %v", err)return}//和上一次缓存的端口数组进行比对,分出哪些是被删除了的,内部源码略去portsChanged, deletedDnatPorts := im.getDeletedPorts(dnatPorts)//将端口20150和10255加入到端口数组中currentDnatPorts := append(dnatPorts, kubeletSecurePort, kubeletInsecurePort)// 从k8s查询出所有不带有agent标记的节点,也就是所有云端节点的internal IPnodesIP := im.getIPOfNodesWithoutAgent()//和上一次缓存的节点internal IP数组进行比对,分出哪些节点新增的,哪些节点是已删除了的,内部源码略去nodesChanged, addedNodesIP, deletedNodesIP := im.getAddedAndDeletedNodes(nodesIP)//将回环境地址127.0.0.1加入到端口数组中currentNodesIP := append(nodesIP, loopbackAddr)// 更新iptables规则err = im.ensurePortsIptables(currentDnatPorts, deletedDnatPorts, currentNodesIP, deletedNodesIP)if err != nil {klog.Errorf("failed to ensurePortsIptables: %v", err)return}// 如果端口有变化,更新端口数组缓存lastDnatPorts if portsChanged {im.lastDnatPorts = dnatPortsif len(deletedDnatPorts) != 0 {// 针对删除的DNAT端口,需要清除conntrack相关的规则im.clearConnTrackEntries(currentNodesIP, deletedDnatPorts)}klog.Infof("dnat ports changed, %v", dnatPorts)}// 如果云端节点internal IP有变化,更新云端节点internal IP数组lastNodesIPif nodesChanged {im.lastNodesIP = nodesIPim.clearConnTrackEntries(append(addedNodesIP, deletedNodesIP...), currentDnatPorts)klog.Infof("directly access nodes changed, %v for ports %v", nodesIP, currentDnatPorts)}
}
// 从k8s里指定命名空间里查询出指定名称的configMap对象,存有DNAT端口映射对,返回需要进行DANT的端口数组。也就是说,除10250和10255之外的边缘侧端口,如果还需要进行DNAT转发,就在这里配置
func GetConfiguredDnatPorts(client clientset.Interface, insecurePort string) ([]string, error) {ports := make([]string, 0)c, err := client.CoreV1().ConfigMaps(YurttunnelServerDnatConfigMapNs).Get(context.Background(), YurttunnelServerDnatConfigMapName, metav1.GetOptions{})if err != nil {if apierrors.IsNotFound(err) {return nil, fmt.Errorf("configmap %s/%s is not found",YurttunnelServerDnatConfigMapNs,YurttunnelServerDnatConfigMapName)} else {return nil, fmt.Errorf("fail to get configmap %s/%s: %v",YurttunnelServerDnatConfigMapNs,YurttunnelServerDnatConfigMapName, err)}}pairStr, ok := c.Data[yurttunnelServerDnatDataKey]if !ok || len(pairStr) == 0 {return ports, nil}portsPair := strings.Split(pairStr, ",")for _, pair := range portsPair {portPair := strings.Split(pair, "=")if len(portPair) == 2 && portPair[1] == insecurePort && len(portPair[0]) != 0 {if portPair[0] != "10250" && portPair[0] != "10255" {ports = append(ports, portPair[0])}}}return ports, nil
}
//从k8s查询出所有不带有agent标记的节点,也就是所有云端节点的internal IP
func (im *iptablesManager) getIPOfNodesWithoutAgent() []string {var nodesIP []stringnodes, err := im.nodeInformer.Lister().List(labels.Everything())if err != nil {klog.Errorf("failed to list nodes for iptables: %v", err)return nodesIP}for i := range nodes {if withoutAgent(nodes[i]) && isNodeReady(nodes[i]) {nodeIPs := getNodeInternalIPs(nodes[i])nodesIP = append(nodesIP, nodeIPs...)}}klog.V(4).Infof("nodes without %s: %s", projectinfo.GetAgentName(), strings.Join(nodesIP, ","))metrics.Metrics.ObserveCloudNodes(len(nodesIP))return nodesIP
}
var (tunnelCommentStr   = strings.ReplaceAll(projectinfo.GetTunnelName(), "-", " ")//全局变量:iptables跳转规则链数组,初始化一条跳转规则:在DNAT表的OUTPUT链,tcp协议数据包跳到yurttunnelServerPortChain子链处理iptablesJumpChains = []iptablesJumpChain{{table:     iptables.TableNAT,dstChain:  yurttunnelServerPortChain,srcChain:  iptables.ChainOutput,comment:   fmt.Sprintf("%s server port", tunnelCommentStr),extraArgs: []string{"-p", "tcp"},},}
)func (im *iptablesManager) ensurePortsIptables(currentPorts, deletedPorts, currentIPs, deletedIPs []string) error {//针对每个端口,确保有一条从yurttunnelServerPortChain子链跳到该端口号作为后缀命名的二级子链jumpChains := iptablesJumpChainsfor _, port := range currentPorts {jumpChains = append(jumpChains, iptablesJumpChain{table:     iptables.TableNAT,dstChain:  iptables.Chain(fmt.Sprintf("%s%s", yurttunnelPortChainPrefix, port)),srcChain:  yurttunnelServerPortChain,comment:   fmt.Sprintf("jump to port %s", port),extraArgs: []string{"-p", "tcp", "--dport", port},})}if err := im.ensureJumpChains(jumpChains); err != nil {klog.Errorf("Failed to ensure jump chain, %v", err)return err}// 针对每个端口,确保二级子链中的规则,于是重点就在ensurePortIptables函数中for _, port := range currentPorts {err := im.ensurePortIptables(port, currentIPs, deletedIPs)if err != nil {return err}}if len(deletedPorts) == 0 {return nil}// 如果某个端口被删除,则清除相应的二级子链var deletedJumpChains []iptablesJumpChainfor _, port := range deletedPorts {deletedJumpChains = append(deletedJumpChains, iptablesJumpChain{table:     iptables.TableNAT,dstChain:  iptables.Chain(fmt.Sprintf("%s%s", yurttunnelPortChainPrefix, port)),srcChain:  yurttunnelServerPortChain,comment:   fmt.Sprintf("jump to port %s", port),extraArgs: []string{"-p", "tcp", "--dport", port},})}if err := im.deleteJumpChains(deletedJumpChains); err != nil {klog.Errorf("Failed to delete jump chain, %v", err)return err}return nil
}
func (im *iptablesManager) ensurePortIptables(port string, currentIPs, deletedIPs []string) error {//port 对应的二级子链名portChain := iptables.Chain(fmt.Sprintf("%s%s", yurttunnelPortChainPrefix, port))//如果没有云端节点,则清除该二级链下的所有规则,实际上这个条件应该不会出现true的情况if len(currentIPs) == 0 {_ = im.iptables.FlushChain(iptables.TableNAT, portChain)return nil}// 确保port对应的二级子链存在,没有则添加if _, err := im.iptables.EnsureChain(iptables.TableNAT, portChain); err != nil {klog.Errorf("could not ensure chain for tunnel server port(%s), %v", port, err)return err}//proxyDest用来放本地DNAT重定向目的地址。 如果API请求端口是10250,那么代理地址就是本地https监听地址,否则是本地http监听地址proxyDest := im.insecureDnatDestif port == kubeletSecurePort {proxyDest = im.secureDnatDest}// 针对所有现存云端节点的internal IP, tcp协议数据包全部直接RETURN操作,即不做DNAT处理。也就是说,如果API请求的目的IP是云端节点,那就不进行DNAT转发。for _, ip := range currentIPs {reqReturnPortIptablesArgs := reqReturnIptablesArgs(reqReturnComment, port, ip)_, err := im.iptables.EnsureRule(iptables.Prepend,iptables.TableNAT, portChain, reqReturnPortIptablesArgs...)if err != nil {klog.Errorf("could not ensure -j RETURN iptables rule for %s:%s: %v", ip, port, err)return err}}// 其它的tcp协议数据包,重定向到代理地址。也就是说,如果API请求的目的IP是边缘节点,那就进行DNAT转发到本地DNAT重定向目的地址,进而通过隧道转发到边缘dnatPortIptablesArgs := dnatIptablesArgs(dnatToTunnelComment, port, proxyDest)_, err := im.iptables.EnsureRule(iptables.Append,iptables.TableNAT, portChain, dnatPortIptablesArgs...)if err != nil {klog.Errorf("could not ensure dnat iptables rule for %s, %v", port, err)return err}//针对所有已删除云端节点的internal IP,删除对应的RETURN规则。for _, ip := range deletedIPs {deletedIPIptablesArgs := reqReturnIptablesArgs(reqReturnComment, port, ip)err = im.iptables.DeleteRule(iptables.TableNAT,portChain, deletedIPIptablesArgs...)if err != nil {klog.Errorf("could not delete old iptables rules for %s:%s: %v", ip, port, err)return err}}return nil
}
//公共规则参数,也就是匹配API请求的目的端口
func iptablesCommonArgs(msg, destPort string, destIP net.IP) []string {args := []string{"-p", "tcp","-m", "comment",}if len(msg) != 0 {args = append(args, "--comment", msg)}if len(destPort) != 0 {args = append(args, "--dport", destPort)}if destIP != nil {ip := toCIDR(destIP)args = append(args, "-d", ip)}return args
}//DNAT操作规则参数
func dnatIptablesArgs(msg, destPort, proxyDest string) []string {args := iptablesCommonArgs(msg, destPort, nil)args = append(args, "-j", "DNAT", "--to-destination", proxyDest)return args
}//RETURN操作规则参数,非DNAT
func reqReturnIptablesArgs(msg, destPort, ip string) []string {destIP := net.ParseIP(ip)args := iptablesCommonArgs(msg, destPort, destIP)args = append(args, "-j", "RETURN")return args
}

总结

  • 定时从k8s获取云端节点信息(主要是节点的internal IP)列表和需要代理转发的目的端口(10250和10255不需要在configMap中配置,其它端口需要在在configMap中配置)列表,进行iptables规则更新。
  • 凡是目的地址是云端节点的tcp包都不做DNAT处理,非云端节点的tcp包就做DNAT处理进而交由tunel转发到边缘节点。

这篇关于open yurt之yurt-tunel-server iptables规则源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/366916

相关文章

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

查询SQL Server数据库服务器IP地址的多种有效方法

《查询SQLServer数据库服务器IP地址的多种有效方法》作为数据库管理员或开发人员,了解如何查询SQLServer数据库服务器的IP地址是一项重要技能,本文将介绍几种简单而有效的方法,帮助你轻松... 目录使用T-SQL查询方法1:使用系统函数方法2:使用系统视图使用SQL Server Configu

Ollama整合open-webui的步骤及访问

《Ollama整合open-webui的步骤及访问》:本文主要介绍如何通过源码方式安装OpenWebUI,并详细说明了安装步骤、环境要求以及第一次使用时的账号注册和模型选择过程,需要的朋友可以参考... 目录安装环境要求步骤访问选择PjrIUE模型开始对话总结 安装官方安装地址:https://docs.

SQL Server数据库迁移到MySQL的完整指南

《SQLServer数据库迁移到MySQL的完整指南》在企业应用开发中,数据库迁移是一个常见的需求,随着业务的发展,企业可能会从SQLServer转向MySQL,原因可能是成本、性能、跨平台兼容性等... 目录一、迁移前的准备工作1.1 确定迁移范围1.2 评估兼容性1.3 备份数据二、迁移工具的选择2.1

关于Gateway路由匹配规则解读

《关于Gateway路由匹配规则解读》本文详细介绍了SpringCloudGateway的路由匹配规则,包括基本概念、常用属性、实际应用以及注意事项,路由匹配规则决定了请求如何被转发到目标服务,是Ga... 目录Gateway路由匹配规则一、基本概念二、常用属性三、实际应用四、注意事项总结Gateway路由

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Redis 多规则限流和防重复提交方案实现小结

《Redis多规则限流和防重复提交方案实现小结》本文主要介绍了Redis多规则限流和防重复提交方案实现小结,包括使用String结构和Zset结构来记录用户IP的访问次数,具有一定的参考价值,感兴趣... 目录一:使用 String 结构记录固定时间段内某用户 IP 访问某接口的次数二:使用 Zset 进行

SQL Server使用SELECT INTO实现表备份的代码示例

《SQLServer使用SELECTINTO实现表备份的代码示例》在数据库管理过程中,有时我们需要对表进行备份,以防数据丢失或修改错误,在SQLServer中,可以使用SELECTINT... 在数据库管理过程中,有时我们需要对表进行备份,以防数据丢失或修改错误。在 SQL Server 中,可以使用 SE

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操