Cloud Foundry中collector组件的源码分析

2023-12-08 00:32

本文主要是介绍Cloud Foundry中collector组件的源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

        在Cloud Foundry中有一个叫collector的组件,该组件的功能是通过消息总线发现在Cloud Foundry中注册过的各个组件的信息,然后通过varz和healthz接口来查询它们的信息并发送到指定的存储位置。

        本文从collector的功能出发,主要讲述以上两个功能的源码实现。

发现注册组件

        在Cloud Foundry中,每个组件在启动的时候后会以一个component的形式向Cloud Foundry注册,同时也会作为一个组件,向NATS发布一些启动信息。

        首先以DEA为例,讲述该组件register与向NATS publish信息的实现。首先看以下/dea/lib/dea/agent.rb中register的代码:

        VCAP::Component.register(:type => 'DEA',:host => @local_ip,:index => @config['index'],:config => @config,:port => status_config['port'],:user => status_config['user'],:password => status_config['password'])

        这段代码表示,DEA通过VCAP::Component对象中的register方法,实现注册。以下进入vcap-common/lib/vcap/component.rb中的register方法:

      def register(opts)uuid = VCAP.secure_uuid……auth = [opts[:user] || VCAP.secure_uuid, opts[:password] || VCAP.secure_uuid]@discover = {:type => type,……:credentials => auth,:start => Time.now}……@healthz = "ok\n".freezestart_http_server(host, port, auth, logger)nats.subscribe('vcap.component.discover') do |msg, reply|update_discover_uptimenats.publish(reply, @discover.to_json)endnats.publish('vcap.component.announce', @discover.to_json)@discoverend

        可见,在实现register方法的时候,首先通过传递进来的opts参数,构建一个@discover实例变量,订阅了一个vcap.component.discover的消息,又发布了一个vcap.component.announce的消息。关于collector的发现注册组件的功能中,直接关联的register方法中发布的主题,因为collector通过订阅这个主题的消息,将json化的@discover变量取到,然后做相应的处理。

    def send_data(data)@adapters.each do |adapter|beginadapter.send_data(data)rescue => eConfig.logger.warn("collector.historian-adapter.sending-data-error", adapter: adapter.class.name, error: e, backtrace: e.backtrace)endendend

        之前已经涉及了collector组件订阅消息的话题,现在进入collector/lib/collector.rb中,实现消息的订阅还有其他的消息请求:

      @nats = NATS.connect(:uri => Config.nats_uri) doConfig.logger.info("collector.nats.connected")# Send initially to discover what's already running@nats.subscribe(ANNOUNCE_SUBJECT) { |message| process_component_discovery(message) }@inbox = NATS.create_inbox@nats.subscribe(@inbox) { |message| process_component_discovery(message) }@nats.publish(DISCOVER_SUBJECT, "", @inbox)@nats.subscribe(COLLECTOR_PING) { |message| process_nats_ping(message.to_f) }setup_timersend
        当collector接收到由ANNOUNCE_SUBJECT主题发布过来的message(也就是json化的@discover变量)后,将该内容传递后方法process_component_discovery。以下是process_component_discovery方法的代码实现:

    def process_component_discovery(message)message = Yajl::Parser.parse(message)if message["index"]Config.logger.debug1("collector.component.discovered", type: message["type"], index: message["index"], host: message["host"])instances = (@components[message["type"]] ||= {})instances[message["host"].split(":").first] = {:host => message["host"],:index => message["index"],:credentials => message["credentials"],:timestamp => Time.now.to_i}endrescue => eConfig.logger.warn("collector.component.discovery-failure", error: e.message, backtrace: e.backtrace)end
        在该方法中,首先对message对象进行解析,若新产生的message对象中index键,则继续往下的操作:在@components对象中,视情况添加一个instance。如贴出的代码,其中两行标注为红色的代码需要理解:首先如果@components[message["type"]]不为空,则将@components[message["type"]]赋值给instances;若为空的话,那就把@components[message["type"]]赋为空,如果之前不存在message["type"]这个键的话,那就先创建一个这样的键,然后再赋为空,最后还是将@components[message["type"]]赋值给instances。在这里需要注意的是,instances与@components[message["type"]]是的首地址是相同的,所以之后给instances添加键值对的时候也是向@components[message["type"]]中添加键值对。需要提一下的是:一个instances代表Cloud Foundry中同一种类型的组件,instances中的每一个instance代表该类型组件的一个实际节点,而且可以发现instance是通过IP来设立键的,因此可见,在Cloud Foundry中相同类型的组件是不能或者不建议共存在同一个节点上或者共享一张网卡的。

        一般情况下,当Cloud Foundry的组件启动是发布vcap.component.announce消息后,很快在@components中就会有相应的信息,这样的话,也就是实现了“发现注册组件”的功能。


