Zabbix监控之从zookeeper中获取Kafka消费进度和lag

2024-06-22 19:32

本文主要是介绍Zabbix监控之从zookeeper中获取Kafka消费进度和lag,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka在0.9之前,消费进度是存放在zookeeper中,在0.9及之后的版本,kafka自身提供了存放消费进度的功能。从kafka中获取消费进度请查看我另一片文章 传送门

这篇文章是转载自http://club.oneapm.com/t/zabbix-kafka/854
原文代码在调试时跑不通,上pykafka官网上看了下,貌似是有些方法过时了,没法使用,下面是加了备注和稍作修改后的代码。

在执行脚本前,需要修改host文件,将kafka服务器名和ip做下映射,不然会出现连接不上的情况。我的kafka服务器名叫做kafka
vi /etc/hosts 添加映射 10.12.11.131 kafka

#!/usr/bin/env python
#coding=utf-8import os, sys, time, json, yaml ,pdb
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kafka import KafkaClient
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayloadclass spoorerClient(object):def __init__(self, zookeeper_hosts, kafka_hosts, zookeeper_url='/', timeout=3, log_dir='/tmp/spoorer'):self.zookeeper_hosts = zookeeper_hostsself.kafka_hosts = kafka_hostsself.timeout = timeoutself.log_dir = log_dirself.log_file = log_dir + '/' + 'spoorer.log'self.kafka_logsize = {}self.result = []self.log_day_file = log_dir + '/' + 'spoorer_day.log.' + str(time.strftime("%Y-%m-%d", time.localtime()))self.log_keep_day = 1#将spoorer.yaml中的配置读取出来try:f = file(os.path.dirname(os.path.abspath(__file__)) + '/' + 'spoorer.yaml')self.white_topic_group = yaml.load(f)except IOError as e:print 'Error, spoorer.yaml is not found'sys.exit(1)else:f.close()if self.white_topic_group is None:self.white_topic_group = {}if not os.path.exists(self.log_dir):     os.mkdir(self.log_dir)#获取到的消费进度会写入到self.log_file,self.log_day_file这两个文件中,self.log_day_file用于存放历史消费进度,self.log_file存放当前最新获取到的消费进度#self.log_day_file该文件的创建时间和当前系统时间相隔一天,则删除for logfile in [x for x in os.listdir(self.log_dir) if x.split('.')[-1] != 'log' and x.split('.')[-1] != 'swp']:if int(time.mktime(time.strptime(logfile.split('.')[-1], '%Y-%m-%d'))) < int(time.time()) - self.log_keep_day * 86400:os.remove(self.log_dir + '/' + logfile)if zookeeper_url == '/':self.zookeeper_url = zookeeper_urlelse:self.zookeeper_url = zookeeper_url + '/'def spoorer(self):#连接kafka,获取topicstry:kafka_client = KafkaClient(self.kafka_hosts, timeout=self.timeout)except Exception as e:print "Error, cannot connect kafka broker."sys.exit(1)else:kafka_topics = kafka_client.topicsfinally:kafka_client.close()#连接zk,获取当前消费进度current offsettry:zookeeper_client = KazooClient(hosts=self.zookeeper_hosts, read_only=True, timeout=self.timeout)zookeeper_client.start()except Exception as e:print "Error, cannot connect zookeeper server."sys.exit(1)try:groups = map(str,zookeeper_client.get_children(self.zookeeper_url + 'consumers'))except NoNodeError as e:print "Error, invalid zookeeper url."zookeeper_client.stop()sys.exit(2)else:for group in groups:if 'offsets' not in zookeeper_client.get_children(self.zookeeper_url + 'consumers/%s' % group): continuetopic_path = 'consumers/%s/offsets' % (group)topics = map(str,zookeeper_client.get_children(self.zookeeper_url + topic_path))if len(topics) == 0: continuefor topic in topics:if topic not in self.white_topic_group.keys():continue elif group not in self.white_topic_group[topic].replace(' ','').split(','):continuepartition_path = 'consumers/%s/offsets/%s' % (group,topic)partitions = map(int,zookeeper_client.get_children(self.zookeeper_url + partition_path))for partition in partitions:base_path = 'consumers/%s/%s/%s/%s' % (group, '%s', topic, partition)owner_path, offset_path = base_path % 'owners', base_path % 'offsets'offset = zookeeper_client.get(self.zookeeper_url + offset_path)[0]try:owner = zookeeper_client.get(self.zookeeper_url + owner_path)[0]except NoNodeError as e:owner = 'null'#消费进度放在字典metric中metric = {'datetime':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'topic':topic, 'group':group, 'partition':int(partition), 'logsize':None, 'offset':int(offset), 'lag':None, 'owner':owner}self.result.append(metric)finally:zookeeper_client.stop()#获取每个分片的logsize(此处和原文不一样,做了修改)try:client = SimpleClient(self.kafka_hosts)except Exception as e:print "Error, cannot connect kafka broker."sys.exit(1)else:for kafka_topic in kafka_topics:self.kafka_logsize[kafka_topic] = {}partitions = client.topic_partitions[kafka_topic]offset_requests = [OffsetRequestPayload(kafka_topic, p, -1, 1) for p in partitions.keys()]offsets_responses = client.send_offset_request(offset_requests)for r in offsets_responses:self.kafka_logsize[kafka_topic][r.partition] = r.offsets[0]#logsize减去current offset等于lagwith open(self.log_file,'w') as f1, open(self.log_day_file,'a') as f2:for metric in self.result:logsize = self.kafka_logsize[metric['topic']][metric['partition']]metric['logsize'] = int(logsize)metric['lag'] = int(logsize) - int(metric['offset'])f1.write(json.dumps(metric,sort_keys=True) + '\n')f1.flush()f2.write(json.dumps(metric,sort_keys=True) + '\n')f2.flush()finally:client.close()return ''if __name__ == '__main__':check = spoorerClient(zookeeper_hosts='10.12.11.131:2181', zookeeper_url='/', kafka_hosts='10.12.11.131:9092', log_dir='/data/python-scripts/inspector/AccountInspector/otherInspector/spoorer', timeout=3)print check.spoorer()

