Zabbix监控之从Kafka中获取消费进度和lag

2024-06-22 19:32

本文主要是介绍Zabbix监控之从Kafka中获取消费进度和lag,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在0.9及之后的版本,kafka自身提供了存放消费进度的功能。本文讲解的是如何从kafka自身获取消费进度。从zookeeper中获取消费进度请阅读我的另一片文章传送门

https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
这是官网上的教程,提供了scala版本的获取消费状态和提交消费状态的代码。仅供参考。

http://pykafka.readthedocs.io/en/latest/api/broker.html
这是pykafka官网提供获取消费状态的API,试过不知道怎么用,网上也找不到相关代码。

http://kafka-python.readthedocs.io/en/latest/usage.html
这是python-kafka官网,找不到想要的API,没试过。

获取消费进度之前,一定要先弄明白kafka的存储结构以及消费进度是存放在zookeeper中还是kafka中,否则可能会发现到头来,自己都不知道自己在干什么。以上几种方式我都试过,但是都没成功,最后选择命令行的方式获取到消费状态,将消费状态写入文件中,再解析文件。
Kafka管理工具
https://www.iteblog.com/archives/1605.html
http://orchome.com/454

使用指令可以获取该组下每个consumer的消费进度

/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.12.11.131:9092 --group kafkaTestGroup --describe

然后再将其中的数据取出来,echo到文件中,可以使用crontab来执行指令,定时更新文件。

/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.1.8.74:9092 --group datasync.server.10.1.2.118 --describe |grep datasync.server.10.1.2.118 | awk '{print $1,$2,$3,$4,$5,$6,$7}'

将消费状态存放在kafka.log文件中,再解析文件,我这里监控阀值设置为1000,将lag值大于1000的数据取出来并输出。下面是解析文件的python脚本。

#!/usr/bin/env python
#coding=utf-8import os.path
import time
import pdb
from fileStatus import Fileif __name__=="__main__":filePath='/data/python-scripts/inspector/AccountInspector/otherInspector/kafka.log'f=open(filePath,"r")columnNameList=['GROUP','TOPIC','PARTITION','CURRENT-OFFSET','LOG-END-OFFSET','LAG','OWNER']result='no topicPartition lag is over allowedRange'resultDic={}overAllowedLagDic={}for line in f:#使用命令行处理时有时会得到Consumer group is not exists 或者Consumer group is rebalancing等不正常的结果,这种数据忽略不处理if 'Consumer group' not in line: line=line.strip('\n')lineSplit=line.split(' ')dicKey=lineSplit[0]+'_'+lineSplit[1]+'_'+lineSplit[2]dicValue={}for i in range(0,len(lineSplit),1):dicValue[columnNameList[i]]=lineSplit[i]#由于我设置的阀值时lag值为1000时就告警,此处LOG-END-OFFSET就是logsize,当logsize小于1000时可以忽略(因为lag总是小于logsize的)if dicValue['LOG-END-OFFSET']<='1000':dicValue['CURRENT-OFFSET']='0'dicValue['LAG']='0'resultDic[dicKey]=dicValue#使用命令行有缺陷,经常会出现取出来的值为unknown的情况,出现这种情况也当作告警处理if dicValue['LAG'] == 'unknown':overAllowedLagDic[dicKey]=dicValueelse:if int(dicValue['LAG'])>1000:overAllowedLagDic[dicKey]=dicValueif len(overAllowedLagDic)>0:result=''for key in overAllowedLagDic:dicValue=overAllowedLagDic[key]lag=dicValue['LAG']result=key+':'+lag+'; '+resultprint result

方式很low,而且还有漏洞,后面有时间研究下使用API的方式获取消费进度。

这篇关于Zabbix监控之从Kafka中获取消费进度和lag的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1085208

相关文章

python获取指定名字的程序的文件路径的两种方法

《python获取指定名字的程序的文件路径的两种方法》本文主要介绍了python获取指定名字的程序的文件路径的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 最近在做项目,需要用到给定一个程序名字就可以自动获取到这个程序在Windows系统下的绝对路径,以下

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

springboot2.1.3 hystrix集成及hystrix-dashboard监控详解

《springboot2.1.3hystrix集成及hystrix-dashboard监控详解》Hystrix是Netflix开源的微服务容错工具,通过线程池隔离和熔断机制防止服务崩溃,支持降级、监... 目录Hystrix是Netflix开源技术www.chinasem.cn栈中的又一员猛将Hystrix熔

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

SpringBoot监控API请求耗时的6中解决解决方案

《SpringBoot监控API请求耗时的6中解决解决方案》本文介绍SpringBoot中记录API请求耗时的6种方案,包括手动埋点、AOP切面、拦截器、Filter、事件监听、Micrometer+... 目录1. 简介2.实战案例2.1 手动记录2.2 自定义AOP记录2.3 拦截器技术2.4 使用Fi

Python获取浏览器Cookies的四种方式小结

《Python获取浏览器Cookies的四种方式小结》在进行Web应用程序测试和开发时,获取浏览器Cookies是一项重要任务,本文我们介绍四种用Python获取浏览器Cookies的方式,具有一定的... 目录什么是 Cookie?1.使用Selenium库获取浏览器Cookies2.使用浏览器开发者工具

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Java获取当前时间String类型和Date类型方式

《Java获取当前时间String类型和Date类型方式》:本文主要介绍Java获取当前时间String类型和Date类型方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录Java获取当前时间String和Date类型String类型和Date类型输出结果总结Java获取

Spring Boot Actuator应用监控与管理的详细步骤

《SpringBootActuator应用监控与管理的详细步骤》SpringBootActuator是SpringBoot的监控工具,提供健康检查、性能指标、日志管理等核心功能,支持自定义和扩展端... 目录一、 Spring Boot Actuator 概述二、 集成 Spring Boot Actuat