elk笔记22.2--通过api快速创建索引

2024-05-30 23:48

本文主要是介绍elk笔记22.2--通过api快速创建索引,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

elk笔记22.2--通过api快速创建索引

  • 1 简介
  • 2 功能实现
    • 2.1 源码
    • 2.2 测试
  • 3 注意事项
  • 说明

1 简介

本文基于 elk笔记22–通过api快速创建索引 继续通过 api 快速创建索引。本节将追加一个db模块存储索引,并添加通过flask程序提供对外的api,后续会在此基础之上新增一个简单的UI界面,同时会追加一个索引定期删除功能。

2 功能实现

本文前置条件需要安装一套elk实例和mysql,具体安装方法可以参考笔者博文:
elk笔记1–搭建elk集群
elk笔记2–使用docker启一套elk实例
docker笔记5–配置MySQL

本文设计的主要模块包括如下内容:

  1. config.py,记录常见es、kibana、mysql 等基础配置
  2. db_helper.py,封装常见的mysql 操作
  3. es_helper.py,封装常见的elk操作
  4. create_index.py,flask主程序,用于实现创建和查看索引的主逻辑

2.1 源码

config.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-ES_URL = "http://127.0.0.1:9200"
KIBANA_URL = "http://127.0.0.1:5601"
ES_USER = "elastic"
ES_PWD = "elastic"
KIBANA_USER = "elastic"
KIBANA_PWD = "elastic"SQL_HOST = '127.0.0.1'
SQL_USER = 'root'
SQL_PWD = '111111'
SQL_DBNAME = 'bigdata_sre_log'

db_helper.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-import pymysql
import tracebackclass MysqlHelper:def __init__(self, sql_host, sql_user, sql_pwd, db_name):self.host = sql_hostself.usr = sql_userself.pwd = sql_pwdself.dbname = db_nameself.port = 3306self.charset = 'utf8'self.db = pymysql.connect(host=self.host, user=self.usr, passwd=self.pwd, db=self.dbname, charset=self.charset)def close_db(self):self.db.close()def show_version(self):# 使用 cursor() 方法创建一个游标对象 cursorcursor = self.db.cursor()# 使用 execute()  方法执行 SQL 查询cursor.execute("SELECT VERSION()")# 使用 fetchone() 方法获取单条数据.data = cursor.fetchone()print(f"Database version : {data}")def get_indices_list(self):ret_list = []instance = Nonesql = f"select number, cluster, index_name,save_days, ilp_name, datetime from es_indices"try:cursor = self.db.cursor()cursor.execute(sql)ret = cursor.fetchall()if ret is None:return []for item in ret:instance = {'number': item[0], 'cluster': item[1], 'index_name': item[2], 'save_days': item[3],'ilp_name': item[4], 'datetime': item[5].strftime("%Y-%m-%d %H:%M:%S")}ret_list.append(instance)print(f"get_indices_list succeed!")return True, ret_listexcept Exception as e:self.db.rollback()print(f"get_indices_list error:\n{e}, {traceback.print_exc()}")return False, f"get_indices_list error:\n{e}, {traceback.print_exc()}"def get_index_instance(self, cluster, index_name):sql = f"select number, cluster, index_name, save_days, ilp_name, datetime from es_indices " \f"where cluster='{cluster}' and index_name='{index_name}'"try:cursor = self.db.cursor()cursor.execute(sql)item = cursor.fetchone()if item is None:return False, f"no {cluster}/{index_name}"instance = {'number': item[0], 'cluster': item[1], 'index_name': item[2], 'save_days': item[3],'ilp_name': item[4], 'datetime': item[5].strftime("%Y-%m-%d %H:%M:%S")}print(f"get_index_instance succeed!")return True, instanceexcept Exception as e:print(f"get_index_instance error:\n{e}, {traceback.print_exc()}")return False, f"get_index_instance error:\n{e}, {traceback.print_exc()}"def judge_index_instance_exist(self, cluster, index_name):tag, ret = self.get_index_instance(cluster, index_name)if tag:return Truereturn Falsedef add_index_instance(self, instance):if self.judge_index_instance_exist(instance['cluster'], instance['index_name']):return False, f"{instance['cluster']}/{instance['index_name']} exist, please do not repeat!"cursor = self.db.cursor()sql = "INSERT INTO es_indices(cluster, index_name, save_days, ilp_name) VALUES ( %s, %s, %s, %s)"try:cursor.execute(sql, [instance['cluster'], instance['index_name'], instance['save_days'],instance['ilp_name']])self.db.commit()return True, f"add_index_instance succeed!"except Exception as e:self.db.rollback()print(f"add_index_instance error:\n{e}, {traceback.print_exc()}")return False, f"add_index_instance error:\n{e}, {traceback.print_exc()}"if __name__ == "__main__":sql_host = '127.0.0.1'sql_user = 'root'sql_pwd = '111111'sql_dbname = 'bigdata_sre_log'sql = MysqlHelper(sql_host, sql_user, sql_pwd, sql_dbname)sql.show_version()instance = {"index_name": "test-elk", "cluster": "sre-elk","save_days": 10, "ilp_name": "ilp-default"}print(sql.add_index_instance(instance))print(sql.get_indices_list())sql.close_db()

es_helper.py

#!/usr/bin/python
# -*- coding:utf-8 -*-import requests
from base64 import b64encodedef get_base64_str(str_text):str_user_pwd = b64encode(str_text.encode('ascii'))return str_user_pwd.decode('ascii')class ElasticHelper:def __init__(self, es_url, es_user, es_pwd, es_index_name, es_ilp_name):self.url = es_urlself.user = es_userself.pwd = es_pwdself.index_name = es_index_nameself.ilp_name = es_ilp_namedef get_index_template(self):print(f"get_index_template: {self.index_name}")url_full = self.url + f"/_template/{self.index_name}"re = requests.get(url_full, auth=(self.user, self.pwd))if re.status_code == 200:return re.json()else:print("request error, not 200")return Nonedef put_index_template(self):body = {'order': 0,'index_patterns': [f"{self.index_name}-*"],'settings': {'index': {'number_of_shards': '2','number_of_replicas': '0'}},'mappings': {"properties": {"@timestamp": {"type": "date"}}},'aliases': {}}url_full = self.url + f"/_template/{self.index_name}"re = requests.put(url_full, json=body, auth=(self.user, self.pwd))if re.status_code == 200:print(re.json())else:print("request error, not 200")def update_index_template_with_ilp(self):ret = self.get_index_template()if ret is None:print(f"index template {self.index_name}-* not exist, please create one!")body = ret[self.index_name]body['settings']['index']['lifecycle'] = {'name': f'{self.ilp_name}','rollover_alias': f'{self.index_name}_write'}re = requests.post(f"{self.url}/_template/{self.index_name}", json=body, auth=('elastic', 'elastic'))if re.status_code == 200:print(f"put_index_template(index={self.index_name},ilp={self.ilp_name}) succeed:\ninfo {re.status_code}, "f"{re.text}")else:print(f"put_index_template(index={self.index_name},ilp={self.ilp_name}) failed:\ninfo {re.status_code}, "f"{re.text}")def create_rollover_index(self):body = {"aliases": {f"{self.index_name}_write": {}}}re = requests.put(url=self.url+f"/%3C{self.index_name}-%7Bnow%2Fd%7D-000001%3E", json=body, auth=(self.user,self.pwd))if re.status_code == 200:print(re.json())else:print(f"create rollover index {self.index_name} failed!\n{re.status_code},\n{re.text}")def judge_alias_exist(self):re = requests.get(f"{self.url}/_cat/aliases", auth=('elastic', 'elastic'))if re.status_code == 200:ret_str = re.textif ret_str.find(f"{self.index_name}_write") != -1:return Trueelse:Falseelse:print(f"judge_alias_exist {self.index_name}_write failed!\n{re.status_code},\n{re.text}")return Falseclass KibanaHelper:def __init__(self, kibana_url, kibana_user, kibana_pwd, kibana_index_pattern_name):self.url = kibana_urlself.user = kibana_userself.pwd = kibana_pwdself.index_pattern_name = kibana_index_pattern_namedef get_index_pattern_id(self):url = f'{self.url}/api/saved_objects/_find?fields=title&fields=type&per_page=10000&type=index-pattern'headers = {'Authorization': 'Basic ' + get_base64_str(self.user + ':' + self.pwd)}ret = requests.get(url=url, headers=headers)for item in (ret.json())['saved_objects']:if item['attributes']['title'] == self.index_pattern_name:return item['id']return Nonedef get_kibana_index_pattern(self):print(f"get_kibana_index_pattern {index_pattern_name}-*")headers = {'Authorization': 'Basic ' + get_base64_str(self.user + ':' + self.pwd), 'kbn-xsrf': 'reporting'}id = self.get_index_pattern_id()url_full = self.url + f"/api/saved_objects/_bulk_get"body = [{"id": id, "type": "index-pattern"}]re = requests.post(url=url_full, json=body, headers=headers)if re.status_code == 200:print(re.text)else:print(f"error:\n {re.text}")def create_kibana_index_pattern(self):print(f"index_pattern={self.index_pattern_name}")ret = self.get_index_pattern_id()if ret is not None:print(f"{self.index_pattern_name} has been existed, please do not repeat")exit(0)headers = {'Authorization': 'Basic ' + get_base64_str(self.user + ':' + self.pwd), 'kbn-xsrf': 'reporting'}url_full = self.url + "/api/saved_objects/index-pattern"body = {"attributes": {"title": self.index_pattern_name,"timeFieldName": "@timestamp"}}re = requests.post(url_full, json=body, headers=headers)if re.status_code == 200:print(re.text)else:print(f"error:\n {re.text}")def delete_index_pattern_id(self, index_pattern_id):print(f"delete index_pattern_id={index_pattern_id}")headers = {'Authorization': 'Basic ' + get_base64_str(self.user + ':' + self.pwd), 'kbn-xsrf': 'reporting'}url_full = self.url + f"/api/saved_objects/index-pattern/{index_pattern_id}?force=false"re = requests.delete(url_full, headers=headers)if re.status_code == 200:print(f"delete_index_pattern_id({index_pattern_id}) succeed:\ninfo {re.status_code}, {re.text}")else:print(f"delete_index_pattern_id({index_pattern_id}) error:\ninfo {re.status_code}, {re.text}")def get_ilm(self):url_full = self.url+"/api/index_lifecycle_management/policies?withIndices=true"headers = {'Authorization': 'Basic ' + get_base64_str(self.user + ':' + self.pwd), 'kbn-xsrf': 'reporting'}re = requests.get(url_full, headers=headers)if re.status_code == 200:print(f"get_ilm succeed")return re.json()else:print(f"get_ilm failed\ninfo {re.status_code},{re.text}")return Nonedef judge_ilp_exist(self, ilp_name):ilp_list = self.get_ilm()if ilp_list is None:return Falseelse:for item in ilp_list:if ilp_name == item["name"]:return Truereturn Falseif __name__ == "__main__":es_url = "http://127.0.0.1:9200"es_user = "elastic"es_pwd = "elastic"kibana_url = "http://127.0.0.1:5601"kibana_user = "elastic"kibana_pwd = "elastic"index_name = "test-elk"index_pattern_name = index_name + "-*"ilp_name = "ilp-default"es = ElasticHelper(es_url, es_user, es_pwd, index_name, ilp_name)kibana = KibanaHelper(kibana_url, kibana_user, kibana_pwd, index_pattern_name)es.put_index_template()if not kibana.judge_ilp_exist(ilp_name):print(f"ilp_name={ilp_name} not exist, please change another one")exit(0)es.update_index_template_with_ilp()if es.judge_alias_exist():print(f"alias_name={index_name}_write exists, please change another one")exit(0)es.create_rollover_index()kibana.create_kibana_index_pattern()

create_index.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-'''
add_index
list/<cluster>/<index_name>
'''import json
from flask import Flask
from flask import requestfrom db_helper import MysqlHelper
from es_helper import ElasticHelper, KibanaHelper
from config import *app = Flask(__name__)@app.route('/')
def voice_root():return 'Hello, this web service is for create es index!\n'@app.route('/add_index/', methods=['GET', 'POST'])
def add_index():if request.method == 'POST':post_data = request.datapost_dict = json.loads(post_data)# print(f"add_index post_data:\n{post_dict}")# post_json_str = json.dumps(post_dict)# print(f"add_index post_json_str:\n{post_json_str}")instance = {'index_name': post_dict['index_name'], "cluster": post_dict['cluster'],"save_days": post_dict['save_days'], "ilp_name": post_dict['ilp_name']}sql = MysqlHelper(SQL_HOST, SQL_USER, SQL_PWD, SQL_DBNAME)ret = sql.judge_index_instance_exist(post_dict['cluster'], post_dict['index_name'])if ret:return f"index {post_dict['cluster']}/{post_dict['index_name']} exists, please do not repeat"index_name = post_dict['index_name']index_pattern_name = index_name + "-*"ilp_name = post_dict['ilp_name']es = ElasticHelper(ES_URL, ES_USER, ES_PWD, index_name, ilp_name)kibana = KibanaHelper(KIBANA_URL, KIBANA_USER, KIBANA_PWD, index_pattern_name)if not kibana.judge_ilp_exist(ilp_name):print(f"ilp_name={ilp_name} not exist, please change another one")return f"ilp_name={ilp_name} not exist, please change another one"es.put_index_template()es.update_index_template_with_ilp()if es.judge_alias_exist():print(f"alias_name={index_name}_write exists, please change another one")return f"alias_name={index_name}_write exists, please change another one"es.create_rollover_index()kibana.create_kibana_index_pattern()tag, ret = sql.add_index_instance(instance)return f"{tag}\n{ret}"else:return 'add_index, please post data!\n'@app.route('/list/')
@app.route('/list/<cluster>/<index_name>/')
def list_index(cluster=None, index_name=None):print(f"1,{cluster},2{index_name}")if (cluster is None) and (index_name is None):sql = MysqlHelper(SQL_HOST, SQL_USER, SQL_PWD, SQL_DBNAME)tag, ret = sql.get_indices_list()sql.close_db()if tag:print(ret)return json.dumps(ret)else:return retelse:sql = MysqlHelper(SQL_HOST, SQL_USER, SQL_PWD, SQL_DBNAME)tag, ret = sql.get_index_instance(cluster, index_name)sql.close_db()if tag:print(ret)return json.dumps(ret)else:return retif __name__ == '__main__':app.run(host="0.0.0.0", port=8080)

2.2 测试

新建数据库:

数据库名称:bigdata_sre_log
表名称:es_indices
字段:
number INT,自增
cluster VARCHAR
index_name VARCHAR
save_days INT
ilp_name VARCHAR
datetime DATETIME, 默认值 CURRENT_TIMESTAMP

post 如下内容:

POST http://127.0.0.1:8080/add_index
{"index_name": "test-elk", "cluster": "sre-elk", "save_days": 10, "ilp_name": "ilp-default"}

在这里插入图片描述
kibana 查看数据(先写入1条数据):

POST test-elk_write/_doc
{"@timestamp": "2021-08-30T23:00:00+08:00","name": "test001"
}

在这里插入图片描述
调试结果(正常输入相关日志):
在这里插入图片描述
查看索引:
GET http://127.0.0.1:8080/list/
在这里插入图片描述

3 注意事项

  1. 本文只提供了简单的索引增加和查找功能,实际中最好增加配置修改和索引删除功能。

说明

软件环境:
ubuntu 20.04 desktop
elk 7.10.0(docker)
python 3.8
mysql 8.0(docker)
参考文档:
1 elasticsearch/reference/current/index.html
2 kibana/current/index.html
3 docker笔记5–配置MySQL

这篇关于elk笔记22.2--通过api快速创建索引的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1017093

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

电脑桌面文件删除了怎么找回来?别急,快速恢复攻略在此

在日常使用电脑的过程中,我们经常会遇到这样的情况:一不小心,桌面上的某个重要文件被删除了。这时,大多数人可能会感到惊慌失措,不知所措。 其实,不必过于担心,因为有很多方法可以帮助我们找回被删除的桌面文件。下面,就让我们一起来了解一下这些恢复桌面文件的方法吧。 一、使用撤销操作 如果我们刚刚删除了桌面上的文件,并且还没有进行其他操作,那么可以尝试使用撤销操作来恢复文件。在键盘上同时按下“C

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

在cscode中通过maven创建java项目

在cscode中创建java项目 可以通过博客完成maven的导入 建立maven项目 使用快捷键 Ctrl + Shift + P 建立一个 Maven 项目 1 Ctrl + Shift + P 打开输入框2 输入 "> java create"3 选择 maven4 选择 No Archetype5 输入 域名6 输入项目名称7 建立一个文件目录存放项目,文件名一般为项目名8 确定

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

Java 创建图形用户界面(GUI)入门指南(Swing库 JFrame 类)概述

概述 基本概念 Java Swing 的架构 Java Swing 是一个为 Java 设计的 GUI 工具包,是 JAVA 基础类的一部分,基于 Java AWT 构建,提供了一系列轻量级、可定制的图形用户界面(GUI)组件。 与 AWT 相比,Swing 提供了许多比 AWT 更好的屏幕显示元素,更加灵活和可定制,具有更好的跨平台性能。 组件和容器 Java Swing 提供了许多

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

hdu 4565 推倒公式+矩阵快速幂

题意 求下式的值: Sn=⌈ (a+b√)n⌉%m S_n = \lceil\ (a + \sqrt{b}) ^ n \rceil\% m 其中: 0<a,m<215 0< a, m < 2^{15} 0<b,n<231 0 < b, n < 2^{31} (a−1)2<b<a2 (a-1)^2< b < a^2 解析 令: An=(a+b√)n A_n = (a +

v0.dev快速开发

探索v0.dev:次世代开发者之利器 今之技艺日新月异,开发者之工具亦随之进步不辍。v0.dev者,新兴之开发者利器也,迅速引起众多开发者之瞩目。本文将引汝探究v0.dev之基本功能与优势,助汝速速上手,提升开发之效率。 何谓v0.dev? v0.dev者,现代化之开发者工具也,旨在简化并加速软件开发之过程。其集多种功能于一体,助开发者高效编写、测试及部署代码。无论汝为前端开发者、后端开发者

顺序表之创建,判满,插入,输出

文章目录 🍊自我介绍🍊创建一个空的顺序表,为结构体在堆区分配空间🍊插入数据🍊输出数据🍊判断顺序表是否满了,满了返回值1,否则返回0🍊main函数 你的点赞评论就是对博主最大的鼓励 当然喜欢的小伙伴可以:点赞+关注+评论+收藏(一键四连)哦~ 🍊自我介绍   Hello,大家好,我是小珑也要变强(也是小珑),我是易编程·终身成长社群的一名“创始团队·嘉宾”