This article describes how to fetch Kafka consumer offsets (consumption progress) and lag from ZooKeeper for Zabbix monitoring, and hopefully provides a useful reference for developers who need it.
Before Kafka 0.9, consumer offsets were stored in ZooKeeper; in 0.9 and later versions, Kafka itself can store the consumer offsets. For fetching consumer offsets from Kafka directly, see my other article (link).
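For orientation only (this is separate from the script below): when offsets are committed to Kafka itself, a group's offset and lag can be read with the kafka-python package roughly as follows. The broker address, topic, group and partition here are placeholders, and end_offsets() needs brokers of 0.10 or newer.

# Minimal sketch: read a group's committed offset and compute lag when offsets
# are stored in Kafka itself (0.9+). Broker/topic/group/partition are placeholders.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='10.12.11.131:9092',
                         group_id='my_group',
                         enable_auto_commit=False)
tp = TopicPartition('my_topic', 0)
committed = consumer.committed(tp) or 0      # current consumer offset (None if never committed)
logsize = consumer.end_offsets([tp])[tp]     # latest offset of the partition (logsize)
print('lag = %d' % (logsize - committed))
consumer.close()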
This article is reposted from http://club.oneapm.com/t/zabbix-kafka/854
The code in the original post would not run when I debugged it; after checking the pykafka website, it seems some of the methods it relied on are obsolete and no longer usable. Below is the code with comments added and a few small modifications.
Before running the script, edit the hosts file to map the Kafka broker's hostname to its IP, otherwise the connection will fail. My Kafka host is named kafka.
vi /etc/hosts and add the mapping: 10.12.11.131 kafka
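A quick way to confirm the mapping works is to check that the name resolves (a small optional check using Python's socket module; it should print the broker IP):

# optional sanity check: the kafka hostname should resolve to the broker IP
import socket
print(socket.gethostbyname('kafka'))   # expect 10.12.11.131 after the hosts entry is added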
#!/usr/bin/env python
#coding=utf-8
import os, sys, time, json, yaml, pdb
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kafka import KafkaClient
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayload

class spoorerClient(object):

    def __init__(self, zookeeper_hosts, kafka_hosts, zookeeper_url='/', timeout=3, log_dir='/tmp/spoorer'):
        self.zookeeper_hosts = zookeeper_hosts
        self.kafka_hosts = kafka_hosts
        self.timeout = timeout
        self.log_dir = log_dir
        self.log_file = log_dir + '/' + 'spoorer.log'
        self.kafka_logsize = {}
        self.result = []
        self.log_day_file = log_dir + '/' + 'spoorer_day.log.' + str(time.strftime("%Y-%m-%d", time.localtime()))
        self.log_keep_day = 1

        # Read the topic/group whitelist from spoorer.yaml
        try:
            f = file(os.path.dirname(os.path.abspath(__file__)) + '/' + 'spoorer.yaml')
            self.white_topic_group = yaml.load(f)
        except IOError as e:
            print 'Error, spoorer.yaml is not found'
            sys.exit(1)
        else:
            f.close()
        if self.white_topic_group is None:
            self.white_topic_group = {}
        if not os.path.exists(self.log_dir): os.mkdir(self.log_dir)

        # The collected offsets are written to self.log_file and self.log_day_file:
        # self.log_file holds the latest snapshot, self.log_day_file keeps the history.
        # Day files whose date is more than self.log_keep_day days old are removed.
        for logfile in [x for x in os.listdir(self.log_dir) if x.split('.')[-1] != 'log' and x.split('.')[-1] != 'swp']:
            if int(time.mktime(time.strptime(logfile.split('.')[-1], '%Y-%m-%d'))) < int(time.time()) - self.log_keep_day * 86400:
                os.remove(self.log_dir + '/' + logfile)

        if zookeeper_url == '/':
            self.zookeeper_url = zookeeper_url
        else:
            self.zookeeper_url = zookeeper_url + '/'

    def spoorer(self):
        # Connect to kafka and fetch the topic list
        try:
            kafka_client = KafkaClient(self.kafka_hosts, timeout=self.timeout)
        except Exception as e:
            print "Error, cannot connect kafka broker."
            sys.exit(1)
        else:
            kafka_topics = kafka_client.topics
        finally:
            kafka_client.close()

        # Connect to zookeeper and read the current consumer offsets
        try:
            zookeeper_client = KazooClient(hosts=self.zookeeper_hosts, read_only=True, timeout=self.timeout)
            zookeeper_client.start()
        except Exception as e:
            print "Error, cannot connect zookeeper server."
            sys.exit(1)

        try:
            groups = map(str, zookeeper_client.get_children(self.zookeeper_url + 'consumers'))
        except NoNodeError as e:
            print "Error, invalid zookeeper url."
            zookeeper_client.stop()
            sys.exit(2)
        else:
            for group in groups:
                if 'offsets' not in zookeeper_client.get_children(self.zookeeper_url + 'consumers/%s' % group): continue
                topic_path = 'consumers/%s/offsets' % (group)
                topics = map(str, zookeeper_client.get_children(self.zookeeper_url + topic_path))
                if len(topics) == 0: continue

                for topic in topics:
                    if topic not in self.white_topic_group.keys():
                        continue
                    elif group not in self.white_topic_group[topic].replace(' ', '').split(','):
                        continue
                    partition_path = 'consumers/%s/offsets/%s' % (group, topic)
                    partitions = map(int, zookeeper_client.get_children(self.zookeeper_url + partition_path))

                    for partition in partitions:
                        base_path = 'consumers/%s/%s/%s/%s' % (group, '%s', topic, partition)
                        owner_path, offset_path = base_path % 'owners', base_path % 'offsets'
                        offset = zookeeper_client.get(self.zookeeper_url + offset_path)[0]
                        try:
                            owner = zookeeper_client.get(self.zookeeper_url + owner_path)[0]
                        except NoNodeError as e:
                            owner = 'null'
                        # Store the consumer offset of this partition in the metric dict
                        metric = {'datetime': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'topic': topic, 'group': group, 'partition': int(partition), 'logsize': None, 'offset': int(offset), 'lag': None, 'owner': owner}
                        self.result.append(metric)
        finally:
            zookeeper_client.stop()

        # Fetch the logsize (latest offset) of every partition
        # (this part differs from the original post; it has been modified)
        try:
            client = SimpleClient(self.kafka_hosts)
        except Exception as e:
            print "Error, cannot connect kafka broker."
            sys.exit(1)
        else:
            for kafka_topic in kafka_topics:
                self.kafka_logsize[kafka_topic] = {}
                partitions = client.topic_partitions[kafka_topic]
                offset_requests = [OffsetRequestPayload(kafka_topic, p, -1, 1) for p in partitions.keys()]
                offsets_responses = client.send_offset_request(offset_requests)
                for r in offsets_responses:
                    self.kafka_logsize[kafka_topic][r.partition] = r.offsets[0]

            # lag = logsize - current offset
            with open(self.log_file, 'w') as f1, open(self.log_day_file, 'a') as f2:
                for metric in self.result:
                    logsize = self.kafka_logsize[metric['topic']][metric['partition']]
                    metric['logsize'] = int(logsize)
                    metric['lag'] = int(logsize) - int(metric['offset'])
                    f1.write(json.dumps(metric, sort_keys=True) + '\n')
                    f1.flush()
                    f2.write(json.dumps(metric, sort_keys=True) + '\n')
                    f2.flush()
        finally:
            client.close()

        return ''

if __name__ == '__main__':
    check = spoorerClient(zookeeper_hosts='10.12.11.131:2181', zookeeper_url='/', kafka_hosts='10.12.11.131:9092', log_dir='/data/python-scripts/inspector/AccountInspector/otherInspector/spoorer', timeout=3)
    print check.spoorer()
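The script also expects a spoorer.yaml file in the same directory; it acts as a whitelist mapping each topic to the comma-separated consumer groups to collect (only whitelisted topic/group pairs are reported). A minimal example, with placeholder topic and group names:

# spoorer.yaml: topic -> comma-separated consumer groups to monitor (placeholder names)
my_topic: group1,group2
another_topic: group3

Each run rewrites spoorer.log with one JSON line per topic/group/partition containing datetime, offset, logsize, lag and owner, and appends the same lines to the daily history file; a Zabbix item (for example a UserParameter script) can then parse spoorer.log and report the lag values.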
That concludes this article on fetching Kafka consumer offsets and lag from ZooKeeper for Zabbix monitoring; I hope it is helpful.