filebeat源码分析服务启动

2024-05-10 17:58

本文主要是介绍filebeat源码分析服务启动,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在开始源码分析之前先说一下filebeat是什么?beats是知名的ELK日志分析套件的一部分。它的前身是logstash-forwarder,用于收集日志并转发给后端(logstash、elasticsearch、redis、kafka等等)。filebeat是beats项目中的一种beats,负责收集日志文件的新增内容。当前的代码分支是最新的6.x的代码。
先看我们服务启动配置文件的一个例子,这个是filebeat采集k8s的日志的一个例子:

filebeat.prospectors:
- type: logpaths:- /var/lib/docker/containers/*/*-json.log- /var/log/filelog/containers/*/*/*/*.logprocessors:
- add_docker_metadata:host: "unix:///var/run/docker.sock"
- add_fields:fields:log: '{message}'
- decode_json_fields:when:regexp:message: "{*}"fields: ["message"]overwrite_keys: truetarget: ""
- drop_fields:fields: ["docker.container.labels.annotation.io.kubernetes.container.terminationMessagePath", "docker.container.labels.annotation.io.kubernetes.container.hash", "docker.container.labels.annotation.io.kubernetes.container.terminationMessagePolicy", "docker.container.labels.annotation.io.kubernetes.pod.terminationGracePeriod", "beat.version", "docker.container.labels.annotation.io.kubernetes.container.ports", "docker.container.labels.io.kubernetes.container.terminationMessagePath", "docker.container.labels.io.kubernetes.container.restartCount", "docker.container.labels.io.kubernetes.container.ports", "docker.container.labels.io.kubernetes.container.hash", "docker.container.labels.io.kubernetes.pod.terminationGracePeriod", "docker.container.labels.annotation.io.kubernetes.container.restartCount", "message"]
- parse_level:levels: ["fatal", "error", "warn", "info", "debug"]field: "log"logging.level: info
setup.template.enabled: true
setup.template.name: "filebeat-%{+yyyy.MM.dd}"
setup.template.pattern: "filebeat-*"
setup.template.fields: "${path.config}/fields.yml"
setup.template.overwrite: true
setup.template.settings:index:analysis:analyzer:enncloud_analyzer:filter: ["standard", "lowercase", "stop"]char_filter: ["my_filter"]type: customtokenizer: standardchar_filter:my_filter:type: mappingmappings: ["-=>_"]output:elasticsearch:hosts: ["paasdev.enncloud.cn:9200"]index: "filebeat-%{+yyyy.MM.dd}"

filebeat启动时候会加载这个配置文件。再看看总结的接口libbeat/beat/beat.go

type Beater interface {// The main event loop. This method should block until signalled to stop by an// invocation of the Stop() method.Run(b *Beat) error// Stop is invoked to signal that the Run method should finish its execution.// It will be invoked at most once.Stop()
}

这个是每个beat都需要实现的两个接口,当然filebeat也不例外,filebeat/beater/filebeat.go
这个里面是filebeat的具体实现,篇幅有限就,省略的粘贴一下

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {var err errorconfig := fb.configif !fb.moduleRegistry.Empty() {err = fb.loadModulesPipelines(b)if err != nil {return err}}// Setup registrar to persist stateregistrar, err := registrar.New(config.RegistryFile, config.RegistryFlush, finishedLogger)if err != nil {logp.Err("Could not init registrar: %v", err)return err}err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{ACKEvents: newEventACKer(registrarChannel).ackEvents,})if err != nil {logp.Err("Failed to install the registry with the publisher pipeline: %v", err)return err}crawler, err := crawler.New(channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,config.Prospectors,b.Info.Version,fb.done,*once)if err != nil {logp.Err("Could not init crawler: %v", err)return err}err = registrar.Start()if err != nil {return fmt.Errorf("Could not start registrar: %v", err)}var pipelineLoaderFactory fileset.PipelineLoaderFactoryif b.Config.Output.Name() == "elasticsearch" {pipelineLoaderFactory = newPipelineLoaderFactory(b.Config.Output.Config())} else {logp.Warn(pipelinesWarning)}err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, pipelineLoaderFactory)if err != nil {crawler.Stop()return err}var adiscover *autodiscover.Autodiscoverif fb.config.Autodiscover != nil {adapter := NewAutodiscoverAdapter(crawler.ProspectorsFactory, crawler.ModulesFactory)adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover)if err != nil {return err}}adiscover.Start()return nil
}// Stop is called on exit to stop the crawling, spooling and registration processes.
func (fb *Filebeat) Stop() {logp.Info("Stopping filebeat")// Stop Filebeatclose(fb.done)
}

上面的代码省略的介绍了start和stop的函数,stop天简单,就是一个关闭的总开关,就不说了。
详细分析一下这个start方法,它是整个filebeat最核心的地方。
filebeat支持采集特定程序的日志,譬如redis、nginx等,这些都是通过module支持的,所以在程序开始时候先确定elasticsearch里面有没有这些关联的pipeline、ingest,

    if !fb.moduleRegistry.Empty() {err = fb.loadModulesPipelines(b)if err != nil {return err}}

深入看看注册方法

func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {if b.Config.Output.Name() != "elasticsearch" {logp.Warn(pipelinesWarning)return nil}// 这里注册一个回调的方法,每当和一个es建立连接的时候,都会重新和es确认这些pipelinecallback := func(esClient *elasticsearch.Client) error {return fb.moduleRegistry.LoadPipelines(esClient)}elasticsearch.RegisterConnectCallback(callback)return nil
}

上面的代码主要是先和es确认一下pipeline,下面接着看启动,然后就创建registrar,registrar是啥呢?

registrar, err := registrar.New(config.RegistryFile, config.RegistryFlush, finishedLogger)if err != nil {logp.Err("Could not init registrar: %v", err)return err}

其实它是注册日志读取进度的,通过记录offset,下面就是我截取的一段registry文件。

{"source":"/var/lib/docker/containers/e892ad615535e877c8af5856bd27631937d050d00b4ca55554bec41e3391685f/e892ad615535e877c8af5856bd27631937d050d00b4ca55554bec41e3391685f-json.log","offset":0,"timestamp":"2017-11-29T17:01:28.645203497Z","ttl":-1,"type":"log","FileStateOS":{"inode":526963,"device":64769}}

这个json文件里面保存了容器和对应的offset,这样当filebeat重启过后则能继续工作。

然后创建crawler,这个是负责日志采集的。

crawler, err := crawler.New(channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,config.Prospectors,b.Info.Version,fb.done,*once)

通过config.Prospectors,crawler就知道采集哪些目标。然后就通过

err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, pipelineLoaderFactory)if err != nil {crawler.Stop()return err}

启动采集任务。下面看看具体启动任务地方,filebeat/crawler/crawler.go

for _, prospectorConfig := range c.prospectorConfigs {err := c.startProspector(prospectorConfig, r.GetStates())if err != nil {return err}}

就来到filebeat/prospector/prospector.go里面

func (p *Prospector) Run() {// Initial prospector runp.prospectorer.Run()// Shuts down after the first complete run of all prospectorsif p.Once {return}for {select {case <-p.done:logp.Info("Prospector ticker stopped")returncase <-time.After(p.config.ScanFrequency):logp.Debug("prospector", "Run prospector")p.prospectorer.Run()}}
}

这个prospectorer.Run是一个接口,可以支持从UDP/STDIN/LOG/REDIS/DOCKER里面直接获取日志,我们看一个log的filebeat/prospector/log/prospector.go

func (p *Prospector) Run() {
...
p.scan()
...
}

这里如果发现文件需要被采集,则创建采集任务

if lastState.IsEmpty() {logp.Debug("prospector", "Start harvester for new file: %s", newState.Source)err := p.startHarvester(newState, 0)if err != nil {logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err)}} else {p.harvestExistingFile(newState, lastState)
}

startHarvester启动日志采集,还是相同的套路,先创建createHarvester,然后启动harvesters.Start(h)。
这个里面通过for死循环里面执行

message, err := h.reader.Next()

这样分批读取。启动程序的内容先说到这里。还有很多细节后面逐一描述。

这篇关于filebeat源码分析服务启动的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/977149

相关文章

SpringBoot基于配置实现短信服务策略的动态切换

《SpringBoot基于配置实现短信服务策略的动态切换》这篇文章主要为大家详细介绍了SpringBoot在接入多个短信服务商(如阿里云、腾讯云、华为云)后,如何根据配置或环境切换使用不同的服务商,需... 目录目标功能示例配置(application.yml)配置类绑定短信发送策略接口示例:阿里云 & 腾

springboot项目如何开启https服务

《springboot项目如何开启https服务》:本文主要介绍springboot项目如何开启https服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录springboot项目开启https服务1. 生成SSL证书密钥库使用keytool生成自签名证书将

Java程序进程起来了但是不打印日志的原因分析

《Java程序进程起来了但是不打印日志的原因分析》:本文主要介绍Java程序进程起来了但是不打印日志的原因分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java程序进程起来了但是不打印日志的原因1、日志配置问题2、日志文件权限问题3、日志文件路径问题4、程序

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

Java字符串操作技巧之语法、示例与应用场景分析

《Java字符串操作技巧之语法、示例与应用场景分析》在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字... 目录引言1. 基础操作1.1 创建字符串1.2 获取长度1.3 访问字符2. 字符串处理2.1 子字

使用Node.js制作图片上传服务的详细教程

《使用Node.js制作图片上传服务的详细教程》在现代Web应用开发中,图片上传是一项常见且重要的功能,借助Node.js强大的生态系统,我们可以轻松搭建高效的图片上传服务,本文将深入探讨如何使用No... 目录准备工作搭建 Express 服务器配置 multer 进行图片上传处理图片上传请求完整代码示例

Spring LDAP目录服务的使用示例

《SpringLDAP目录服务的使用示例》本文主要介绍了SpringLDAP目录服务的使用示例... 目录引言一、Spring LDAP基础二、LdapTemplate详解三、LDAP对象映射四、基本LDAP操作4.1 查询操作4.2 添加操作4.3 修改操作4.4 删除操作五、认证与授权六、高级特性与最佳

Python 迭代器和生成器概念及场景分析

《Python迭代器和生成器概念及场景分析》yield是Python中实现惰性计算和协程的核心工具,结合send()、throw()、close()等方法,能够构建高效、灵活的数据流和控制流模型,这... 目录迭代器的介绍自定义迭代器省略的迭代器生产器的介绍yield的普通用法yield的高级用法yidle

Redis在windows环境下如何启动

《Redis在windows环境下如何启动》:本文主要介绍Redis在windows环境下如何启动的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Redis在Windows环境下启动1.在redis的安装目录下2.输入·redis-server.exe

解决SpringBoot启动报错:Failed to load property source from location 'classpath:/application.yml'

《解决SpringBoot启动报错:Failedtoloadpropertysourcefromlocationclasspath:/application.yml问题》这篇文章主要介绍... 目录在启动SpringBoot项目时报如下错误原因可能是1.yml中语法错误2.yml文件格式是GBK总结在启动S