这篇关于Zabbix监控之从zookeeper中获取Kafka消费进度和lag的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1085207

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

如何利用Java获取当天的开始和结束时间

《如何利用Java获取当天的开始和结束时间》:本文主要介绍如何使用Java8的LocalDate和LocalDateTime类获取指定日期的开始和结束时间,展示了如何通过这些类进行日期和时间的处... 目录前言1. Java日期时间API概述2. 获取当天的开始和结束时间代码解析运行结果3. 总结前言在J

java获取图片的大小、宽度、高度方式

《java获取图片的大小、宽度、高度方式》文章介绍了如何将File对象转换为MultipartFile对象的过程,并分享了个人经验,希望能为读者提供参考... 目China编程录Java获取图片的大小、宽度、高度File对象(该对象里面是图片)MultipartFile对象(该对象里面是图片)总结java获取图片

Java通过反射获取方法参数名的方式小结

《Java通过反射获取方法参数名的方式小结》这篇文章主要为大家详细介绍了Java如何通过反射获取方法参数名的方式,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1、前言2、解决方式方式2.1: 添加编译参数配置 -parameters方式2.2: 使用Spring的内部工具类 -

Java如何获取视频文件的视频时长

《Java如何获取视频文件的视频时长》文章介绍了如何使用Java获取视频文件的视频时长,包括导入maven依赖和代码案例,同时,也讨论了在运行过程中遇到的SLF4J加载问题,并给出了解决方案... 目录Java获取视频文件的视频时长1、导入maven依赖2、代码案例3、SLF4J: Failed to lo

使用Java实现获取客户端IP地址

《使用Java实现获取客户端IP地址》这篇文章主要为大家详细介绍了如何使用Java实现获取客户端IP地址,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 首先是获取 IP,直接上代码import org.springframework.web.context.request.Requ

通过prometheus监控Tomcat运行状态的操作流程

《通过prometheus监控Tomcat运行状态的操作流程》文章介绍了如何安装和配置Tomcat,并使用Prometheus和TomcatExporter来监控Tomcat的运行状态,文章详细讲解了... 目录Tomcat安装配置以及prometheus监控Tomcat一. 安装并配置tomcat1、安装

C++实现获取本机MAC地址与IP地址

《C++实现获取本机MAC地址与IP地址》这篇文章主要为大家详细介绍了C++实现获取本机MAC地址与IP地址的两种方式,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 实际工作中,项目上常常需要获取本机的IP地址和MAC地址,在此使用两种方案获取1.MFC中获取IP和MAC地址获取

C/C++通过IP获取局域网网卡MAC地址

《C/C++通过IP获取局域网网卡MAC地址》这篇文章主要为大家详细介绍了C++如何通过Win32API函数SendARP从IP地址获取局域网内网卡的MAC地址,感兴趣的小伙伴可以跟随小编一起学习一下... C/C++通过IP获取局域网网卡MAC地址通过win32 SendARP获取MAC地址代码#i

5分钟获取deepseek api并搭建简易问答应用

《5分钟获取deepseekapi并搭建简易问答应用》本文主要介绍了5分钟获取deepseekapi并搭建简易问答应用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1、获取api2、获取base_url和chat_model3、配置模型参数方法一:终端中临时将加