go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化

本文主要是介绍go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

在我们实际生产中,我们常常因为新的项目或者新的功能进而要对配置文件进行修改,但是在生产环境下我们不是每次配置文件发生变化都重启一次系统,这无疑是不切实际的,所以我们需要对配置文件进行实时监控,而今天我们所要展示的也就是如何基于etcd来监控配置文件的变化。

etcd对配置项监控的流程

需求分析

首先我们来看我们日志收集服务的主要工作流程:

func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}

在上述主要工作逻辑的基础上,现在我们需要etcd来实现对配置文件的实时监控,而这就需要我们在后态去运行一个监控程序来实时监控查看需要见监控的配置文件是否变化。并且将变化发送到tailFile模块中

实现Watch监控

所以这里我们对main.go进行一点简单的修改,添加一个后台程序 go etcd.WatchConf(ConfigObj.Etcdaddress.Key):

package mainimport ("fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/etcd""log-agent/tailFile"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`Etcdaddress  EtcdAddress `ini:"etcd"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}type EtcdAddress struct {Addr []string `ini:"address"`Key  string   `ini:"collect_key"`
}func run() {select {}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)go etcd.WatchConf(ConfigObj.Etcdaddress.Key)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}

我们在来看这个函数的具体逻辑:

func WatchConf(key string) {rch := client.Watch(context.Background(), key)var newConf []common.CollectEntryfor wresp := range rch {logrus.Infof("get new conf fromn etcd")for _, ev := range wresp.Events {fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)err := json.Unmarshal(ev.Kv.Value, &newConf)if err != nil {logrus.Error("json unmarshal failed,err:%v", err)continue}tailFile.SendNewConf(newConf)}}
}

与之前有关etcd的文章中的操作例子不同,这里我们并没有定义上下文,主要是因为这里我们不确定什么时候终止这个程序,所以不使用上下文了。

发送新配置到tailFile中

在上面我们已经完成etcd的监控,现在我们需要把新的配置消息发送到tailFile,这里我们第一反应是写一个死循环一直独缺,但是这样其实不大方便,毕竟储蓄一直运行会占掉大量不必要消耗的资源,这里我们可以让双方使用管道来进行通信,平时管道处于阻塞状态,只有监测到新配置才会进行通信,这样会使资源得到最大化的利用,我们来看一看具体的代码实现:

  • 首先我们来定义一下用于通信的管道
var (confchan chan []common.CollectEntry
)
  • 然后我们要对管道进行初始化,并且读取管道中新的配置信息:
confchan = make(chan []common.CollectEntry)newConf := <-confchanlogrus.Infof("get newconf from etcd", newConf)

最后,由于我们这里管道只用于etcd模块与tailFile模块之间的通信,所以这里我们就不暴露管道,而是选择暴露函数:

func SendNewConf(newConf []common.CollectEntry) {confchan <- newConf
}

结语

最后附上上述变化模块的代码:

  • main.go
package mainimport ("fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/etcd""log-agent/tailFile"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`Etcdaddress  EtcdAddress `ini:"etcd"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}type EtcdAddress struct {Addr []string `ini:"address"`Key  string   `ini:"collect_key"`
}func run() {select {}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)go etcd.WatchConf(ConfigObj.Etcdaddress.Key)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}
  • etcd.go
package etcdimport ("encoding/json""fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"clientv3 "go.etcd.io/etcd/client/v3""golang.org/x/net/context""log-agent/common""log-agent/tailFile""time"
)var client *clientv3.Clientfunc Init(address []string) (err error) {client, err = clientv3.New(clientv3.Config{Endpoints:   address,DialTimeout: 5 * time.Second,})if err != nil {logrus.Error("etcd client connect failed,err:%v", err)return}return
}func GetConf(key string) (err error, collectEntryList []common.CollectEntry) {ctx, cancel := context.WithTimeout(context.Background(), time.Second)response, err := client.Get(ctx, key)cancel()if err != nil {logrus.Error("get conf from etcd failed,err:%v", err)return}if len(response.Kvs) == 0 {logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)return}fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryListif err != nil {logrus.Error("json unmarshal failed,err:%v", err)return}return
}func WatchConf(key string) {rch := client.Watch(context.Background(), key)var newConf []common.CollectEntryfor wresp := range rch {logrus.Infof("get new conf fromn etcd")for _, ev := range wresp.Events {fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)err := json.Unmarshal(ev.Kv.Value, &newConf)if err != nil {logrus.Error("json unmarshal failed,err:%v", err)continue}tailFile.SendNewConf(newConf)}}
}
  • tailFile.go
package tailFileimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail""log-agent/Kafka""log-agent/common""strings""time"
)type tailTask struct {path    stringtopic   stringTailObj *tail.Tail
}var (confchan chan []common.CollectEntry
)func NewTailTask(path, topic string) (tt *tailTask) {tt = &tailTask{path:  path,topic: topic,}return tt
}func (task *tailTask) Init() (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}task.TailObj, err = tail.TailFile(task.path, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)return}return
}func InitTail(collectEntryList []common.CollectEntry) (err error) {for _, entry := range collectEntryList {tt := NewTailTask(entry.Path, entry.Topic)err = tt.Init()if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)continue}go tt.run()}//初始化新配置的管道confchan = make(chan []common.CollectEntry)newConf := <-confchanlogrus.Infof("get newconf from etcd", newConf)return
}func (t *tailTask) run() {for {line, ok := <-t.TailObj.Linesif !ok {logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)time.Sleep(2 * time.Second)continue}if len(strings.Trim(line.Text, "\r")) == 0 {continue}msg := &sarama.ProducerMessage{}msg.Topic = t.topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MesChan(msg)}
}func SendNewConf(newConf []common.CollectEntry) {confchan <- newConf
}

这篇关于go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/967123

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JS常用组件收集

收集了一些平时遇到的前端比较优秀的组件,方便以后开发的时候查找!!! 函数工具: Lodash 页面固定: stickUp、jQuery.Pin 轮播: unslider、swiper 开关: switch 复选框: icheck 气泡: grumble 隐藏元素: Headroom

不懂推荐算法也能设计推荐系统

本文以商业化应用推荐为例,告诉我们不懂推荐算法的产品,也能从产品侧出发, 设计出一款不错的推荐系统。 相信很多新手产品,看到算法二字,多是懵圈的。 什么排序算法、最短路径等都是相对传统的算法(注:传统是指科班出身的产品都会接触过)。但对于推荐算法,多数产品对着网上搜到的资源,都会无从下手。特别当某些推荐算法 和 “AI”扯上关系后,更是加大了理解的难度。 但,不了解推荐算法,就无法做推荐系

基于人工智能的图像分类系统

目录 引言项目背景环境准备 硬件要求软件安装与配置系统设计 系统架构关键技术代码示例 数据预处理模型训练模型预测应用场景结论 1. 引言 图像分类是计算机视觉中的一个重要任务,目标是自动识别图像中的对象类别。通过卷积神经网络(CNN)等深度学习技术,我们可以构建高效的图像分类系统,广泛应用于自动驾驶、医疗影像诊断、监控分析等领域。本文将介绍如何构建一个基于人工智能的图像分类系统,包括环境

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl