Alerting on metricbeat and heartbeat Data with Python

2024-01-20 03:52

This article describes how to build alerting in Python on top of data collected by metricbeat and heartbeat. I hope it serves as a useful reference for developers solving a similar problem.

1. System Architecture

IP          Hostname   Role                        Notes
11.0.1.11   kafka1     kafka and MySQL
11.0.1.12   kafka2     kafka
11.0.1.13   kafka3     kafka
11.0.1.14   demo1      metricbeat and heartbeat

2. Deploying Kafka
Omitted.
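
Broker installation itself is out of scope here. As a convenience, below is a minimal sketch of creating the four topics used later in this article with kafka-python's admin client; the partition and replication-factor values are arbitrary examples, not settings from the original environment.

# create_topics_sketch.py -- create the topics used later in this article
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['11.0.1.11:9092', '11.0.1.12:9092', '11.0.1.13:9092'])
topics = ['ELK-metricbeat', 'ELK-heartbeat', 'ELK-system_metricbeat', 'ELK-system_heartbeat']
# Raises TopicAlreadyExistsError if a topic already exists; partition/replica counts are examples
admin.create_topics([NewTopic(name=t, num_partitions=3, replication_factor=2) for t in topics])
admin.close()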

3. Deploying Metricbeat and Heartbeat
metricbeat configuration:

metricbeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

fields:
  ip: 11.0.1.14

output.kafka:
  hosts: ["11.0.1.11:9092", "11.0.1.12:9092", "11.0.1.13:9092"]
  topic: "ELK-metricbeat"

heartbeat configuration:

heartbeat.config.monitors:
  path: ${path.config}/monitors.d/*.yml
  reload.enabled: false
  reload.period: 5s

# ---------------------------- Kafka Output ----------------------------
output.kafka:
  hosts: ["11.0.1.11:9092", "11.0.1.12:9092", "11.0.1.13:9092"]
  topic: "ELK-heartbeat"

heartbeat tcp.yml configuration:

- type: tcp
  id: my-tcp-monitor
  name: My TCP monitor
  enabled: true
  schedule: '@every 20s'
  hosts: ["11.0.1.14:80", "11.0.1.13:80", "11.0.1.12:80"]
  ipv4: true
  ipv6: true
  mode: all

4. MariaDB Table Structure
cmdb_app table (application/system information):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cmdb_app
-- ----------------------------
DROP TABLE IF EXISTS `cmdb_app`;
CREATE TABLE `cmdb_app` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_dep` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

Field descriptions:
app_name: application/system name
ops_user: operations contact's name
ops_tel: operations contact's phone number
ops_dep: responsible operations department

cmdb_os table (server information):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cmdb_os
-- ----------------------------
DROP TABLE IF EXISTS `cmdb_os`;
CREATE TABLE `cmdb_os` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `eip` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `module` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

Field descriptions:
app_name: name of the system the server belongs to (joins to cmdb_app.app_name)
eip: server IP
module: server role/purpose
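
For testing, both CMDB tables can be seeded with pymysql. This is only a sketch: the connection parameters come from application.yml later in the article, while the application name, contact details, and server row are made-up sample values.

# seed_cmdb.py -- insert one sample system and one sample server (hypothetical values)
import pymysql

conn = pymysql.connect(host='11.0.1.11', port=13306, user='root',
                       password='123456', database='zll_python_test')
try:
    with conn.cursor() as cur:
        # The application system and the people responsible for it
        cur.execute(
            "INSERT INTO cmdb_app (app_name, ops_user, ops_tel, ops_dep) VALUES (%s, %s, %s, %s)",
            ('demo-app', 'Alice', '13800000000', 'Ops Department'))
        # A server belonging to that system; app_name is the join key, and eip must match
        # fields.ip (metricbeat) or url.domain (heartbeat) for the enrichment to find it
        cur.execute(
            "INSERT INTO cmdb_os (app_name, eip, module) VALUES (%s, %s, %s)",
            ('demo-app', '11.0.1.14', 'web server'))
    conn.commit()
finally:
    conn.close()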

alert_list table (alert records):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for alert_list
-- ----------------------------
DROP TABLE IF EXISTS `alert_list`;
CREATE TABLE `alert_list` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `timestamp` datetime NULL DEFAULT NULL,
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `status` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_dep` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `module` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;
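
A row in alert_list with status = 'down' represents an open alert: the checker in section 6 inserts it when a URL keeps failing and deletes it when the URL recovers. As a hedged example, a separate notification job (not part of this article) could read the open alerts like this:

# list_open_alerts.py -- print alerts that are currently open (status = 'down')
import pymysql

conn = pymysql.connect(host='11.0.1.11', port=13306, user='root',
                       password='123456', database='zll_python_test')
try:
    with conn.cursor(pymysql.cursors.DictCursor) as cur:
        cur.execute(
            "SELECT timestamp, url, app_name, module, ops_user, ops_tel, ops_dep "
            "FROM alert_list WHERE status = 'down' ORDER BY timestamp")
        for row in cur.fetchall():
            print(f"{row['timestamp']} {row['url']} -> {row['ops_user']} ({row['ops_tel']})")
finally:
    conn.close()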

5. Python Program: Read from Kafka, Match cmdb_os/cmdb_app by IP, and Write the Enriched Data to a New Kafka Topic

Install dependencies (mysql-connector-python is used later by heartbeat_alarm.py in section 6):

pip install kafka-python pymysql mysql-connector-python apscheduler pyyaml

First, metricbeat_replace.py:

import json
import logging
from logging.handlers import RotatingFileHandler
import pymysql
from kafka import KafkaConsumer, KafkaProducer
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger


class DatabaseConnectionError(Exception):
    def __init__(self, message="Database connection failed"):
        self.message = message
        super().__init__(self.message)


class KafkaCMDBProcessor:
    def __init__(self, kafka_config, mysql_config):
        self.kafka_config = kafka_config
        self.mysql_config = mysql_config
        self.logger = self.setup_logger()
        self.cmdb_data = None
        # Initialize the scheduler
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        # Reload the CMDB data at minute 0, 10, 20, 30, 40 and 50 of every hour
        self.scheduler.add_job(self.load_cmdb_data, CronTrigger(minute='0,10,20,30,40,50'))

    @staticmethod
    def setup_logger():
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        # Console handler at DEBUG level
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        # Rotating file handler at DEBUG level, 1 MB per file, 3 backups
        fh = RotatingFileHandler('metricbeat_replace.log', maxBytes=1_000_000, backupCount=3)
        fh.setLevel(logging.DEBUG)
        # Formatter
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        logger.addHandler(ch)
        logger.addHandler(fh)
        return logger

    def start_processing(self):
        self.connect_to_database()  # first database connection check at startup
        self.load_cmdb_data()       # load the CMDB data into memory at startup
        self.logger.info("Processing started...")
        consumer = KafkaConsumer(
            self.kafka_config['input_topic'],
            group_id=self.kafka_config['consumer_group_id'],
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            auto_offset_reset='earliest')
        self.logger.info("Kafka consumer created.")
        producer = KafkaProducer(bootstrap_servers=self.kafka_config['bootstrap_servers'])
        self.logger.info("Kafka producer created.")
        try:
            for msg in consumer:
                metricbeat_data = msg.value.decode('utf-8')
                ip = self.extract_ip(metricbeat_data)
                cmdb_data = self.get_cmdb_data(ip)
                self.process_and_send_message(producer, metricbeat_data, cmdb_data)
        except KeyboardInterrupt:
            self.logger.info("KeyboardInterrupt received, shutting down gracefully.")
        except Exception as e:
            self.logger.error(f"An error occurred: {str(e)}")
            # Additional error handling for the processing loop can go here
        finally:
            consumer.close()
            producer.close()

    def connect_to_database(self):
        db = None
        try:
            self.logger.info("Connecting to the database...")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db'])
            self.logger.info("Database connection succeeded.")
            self.db_connection_error_logged = False  # reset the error flag after a successful connection
        except pymysql.Error as e:
            error_message = f"Error while connecting to the database: {str(e)}"
            self.logger.error(error_message.split('\n')[0])
            raise DatabaseConnectionError(error_message) from e
        finally:
            if db:
                db.close()

    def load_cmdb_data(self):
        db = None
        cursor = None
        try:
            self.logger.info("Loading CMDB data.")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db'])
            cursor = db.cursor()
            # Read the cmdb_os table
            sql_cmdb_os = "SELECT app_name, eip, module FROM cmdb_os"
            cursor.execute(sql_cmdb_os)
            cmdb_os_result = cursor.fetchall()
            # Read the cmdb_app table
            sql_cmdb_app = "SELECT app_name, ops_user, ops_tel, ops_dep FROM cmdb_app"
            cursor.execute(sql_cmdb_app)
            cmdb_app_result = cursor.fetchall()
            # Keep both result sets in memory
            self.cmdb_data = {
                "cmdb_os": cmdb_os_result,
                "cmdb_app": cmdb_app_result
            }
            self.logger.info("CMDB data loaded.")
        except pymysql.Error as e:
            error_message = f"Database error while loading CMDB data: {str(e)}"
            self.logger.error(error_message.split('\n')[0])
            self.logger.warning("Database unavailable, keeping the previously loaded CMDB data.")
        finally:
            if cursor:
                cursor.close()
            if db:
                db.close()

    @staticmethod
    def extract_ip(metricbeat_data):
        data = json.loads(metricbeat_data)
        return data.get('fields', {}).get('ip', '')

    def get_cmdb_data(self, ip):
        if self.cmdb_data:
            # Look the IP up in the in-memory CMDB copy
            cmdb_os_data = [row for row in self.cmdb_data["cmdb_os"] if row[1] == ip]
            if not cmdb_os_data:
                return None
            cmdb_app_data = [row for row in self.cmdb_data["cmdb_app"] if row[0] == cmdb_os_data[0][0]]
            if not cmdb_app_data:
                return None
            return cmdb_os_data, cmdb_app_data
        else:
            return None

    def process_and_send_message(self, producer, original_data, cmdb_data):
        original_data_str = original_data.decode('utf-8') if isinstance(original_data, bytes) else original_data
        new_message = json.loads(original_data_str)
        if cmdb_data:
            cmdb_os_data, cmdb_app_data = cmdb_data
            new_message["cmdb_data"] = {
                "app_name": cmdb_os_data[0][0],
                "eip": cmdb_os_data[0][1],
                "module": cmdb_os_data[0][2],
                "ops_user": cmdb_app_data[0][1],
                "ops_tel": cmdb_app_data[0][2],
                "ops_dep": cmdb_app_data[0][3]
            }
        else:
            new_message["cmdb_data"] = None
        producer.send(self.kafka_config['output_topic'],
                      value=json.dumps(new_message, ensure_ascii=False).encode('utf-8'))
        producer.flush()


if __name__ == "__main__":
    try:
        with open('application.yml', 'r') as config_file:
            config_data = yaml.safe_load(config_file)
        kafka_config_data = config_data.get('kafka', {})
        mysql_config_data = config_data.get('mysql', {})
        processor = KafkaCMDBProcessor(kafka_config_data, mysql_config_data)
        processor.start_processing()
    except FileNotFoundError:
        print("Error: configuration file 'application.yml' not found.")
    except Exception as e:
        print(f"Unexpected error: {str(e)}")

The corresponding application.yml:

kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  consumer_group_id: 'metricbeat_replace'
  input_topic: 'ELK-metricbeat'
  output_topic: 'ELK-system_metricbeat'

mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  db: 'zll_python_test'
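
With the script and configuration above in place, one way to verify the enrichment is to consume a few messages from the output topic and inspect the injected cmdb_data block. This is only a quick hedged check; the topic and broker addresses are the ones from application.yml.

# check_enriched.py -- read a few enriched messages and print their cmdb_data
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'ELK-system_metricbeat',
    bootstrap_servers=['11.0.1.11:9092', '11.0.1.12:9092', '11.0.1.13:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000)  # stop iterating after 10 s without messages

for i, msg in enumerate(consumer):
    doc = json.loads(msg.value.decode('utf-8'))
    # cmdb_data is added by metricbeat_replace.py; None means the IP was not found in cmdb_os
    print(doc.get('fields', {}).get('ip'), doc.get('cmdb_data'))
    if i >= 4:
        break

consumer.close()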

heartbeat_replace.py:

import json
import logging
from logging.handlers import RotatingFileHandler
import pymysql
from kafka import KafkaConsumer, KafkaProducer
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger


class DatabaseConnectionError(Exception):
    def __init__(self, message="Database connection failed"):
        self.message = message
        super().__init__(self.message)


class KafkaCMDBProcessor:
    def __init__(self, kafka_config, mysql_config):
        self.kafka_config = kafka_config
        self.mysql_config = mysql_config
        self.logger = self.setup_logger()
        self.cmdb_data = None
        # Initialize the scheduler
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        # Reload the CMDB data at minute 0, 10, 20, 30, 40 and 50 of every hour
        self.scheduler.add_job(self.load_cmdb_data, CronTrigger(minute='0,10,20,30,40,50'))

    @staticmethod
    def setup_logger():
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        # Console handler at DEBUG level
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        # Rotating file handler at DEBUG level, 1 MB per file, 3 backups
        fh = RotatingFileHandler('heartbeat_replace.log', maxBytes=1_000_000, backupCount=3)
        fh.setLevel(logging.DEBUG)
        # Formatter
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        logger.addHandler(ch)
        logger.addHandler(fh)
        return logger

    def start_processing(self):
        self.connect_to_database()  # first database connection check at startup
        self.load_cmdb_data()       # load the CMDB data into memory at startup
        self.logger.info("Processing started...")
        consumer = KafkaConsumer(
            self.kafka_config['input_topic'],
            group_id=self.kafka_config['consumer_group_id'],
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            auto_offset_reset='earliest')
        self.logger.info("Kafka consumer created.")
        producer = KafkaProducer(bootstrap_servers=self.kafka_config['bootstrap_servers'])
        self.logger.info("Kafka producer created.")
        try:
            for msg in consumer:
                heartbeat_data = msg.value.decode('utf-8')
                ip = self.extract_url_domain(heartbeat_data)
                cmdb_data = self.get_cmdb_data(ip)
                self.process_and_send_message(producer, heartbeat_data, cmdb_data)
        except KeyboardInterrupt:
            self.logger.info("KeyboardInterrupt received, shutting down gracefully.")
        except Exception as e:
            self.logger.error(f"An error occurred: {str(e)}")
            # Additional error handling for the processing loop can go here
        finally:
            consumer.close()
            producer.close()

    def connect_to_database(self):
        db = None
        try:
            self.logger.info("Connecting to the database...")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db'])
            self.logger.info("Database connection succeeded.")
            self.db_connection_error_logged = False  # reset the error flag after a successful connection
        except pymysql.Error as e:
            error_message = f"Error while connecting to the database: {str(e)}"
            self.logger.error(error_message.split('\n')[0])
            raise DatabaseConnectionError(error_message) from e
        finally:
            if db:
                db.close()

    def load_cmdb_data(self):
        db = None
        cursor = None
        try:
            self.logger.info("Loading CMDB data.")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db'])
            cursor = db.cursor()
            # Read the cmdb_os table
            sql_cmdb_os = "SELECT app_name, eip, module FROM cmdb_os"
            cursor.execute(sql_cmdb_os)
            cmdb_os_result = cursor.fetchall()
            # Read the cmdb_app table
            sql_cmdb_app = "SELECT app_name, ops_user, ops_tel, ops_dep FROM cmdb_app"
            cursor.execute(sql_cmdb_app)
            cmdb_app_result = cursor.fetchall()
            # Keep both result sets in memory
            self.cmdb_data = {
                "cmdb_os": cmdb_os_result,
                "cmdb_app": cmdb_app_result
            }
            self.logger.info("CMDB data loaded.")
        except pymysql.Error as e:
            error_message = f"Database error while loading CMDB data: {str(e)}"
            self.logger.error(error_message.split('\n')[0])
            self.logger.warning("Database unavailable, keeping the previously loaded CMDB data.")
        finally:
            if cursor:
                cursor.close()
            if db:
                db.close()

    @staticmethod
    def extract_url_domain(heartbeat_data):
        data = json.loads(heartbeat_data)
        return data.get('url', {}).get('domain', '')

    def get_cmdb_data(self, ip):
        if self.cmdb_data:
            # Look the IP up in the in-memory CMDB copy
            cmdb_os_data = [row for row in self.cmdb_data["cmdb_os"] if row[1] == ip]
            if not cmdb_os_data:
                return None
            cmdb_app_data = [row for row in self.cmdb_data["cmdb_app"] if row[0] == cmdb_os_data[0][0]]
            if not cmdb_app_data:
                return None
            return cmdb_os_data, cmdb_app_data
        else:
            return None

    def process_and_send_message(self, producer, original_data, cmdb_data):
        original_data_str = original_data.decode('utf-8') if isinstance(original_data, bytes) else original_data
        new_message = json.loads(original_data_str)
        if cmdb_data:
            cmdb_os_data, cmdb_app_data = cmdb_data
            new_message["cmdb_data"] = {
                "app_name": cmdb_os_data[0][0],
                "eip": cmdb_os_data[0][1],
                "module": cmdb_os_data[0][2],
                "ops_user": cmdb_app_data[0][1],
                "ops_tel": cmdb_app_data[0][2],
                "ops_dep": cmdb_app_data[0][3]
            }
        else:
            new_message["cmdb_data"] = None
        producer.send(self.kafka_config['output_topic'],
                      value=json.dumps(new_message, ensure_ascii=False).encode('utf-8'))
        producer.flush()


if __name__ == "__main__":
    try:
        with open('application.yml', 'r') as config_file:
            config_data = yaml.safe_load(config_file)
        kafka_config_data = config_data.get('kafka', {})
        mysql_config_data = config_data.get('mysql', {})
        processor = KafkaCMDBProcessor(kafka_config_data, mysql_config_data)
        processor.start_processing()
    except FileNotFoundError:
        print("Error: configuration file 'application.yml' not found.")
    except Exception as e:
        print(f"Unexpected error: {str(e)}")

The corresponding application.yml:

kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  consumer_group_id: 'heartbeat_replace'
  input_topic: 'ELK-heartbeat'
  output_topic: 'ELK-system_heartbeat'

mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  db: 'zll_python_test'

Overall, metricbeat_replace.py and heartbeat_replace.py are almost identical: a few occurrences of "metricbeat" become "heartbeat", and the extraction method returns data.get('fields', {}).get('ip', '') in one case and data.get('url', {}).get('domain', '') in the other.
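
A small illustration of that difference, using two heavily trimmed, made-up sample events; only the field paths fields.ip and url.domain are taken from the scripts above.

import json

# Trimmed metricbeat event: the IP was attached by the "fields: ip: ..." setting in metricbeat.yml
metricbeat_event = json.dumps({"fields": {"ip": "11.0.1.14"}, "metricset": {"name": "cpu"}})
# Trimmed heartbeat event: url.domain holds the monitored host from tcp.yml
heartbeat_event = json.dumps({"url": {"domain": "11.0.1.14", "full": "tcp://11.0.1.14:80"},
                              "monitor": {"status": "up"}})

def extract_ip(data):             # as in metricbeat_replace.py
    return json.loads(data).get('fields', {}).get('ip', '')

def extract_url_domain(data):     # as in heartbeat_replace.py
    return json.loads(data).get('url', {}).get('domain', '')

print(extract_ip(metricbeat_event))         # 11.0.1.14
print(extract_url_domain(heartbeat_event))  # 11.0.1.14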

6. heartbeat Alerting
heartbeat_alarm.py:

# heartbeat_alarm.py
import json
import logging
import mysql.connector
from collections import defaultdict
from datetime import datetime, timedelta
from kafka import KafkaConsumer
import yaml

# Configure the logger
logging.basicConfig(
    level=logging.INFO,
    filename='heartbeat_checker.log',
    format='%(asctime)s [%(levelname)s] - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class HeartbeatChecker:
    def __init__(self, config_path='application.yml'):
        # Initialize the HeartbeatChecker object
        self.config_path = config_path
        self.kafka_bootstrap_servers = None
        self.kafka_group_id = None
        self.kafka_topic = None
        self.mysql_host = None
        self.mysql_port = None
        self.mysql_user = None
        self.mysql_password = None
        self.mysql_database = None
        self.consecutive_down_threshold = None
        self.consecutive_up_threshold = None
        # Load the configuration from the YAML file
        self.load_config()
        self.kafka_consumer = None

    def load_config(self):
        try:
            # Load the configuration from the YAML file
            with open(self.config_path, 'r') as file:
                config = yaml.safe_load(file)
            # Kafka settings
            self.kafka_bootstrap_servers = config['kafka']['bootstrap_servers']
            self.kafka_group_id = config['kafka']['group_id']
            self.kafka_topic = config['kafka']['topic']
            # MySQL settings
            self.mysql_host = config['mysql']['host']
            self.mysql_port = config['mysql']['port']
            self.mysql_user = config['mysql']['user']
            self.mysql_password = config['mysql']['password']
            self.mysql_database = config['mysql']['database']
            # Thresholds for consecutive down / consecutive up
            self.consecutive_down_threshold = config['thresholds']['consecutive_down']
            self.consecutive_up_threshold = config['thresholds']['consecutive_up']
        except Exception as e:
            # Handle configuration loading errors
            logger.error(f"Error while loading the configuration: {e}")
            raise

    def create_kafka_consumer(self):
        try:
            # Create the Kafka consumer instance
            self.kafka_consumer = KafkaConsumer(
                self.kafka_topic,
                bootstrap_servers=self.kafka_bootstrap_servers,
                group_id=self.kafka_group_id,
                auto_offset_reset='latest',
                enable_auto_commit=True,
                value_deserializer=lambda x: json.loads(x.decode('utf-8')))
        except Exception as e:
            # Handle consumer creation errors
            logger.error(f"Error while creating the Kafka consumer: {e}")
            raise

    def check_heartbeat_alerts(self):
        # defaultdict holding the recent monitor statuses per URL
        url_groups = defaultdict(list)
        mysql_connection = None
        try:
            # Create the Kafka consumer and connect to MySQL
            self.create_kafka_consumer()
            mysql_connection = mysql.connector.connect(
                host=self.mysql_host,
                port=self.mysql_port,
                user=self.mysql_user,
                password=self.mysql_password,
                database=self.mysql_database)
            mysql_cursor = mysql_connection.cursor()
            # Iterate over the Kafka messages
            for message in self.kafka_consumer:
                json_data = message.value
                url = json_data.get('url', {}).get('full')
                monitor_status = json_data.get('monitor', {}).get('status')
                timestamp_str = json_data.get('@timestamp')
                cmdb_data = json_data.get('cmdb_data')
                if url and monitor_status and timestamp_str:
                    timestamp = self.convert_to_local_time(timestamp_str)
                    # Handle consecutive "up" results for URLs that already have an open alert
                    if monitor_status == 'up' and self.url_exists_down_in_mysql(mysql_cursor, url):
                        url_groups[url].append(monitor_status)
                        mysql_cursor.fetchall()
                        if len(url_groups[url]) >= self.consecutive_up_threshold and \
                                all(status == 'up' for status in url_groups[url][-self.consecutive_up_threshold:]):
                            self.delete_from_mysql(mysql_cursor, url, mysql_connection)
                            logger.info(f"URL: {url} - removed from MySQL after consecutive up results")
                    else:
                        # Handle consecutive "down" results
                        if monitor_status == 'down' and not self.url_exists_down_in_mysql(mysql_cursor, url):
                            url_groups[url].append(monitor_status)
                            mysql_cursor.fetchall()
                            if len(url_groups[url]) >= self.consecutive_down_threshold and \
                                    all(status == 'down' for status in url_groups[url][-self.consecutive_down_threshold:]):
                                self.send_alert(url)
                                self.write_to_mysql(mysql_cursor, timestamp, url, monitor_status,
                                                    mysql_connection, cmdb_data)
                                url_groups[url] = []
                                logger.info(f"URL: {url} - added to MySQL")
                        elif monitor_status == 'up' and self.url_exists_down_in_mysql(mysql_cursor, url):
                            url_groups[url].append(monitor_status)
                            mysql_cursor.fetchall()
                            if len(url_groups[url]) >= self.consecutive_up_threshold and \
                                    all(status == 'up' for status in url_groups[url][-self.consecutive_up_threshold:]):
                                self.delete_from_mysql(mysql_cursor, url, mysql_connection)
                                url_groups[url] = []
                                logger.info(f"URL: {url} - removed from MySQL after consecutive up results")
        except Exception as e:
            # Handle runtime errors
            logger.error(f"An error occurred: {e}")
        finally:
            # Close the Kafka consumer and the MySQL connection
            if self.kafka_consumer:
                self.kafka_consumer.close()
            if mysql_connection:
                mysql_connection.close()

    def send_alert(self, url):
        # Log the alert
        logger.info(f"Alert: URL {url} was down {self.consecutive_down_threshold} times in a row")

    @staticmethod
    def write_to_mysql(cursor, timestamp, url, status, connection, cmdb_data=None):
        try:
            # Insert the alert into MySQL, including the "cmdb_data" fields
            insert_query = """
                INSERT INTO alert_list (timestamp, url, status, app_name, module, ops_user, ops_tel, ops_dep)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, (
                timestamp,
                url,
                status,
                cmdb_data.get('app_name', '') if cmdb_data else '',
                cmdb_data.get('module', '') if cmdb_data else '',
                cmdb_data.get('ops_user', '') if cmdb_data else '',
                cmdb_data.get('ops_tel', '') if cmdb_data else '',
                cmdb_data.get('ops_dep', '') if cmdb_data else ''
            ) if cmdb_data else (timestamp, url, status, '', '', '', '', ''))
            connection.commit()
            logger.info(f"Inserted into MySQL: URL {url}, Status {status}, cmdb_data {cmdb_data}")
        except Exception as e:
            # Handle MySQL write errors
            logger.error(f"Error writing to MySQL: {e}")

    @staticmethod
    def delete_from_mysql(cursor, url, connection):
        try:
            # Delete the open alert from MySQL
            delete_query = "DELETE FROM alert_list WHERE url = %s AND status = 'down'"
            cursor.execute(delete_query, (url,))
            connection.commit()
            logger.info(f"Deleted from MySQL: URL {url}")
        except Exception as e:
            # Handle MySQL delete errors
            logger.error(f"Error while deleting from MySQL: {e}")

    @staticmethod
    def url_exists_down_in_mysql(cursor, url):
        try:
            # Check whether the URL already has an open "down" alert in MySQL
            query = "SELECT * FROM alert_list WHERE url = %s AND status = 'down'"
            cursor.execute(query, (url,))
            return bool(cursor.fetchone())
        except Exception as e:
            # Handle lookup errors
            logger.error(f"Error while checking whether the URL exists in MySQL: {e}")
            return False

    @staticmethod
    def convert_to_local_time(timestamp_str):
        # Convert the UTC timestamp to local (UTC+8) time
        timestamp_utc = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%fZ")
        timestamp_local = timestamp_utc + timedelta(hours=8)
        return timestamp_local.strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    try:
        # Run the checker
        heartbeat_checker = HeartbeatChecker()
        heartbeat_checker.check_heartbeat_alerts()
    except KeyboardInterrupt:
        print("Exiting...")

The corresponding application.yml:

# application.yml
kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  group_id: 'python_alert'
  topic: 'ELK-system_heartbeat'

mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  database: 'zll_python_test'

thresholds:
  consecutive_down: 1
  consecutive_up: 1

Here consecutive_down is how many consecutive down results trigger an alert, and consecutive_up is how many consecutive up results clear it.
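
The counting rule itself is simple. Below is a standalone sketch of the idea with hypothetical thresholds of 3 down / 2 up; the real checker additionally persists open alerts in alert_list and only counts up results for URLs that already have an open alert.

from collections import defaultdict

CONSECUTIVE_DOWN = 3   # hypothetical thresholds; the config above uses 1 / 1
CONSECUTIVE_UP = 2

status_history = defaultdict(list)

def should_alert(url, status):
    """Return 'fire' after N consecutive 'down' results, 'clear' after M consecutive 'up' results."""
    history = status_history[url]
    history.append(status)
    if len(history) >= CONSECUTIVE_DOWN and all(s == 'down' for s in history[-CONSECUTIVE_DOWN:]):
        history.clear()
        return 'fire'
    if len(history) >= CONSECUTIVE_UP and all(s == 'up' for s in history[-CONSECUTIVE_UP:]):
        history.clear()
        return 'clear'
    return None

for s in ['down', 'down', 'down', 'up', 'up']:
    print(s, should_alert('tcp://11.0.1.14:80', s))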

7. metricbeat Alerting

metricbeat offers many more metrics that could be alerted on, such as CPU, memory, filesystem usage, network traffic, and process counts; that is left as the next step.
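
As a starting point, here is a hedged sketch of what a CPU check could look like: it consumes the enriched ELK-system_metricbeat topic and flags system/cpu samples whose normalized usage exceeds a threshold. The field name system.cpu.total.norm.pct is the usual metricbeat system-module field but should be verified against the metricbeat version in use; the 90% threshold and the consumer group name are made-up examples.

# cpu_alarm_sketch.py -- sketch of a CPU-usage alert on the enriched metricbeat topic
import json
from kafka import KafkaConsumer

CPU_THRESHOLD = 0.90   # arbitrary example threshold (normalized 0.0-1.0)

consumer = KafkaConsumer(
    'ELK-system_metricbeat',
    bootstrap_servers=['11.0.1.11:9092', '11.0.1.12:9092', '11.0.1.13:9092'],
    group_id='python_cpu_alert',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for msg in consumer:
    doc = msg.value
    # Only look at documents from the system/cpu metricset
    if doc.get('metricset', {}).get('name') != 'cpu':
        continue
    pct = doc.get('system', {}).get('cpu', {}).get('total', {}).get('norm', {}).get('pct')
    if pct is not None and pct > CPU_THRESHOLD:
        cmdb = doc.get('cmdb_data') or {}
        print(f"CPU alert: {doc.get('fields', {}).get('ip')} at {pct:.0%}, "
              f"owner {cmdb.get('ops_user')} ({cmdb.get('ops_tel')})")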

That wraps up this article on alerting on metricbeat and heartbeat data with Python; I hope it is helpful.


