本文主要是介绍Zabbix监控之从Kafka中获取消费进度和lag,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在0.9及之后的版本,kafka自身提供了存放消费进度的功能。本文讲解的是如何从kafka自身获取消费进度。从zookeeper中获取消费进度请阅读我的另一片文章传送门
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
这是官网上的教程,提供了scala版本的获取消费状态和提交消费状态的代码。仅供参考。
http://pykafka.readthedocs.io/en/latest/api/broker.html
这是pykafka官网提供获取消费状态的API,试过不知道怎么用,网上也找不到相关代码。
http://kafka-python.readthedocs.io/en/latest/usage.html
这是python-kafka官网,找不到想要的API,没试过。
获取消费进度之前,一定要先弄明白kafka的存储结构以及消费进度是存放在zookeeper中还是kafka中,否则可能会发现到头来,自己都不知道自己在干什么。以上几种方式我都试过,但是都没成功,最后选择命令行的方式获取到消费状态,将消费状态写入文件中,再解析文件。
Kafka管理工具
https://www.iteblog.com/archives/1605.html
http://orchome.com/454
使用指令可以获取该组下每个consumer的消费进度
/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.12.11.131:9092 --group kafkaTestGroup --describe
然后再将其中的数据取出来,echo到文件中,可以使用crontab来执行指令,定时更新文件。
/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.1.8.74:9092 --group datasync.server.10.1.2.118 --describe |grep datasync.server.10.1.2.118 | awk '{print $1,$2,$3,$4,$5,$6,$7}'
将消费状态存放在kafka.log文件中,再解析文件,我这里监控阀值设置为1000,将lag值大于1000的数据取出来并输出。下面是解析文件的python脚本。
#!/usr/bin/env python
#coding=utf-8import os.path
import time
import pdb
from fileStatus import Fileif __name__=="__main__":filePath='/data/python-scripts/inspector/AccountInspector/otherInspector/kafka.log'f=open(filePath,"r")columnNameList=['GROUP','TOPIC','PARTITION','CURRENT-OFFSET','LOG-END-OFFSET','LAG','OWNER']result='no topicPartition lag is over allowedRange'resultDic={}overAllowedLagDic={}for line in f:#使用命令行处理时有时会得到Consumer group is not exists 或者Consumer group is rebalancing等不正常的结果,这种数据忽略不处理if 'Consumer group' not in line: line=line.strip('\n')lineSplit=line.split(' ')dicKey=lineSplit[0]+'_'+lineSplit[1]+'_'+lineSplit[2]dicValue={}for i in range(0,len(lineSplit),1):dicValue[columnNameList[i]]=lineSplit[i]#由于我设置的阀值时lag值为1000时就告警,此处LOG-END-OFFSET就是logsize,当logsize小于1000时可以忽略(因为lag总是小于logsize的)if dicValue['LOG-END-OFFSET']<='1000':dicValue['CURRENT-OFFSET']='0'dicValue['LAG']='0'resultDic[dicKey]=dicValue#使用命令行有缺陷,经常会出现取出来的值为unknown的情况,出现这种情况也当作告警处理if dicValue['LAG'] == 'unknown':overAllowedLagDic[dicKey]=dicValueelse:if int(dicValue['LAG'])>1000:overAllowedLagDic[dicKey]=dicValueif len(overAllowedLagDic)>0:result=''for key in overAllowedLagDic:dicValue=overAllowedLagDic[key]lag=dicValue['LAG']result=key+':'+lag+'; '+resultprint result
方式很low,而且还有漏洞,后面有时间研究下使用API的方式获取消费进度。
这篇关于Zabbix监控之从Kafka中获取消费进度和lag的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!