获取组件varz和healthz并发送

        在以上功能中,collector只是实现了发现组件,只包括组件的ip:port信息,index,crdentials等信息,并不带有其他关于组件运行时产生的数据信息。而通过varz和healthz访问那些注册的组件,正好可以做到这些收集varz和healthz信息。

        实现的过程中,首先是使用添加周期性定时器:EM.add_periodic_timer(Config.varz_interval) { fetch_varz },在一定的周期内,执行fetch_varz方法,以下进入fetch_varz方法:

    def fetch_varzfetch(:varz) do |http, job, index|……endend
        进入fetch_varz方法后,首先是调用fetch方法,参数为:varz,可见通过fetch方法会返回三个值,并作为之后的代码块的参数传入。现在进入fetch方法:

    def fetch(type)@components.each do |job, instances|instances.each do |index, instance|next unless credentials_ok?(job, instance)host = instance[:host]uri = "http://#{host}/#{type}"http = EventMachine::HttpRequest.new(uri).get(:head => authorization_headers(instance))……http.callback dobeginyield http, job, instance[:index]rescue => e……endendendendend
        在发现组件该模块中,以及讲解过@components变量的含义,该方法中首先遍历@components中的每一类组件,再遍历每一类组件中的每一个组件实例,对并该组件实例发起获取varz的请求。实现请求的过程中,首先查阅instance的credentails是否具有,然后组件请求的uri,然后通过EventMachine发送http请求,当请求响应返回的时候,通过yield关键字,将http,job以及instance[:index]返回给fetch_varz方法的代码块,fetch_varz代码块的实现如下:

        varz = Yajl::Parser.parse(http.response)now = Time.now.to_ihandler = Handler.handler(@historian, job)Config.logger.debug("collector.job.process", job: job, handler: handler)ctx = HandlerContext.new(index, now, varz)handler.do_process(ctx)
        首先对http.response进行解析,然后通过Handler类的handler方法创建一个handler对象,其中需要注意的是调用了一个@historian对象,在Collector对象的初始化中,有代码:@historian = ::Collector::Historian.build。

        @historian对象的功能是创建了网络连接,具体代码实现在/collector/lib/collector/historian.rb中:

class Historiandef self.buildhistorian = newif Config.tsdbhistorian.add_adapter(Historian::Tsdb.new(Config.tsdb_host, Config.tsdb_port))Config.logger.info("collector.historian-adapter.added-opentsdb", host: Config.tsdb_host)endif Config.aws_cloud_watchhistorian.add_adapter(Historian::CloudWatch.new(Config.aws_access_key_id, Config.aws_secret_access_key))Config.logger.info("collector.historian-adapter.added-cloudwatch")endif Config.datadoghistorian.add_adapter(Historian::DataDog.new(Config.datadog_api_key, HTTParty))Config.logger.info("collector.historian-adapter.added-datadog")endhistorianend……
end
        可以看到@Historian对象的创建是添加了三个网络适配器,或者说是三条连接分别是Tsdb,aws_cloud_watch以及DataDog。当之后需要@historian对象发送数据的时候,也就是通过者三条连接,将数据发送出去,本文马上将涉及这一块。

        现在回到fetch_varz中,创建为handler实例对象之后,还创建了一个ctx对象,最后通过handler的do_process方法处理了ctx对象,现在进入collector/lib/collector/handler.rb中,查看do_process方法:

    def do_process(context)varz = context.varzsend_metric("mem_free_bytes", varz["mem_free_bytes"], context) if varz["mem_free_bytes"]……varz.fetch("log_counts", {}).each do |level, count|next unless %w(fatal error warn).include?(level)send_metric("log_count", count, context, {"level" => level})endprocess(context)end
        首先解析出varz对象,并通过send_metric方法将数据发送出去,Handler的子类在执行process方法时,都会调用自己覆写的process方法,以DEA为例,collector/lib/collector/handlers/dea.rb中的process方法为:

      def process(context)send_metric("can_stage", context.varz["can_stage"], context)……state_counts(context).each do |state, count|send_metric("dea_registry_#{state.downcase}", count, context)endmetrics = registry_usage(context)send_metric("dea_registry_mem_reserved", metrics[:mem], context)send_metric("dea_registry_disk_reserved", metrics[:disk], context)end
        可以看到其实在process方法也仅仅是将不同组件各自对应的属性值,通过send_metirc方法发送出去。以下进入collector/lib/collector/handler.rb中的send_metric方法中:

    def send_metric(name, value, context, tags = {})tags.merge!(additional_tags(context))……@historian.send_data({key: name,timestamp: context.now,value: value,tags: tags})end
       在send_metric代码中,最后通过@historian对象中的send_data实现数据的发送,也就是之前说到的,@historian向@adapter的三条连接中,发送相应的数据,在collector/lib/collector/handler.rb中:

    def send_data(data)@adapters.each do |adapter|beginadapter.send_data(data)rescue => eConfig.logger.warn("collector.historian-adapter.sending-data-error", adapter: adapter.class.name, error: e, backtrace: e.backtrace)endendend

        以上讲述从获取varz到发送给TSDB等数据存储模块的流程,关于发送的是什么信息,还没有具体深入。这里以router为例,讲述发送的信息的类型。源码于collector/lib/collector/handlers/router.rb中:

      def process(context)varz = context.varzsend_metric("router.total_requests", varz["requests"], context)send_metric("router.total_routes", varz["urls"], context)send_metric("router.ms_since_last_registry_update", varz["ms_since_last_registry_update"], context)send_metric("router.bad_requests", varz["bad_requests"], context)send_metric("router.bad_gateways", varz["bad_gateways"], context)return unless varz["tags"]varz["tags"].each do |key, values|values.each do |value, metrics|if key == "component" && value.start_with?("dea-")# dea_id looks like "dea-1", "dea-2", etcdea_id = value.split("-")[1]# These are app requests, not requests to the dea. So we change the component to "app".tags = {:component => "app", :dea_index => dea_id }elsetags = {key => value}endsend_metric("router.requests", metrics["requests"], context, tags)send_latency_metric("router.latency.1m", metrics["latency"], context, tags)["2xx", "3xx", "4xx", "5xx", "xxx"].each do |status_code|send_metric("router.responses", metrics["responses_#{status_code}"], context, tags.merge("status" => status_code))endendendend
        可见在接收到router的varz信息之后,collector会从中取出过个键值,并进行发送,比如说router的“total_requests”,"total_routes","bad_requests","bad_gateways","router.latency.1m","response_2xxx"等。也正式collector通过分析varz并王TSDB发送这样的信息,所以TSDB中可以存有这些信息,最终DashBoard可以通过获取TSDB中的信息,并显示给用户,当然用户可以看到router组件的请求数,请求延迟,请求的响应时间等,也就不足为怪了。
        

        综上代码分析,可以得到框架图如下:


        以上便是Cloud Foundry中collector组件的功能分析。


关于作者:

孙宏亮,DAOCLOUD软件工程师。两年来在云计算方面主要研究PaaS领域的相关知识与技术。坚信轻量级虚拟化容器的技术,会给PaaS领域带来深度影响,甚至决定未来PaaS技术的走向。


转载请注明出处。

这篇文档更多出于我本人的理解,肯定在一些地方存在不足和错误。希望本文能够对接触Cloud Foundry中collector组件的人有些帮助,如果你对这方面感兴趣,并有更好的想法和建议,也请联系我。

我的邮箱:allen.sun@daocloud.io
新浪微博: @莲子弗如清


这篇关于Cloud Foundry中collector组件的源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/467874

相关文章

JS常用组件收集

收集了一些平时遇到的前端比较优秀的组件,方便以后开发的时候查找!!! 函数工具: Lodash 页面固定: stickUp、jQuery.Pin 轮播: unslider、swiper 开关: switch 复选框: icheck 气泡: grumble 隐藏元素: Headroom

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟 开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚 第一站:海量资源,应有尽有 走进“智听

如何在页面调用utility bar并传递参数至lwc组件

1.在app的utility item中添加lwc组件: 2.调用utility bar api的方式有两种: 方法一,通过lwc调用: import {LightningElement,api ,wire } from 'lwc';import { publish, MessageContext } from 'lightning/messageService';import Ca

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud