(5)EII Kapacitor模块详解(附机器学习案例实操)

2023-10-21 11:59

本文主要是介绍(5)EII Kapacitor模块详解(附机器学习案例实操),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 前言

基于EII系列的前4篇文章,我们基本了解了EII的安装,EII消息总线,以及时序数据的采集(Telegraf模块),存储(InfluxDB模块)和可视化(Grafana)过程。

本文我们将进入EII时序栈中最为复杂的一个模块:数据处理模块Kapacitor。

EII3.0中用到的Kapacitor版本是1.5.4,官方教程链接:https://docs.influxdata.com/kapacitor/v1.5/

Kapacitor是一个开源的数据处理框架,同样由InfluxData公司发布,是TICK栈的其中一个组件。使用Kapacitor可以很方便的创建报警,ETL,异常检测等任务。Kapacitor主要特点如下:

  • 支持流处理和批处理两种模式。
  • 支持定时任务。
  • 全面兼容InfluxQL语法。
  • 支持将处理结果存回InfluxDB数据库。
  • 支持集成用户自定义的处理算法。
  • 支持集成多种报警软件: HipChat, OpsGenie, Alerta, Sensu, PagerDuty, Slack等。

2. 案例实操

2.1 温度监控案例

Kapacitor有着自己的一套编写规则的脚本语言TICKScript,我们仍然以温度监控的案例,来实操一下,实现一个简单的温度值过滤的案例。

2.1.1 配置阶段

首先,我们去到EII Kapacitor模块的目录下“IEdgeInsights/Kapacitor”。Kapacitor模块下所有的规则脚本都保存在目录"tick_scripts"下。我们在其中创建一个新的脚本文件"temp_filter.tick",并在其中输入如下内容:

dbrp "datain"."autogen"var data0 = stream|from().database('datain').retentionPolicy('autogen').measurement('point_data').where(lambda: "temperature" < 20 OR "temperature" > 25)|influxDBOut().buffer(0).database('datain').measurement('point_filter').retentionPolicy('autogen')

上述TICKScript实现了一个简单的数据过滤的功能,首先它通过"from()"接口从数据库"datain"的数据表"point_data"中读取数据,然后它通过"where()"语句进行了数据的筛选,将小于20和大于25的数据筛选出来,那么处于20到25之间的数据就过滤掉了。然后,再通过"influxDBOut()"接口将数据输出到数据库"datain"的另外一张表"point_filter"里。

完成了处理规则TICKScript的编写,接着我们要在EII的配置文件中引用它。打开"Kapacitor/config.json"文件,在其中的"task"字段增加一个新的任务,将刚刚定义的"temp_filter.tick"脚本引用进来。"Kapacitor/config.json"文件内容,参考如下:

{"config": {"cert_type": ["zmq", "pem"],"influxdb": {},"task": [{"task_name": "go_point_classifier","tick_script": "go_point_classifier.tick","udfs": [{"type": "go","name": "go_classifier"}]},{"task_name": "temp_filter","tick_script": "temp_filter.tick"}]},"interfaces": {}
}

完成上述配置之后,我们需要将目录"tick_scripts"挂载到容器中,这样可以避免稍后重新编译Kapacitor的docker容器镜像。在文件"Kapacitor/docker-compose.yml"中"volumes"字段下,增加"tick_scripts"目录的挂载,参考如下(最后一行):

volumes:- "vol_temp_kapacitor:/tmp/"- "vol_eii_socket:${SOCKET_DIR}"- "vol_dev_shm:/dev/shm"- ./Certificates/Kapacitor:/run/secrets/Kapacitor:ro- ./Certificates/rootca/cacert.pem:/run/secrets/rootca/cacert.pem:ro- ../Kapacitor/tick_scripts:/EII/tick_scripts

2.1.2 运行阶段

接下来,我们就可以参考先前文章中温度监控案例的步骤,将EII软件栈运行起来。

首先,在目录"IEdgeInsights/build/usecases"下,创建一个模块清单文件"test-mqtt.yml",并添加如下模块信息:

AppContexts:
- ConfigMgrAgent
- tools/mqtt/broker
- tools/mqtt/publisher
- Telegraf
- InfluxDBConnector
- Grafana
- Kapacitor

可以看到,这里引用了EII时序栈相关的4个模块(Telegraf, InfluxDB, Grafana, Kapacitor),并使用了EII自带的MQTT发布模块"tools/mqtt/publisher"来发布测试数据。

接下来,先确认MQTT发布模块(目录"tools/mqtt/publisher")发布的数据类型。打开文件"tools/mqtt/publisher/docker-compose.yml",确认文件末尾的"command"是如下内容:

command: ["--temperature", "10:30"]

基于上述配置,MQTT发布模块将会发送10到30之间随机的温度值,默认的发送频率是1秒,默认topic的名字是"temperature/simulated/0"。

接着,确认"Telegraf"模块的配置。先看到"Telegraf/config.json"文件,在先前文章的实操中,大家可能配置了EII Message Bus相关的"Subscribers"接口,如果配置了这里可能先去掉 (避免遇到Bug),将"Telegraf/config.json"文件还原到初始设置,内容参考如下:

{"config": {"cert_type": ["zmq", "pem"],"influxdb": {"dbname": "datain"},"publisher": {"measurements": ["*"],"profiling": "false"}},"interfaces": {"Publishers": [{"Name": "publisher","Type": "zmq_tcp","EndPoint": "0.0.0.0:65077","Topics": ["*"],"AllowedClients": ["*"]}]}
}

然后,我们检查"Telegraf/config/Telegraf/Telegraf_devmode.conf"配置文件,在其中搜索(VSCode快捷键Ctrl + F)关键字"temperature/simulated/0",找到telegraf mqtt input插件的定义(EII已默认配置好mqtt input插件,这里不用修改什么,只是检查一下)。可以看到Telegraf接收topic名为"temperature/simulated/0"的消息,并将其重命名为"point_data",这里的’point_data"对应的就是数据存到数据库之后,数据表的名字。

[[inputs.mqtt_consumer]]
#   ## MQTT broker URLs to be used. The format should be scheme://host:port,
#   ## schema can be tcp, ssl, or ws.servers = ["tcp://$MQTT_BROKER_HOST:1883"]
#
#   ## MQTT QoS, must be 0, 1, or 2
#   qos = 0
#   ## Connection timeout for initial connection in seconds
#   connection_timeout = "30s"
#
#   ## Topics to subscribe totopics = ["temperature/simulated/0",  ]name_override = "point_data"data_format = "json"
#
#   # if true, messages that can't be delivered while the subscriber is offline
#   # will be delivered when it comes back (such as on service restart).
#   # NOTE: if true, client_id MUST be setpersistent_session = false
#   # If empty, a random client ID will be generated.client_id = ""
#
#   ## username and password to connect MQTT server.username = ""password = ""

最后,确认"Telegraf/docker-compose.yml"文件中,是否将配置文件挂载到了容器内,参考如下(最后一行):

volumes:- "vol_temp_telegraf:/tmp/"- "vol_eii_socket:${SOCKET_DIR}"- ./Certificates/Telegraf:/run/secrets/Telegraf:ro- ./Certificates/rootca/cacert.pem:/run/secrets/rootca/cacert.pem:ro- "../Telegraf/config/Telegraf/Telegraf_devmode.conf:/etc/Telegraf/Telegraf/Telegraf_devmode.conf"

完成以上确认,现在我们终于可以启动EII软件栈了(经过前面几篇文章的实操,读者应该比较熟悉EII软件栈启动流程了)。

  • 首先编译配置文件。
$ cd IEdgeInsights/build
$ sudo -E python3 builder.py -f usecases/test-mqtt.yml
  • 由于我们修改的文件都是通过挂载的方式挂载到容器中,所以无需重新编译docker镜像,直接启动EII软件栈即可。
$ ./eii_start.sh
  • 按照先前文章介绍的方法,进入Grafana界面,创建一个Dashboard。然后创建一个图表显示“point_data”表中的数据,再创建另外一个图标显示"point_filter"表中的数据。
    在这里插入图片描述
    通过对比两个表格中的数据可以验证,Kapacitor模块中"temp_filter.tick"脚本的数据过滤生效了。

  • 关闭EII软件栈的命令。

$ docker-compose down

2.2 钢板缺陷检测案例

上一节,我们尝试通过Kapacitor的TICKScript来编写了一个简单的数据过滤算法。

这一节,我们来实操一个更加复杂的案例:钢板缺陷检测案例。该案例中会用到随机森林(Random Forest)机器学习算法。

在前言中介绍Kapacitor特点时,其中有一条提到"支持集成用户自定义的处理算法",那么机器学习算法就是通过Kapacitor的用户自定义功能块进行集成的,英文叫做UDF(User Defined Function)。

2.2.1 数据集准备

笔者在Kaggle平台上找到了一份公开的"钢板缺陷"的数据集"Faulty Steel Plates"。

该数据集一共有1941笔数据。提供了27个维度的信息,最终可以分析出6种钢板缺陷的分类。

原始数据可从如下链接下载:
https://www.kaggle.com/datasets/uciml/faulty-steel-plates

拿到数据后,我们需要做一些预处理。

  1. 需要将原始数据的标签从多列的方式转化成单列。
    在这里插入图片描述
  2. 需要将原始数据拆分成"训练集"和"测试集"。"训练集"用于训练,"测试集"用于测试。笔者已经将拆分好的数据集上传,大家可通过下面链接直接下载。1552笔数据用于训练,389笔数据用于测试。
  • 训练集(1552笔数据):https://pan.quark.cn/s/0d64b27d1ce3
  • 测试集(389笔数据):https://pan.quark.cn/s/99d438361e42

接下来,我们按照数据流的顺便依次配置EII的各个模块,数据流顺序如下:MQTT发布模块 → Telegraf模块 → InfluxDB模块 → Kapacitor模块 → Grafana模块。

InfluxDB模块和Grafana模块,我们可以采用EII默认的配置,无需修改。那么接下来我们依次介绍MQTT发布模块,Telegraf模块,Kapacitor模块的配置步骤。

2.2.2 配置MQTT发布模块

MQTT发布模块(位于EII源码目录"tools/mqtt/publisher"下)是数据产生的源头,我们需要让它将steel plate"测试集"中的数据通过MQTT协议发布出来。

接下来,开始配置。

  • 将测试集"steel_plate_test_data.csv"拷贝到目录"tools/mqtt/publisher/csv_files"目录下(若目录不存在,则创建)。

  • 然后在"tools/mqtt/publisher/Dockerfile"文件中,将"csv_files"目录拷贝到容器中,在"Dockerfile"的第34行添加如下内容:

COPY csv_files /csv_files
  • MQTT发布模块的"publisher.py"脚本自带的发送csv文件数据的函数"stream_csv()"不满足我们的需求(它会将csv表格中的每行数据,合成一个字符串来发送,这种方式不方面后续Grafana进行数据的可视化),所以我们重新实现了一个发送csv文件数据的函数"my_stream_csv()"函数。将如下代码,拷贝到文件"tools/mqtt/publisher/publisher.py"中 (例如拷贝到stream_csv()函数的上方,大概在110行的位置)。该函数会将csv文件中的每行数据,封装成json格式进行发送。
def my_stream_csv(mqttc, topic, interval, filename, len):"""Stream the csv file"""target_start_time = time.time()row_count = 0row_served = 0jencoder = json.JSONEncoder()print("\nMQTT Topic: {}\nInterval: {}\nFilename: {}\nLen: {}\n".format(topic, interval, filename, len))wait_time = 5print(f"Waiting {wait_time}s to send data...")time.sleep(wait_time)with open(filename, 'r') as fileobject:# Get headerheader = next(fileobject)header = [x for x in header.split(',') if x not in (' \n', ' \r\n')][:len]for row in fileobject:row = [x for x in row.split(',') if x not in (' \n', ' \r\n')][:len]if re.match(r'^-?\d+', row[0]) is None:continuerow_count += 1row_served += 1values = [float(x) for x in row]msg = {}for i in range(len):msg[header[i]] = values[i]mqttc.publish(topic, json.dumps(msg))time.sleep(interval)print('{} rows served in {}'.format(row_served, time.time() - target_start_time))print('{} Done! {} rows served in {}'.format(filename, row_served, time.time() - target_start_time))
  • 然后修改文件"tools/mqtt/publisher/publisher.py"中,main()函数下if args.csv is not None:后面的语句,引用新实现的my_stream_csv()函数,参考如下:
# 将
stream_csv(client,args.topic,args.subsample,args.sampling_rate,args.csv)
# 修改为
my_stream_csv(client,args.topic,args.interval,args.csv,27)
  • 最后,修改"tools/mqtt/publisher/docker-compose.yml"文件末尾的"command"字段,使其发布steel plate测试集中的数据,参考如下:
command: ["--topic", "test/steel_plate_data", "--csv", "./csv_files/steel_plate_test_data.csv", "--interval", "1"]

上述"command"含义如下:MQTT发布模块将会以topic名"test/steel_plate_data"发布数据,发送的是文件"csv_files/steel_plate_test_data.csv"中的数据,发送频率是1秒。

以上我们就完成了MQTT发布模块的配置。在配置过程中,我们对MQTT发布模块的Dockerfile进行的修改,后续需要重新编译该模块的docker镜像文件。

2.2.3 配置Telegraf模块

上一节我们配置了MQTT发布模块,它会以特定的topic发布steel plate测试集中的数据。接下来,我们需要配置Telegraf模块,让它能够接收到该topic的数据。

  • 打开文件"Telegraf/config/Telegraf/Telegraf_devmode.conf"文件,添加如下内容:(大概添加在3968行左右的位置,和其他inputs插件写在一起)
[[inputs.mqtt_consumer]]servers = ["tcp://$MQTT_BROKER_HOST:1883"]topics = ["test/steel_plate_data",]name_override = "steel_plate_data"data_format = "json"persistent_session = falseclient_id = ""username = ""password = ""

该配置文件的含义如下:Telegraf模块将会订阅topic名为"test/steel_plate_data"的数据,并重命名为"steel_plate_data",该名字即对应InfluxDB数据库中数据表的名字。

以上我们就完成了Telegraf模块的配置。

InfluxDB模块我们可以使用EII默认配置,不需要做修改,那么接下来进入最重要的Kapacitor模块。

2.2.4 配置Kapacitor模块

当MQTT发布模块发布的数据,经由Telegraf模块接收,并存入InfluxDB数据库后,Kapacitor模块就可以开始从InfluxDB数据库拉取数据,并处理了。接下来,我们进入Kapacitor模块的配置,其中我们会开发一个机器学习的UDF。

  • 首先我们将训练集"steel_plate_train_data.csv"拷贝到目录"Kapacitor/training_data_sets"中,该目录是Kapacitor模块用来存放训练数据的地方。

  • 接着我们来编写一个UDF(用户自定义功能块),在其中进行模型训练,并用训练好的模型进行推理。Kapacitor模块中的UDF都保存在目录"udfs"中,所以在目录"Kapacitor/udfs"目录下,创建一个新的python脚本"steel_plate_fault_classifier.py",并添加如下内容:

from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
from sklearn.ensemble import RandomForestClassifier
import time
import logging
import os
import pandas as pd
from sklearnex import patch_sklearn
patch_sklearn()logging.basicConfig(level=logging.INFO,filemode='w',filename='/EII/sockets/kapacitor.log',format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()class RfcHandler(Handler):"""Random Forest Classifier Handler"""def __init__(self, agent):self._agent = agentlogging.info("Training started...")train_data = pd.read_csv('/EII/training_data_sets/steel_plate_train_data.csv')X_train = train_data.iloc[:, :-1]y_train = train_data.iloc[:, -1]self.rfc = RandomForestClassifier(n_estimators=150, max_depth = 10, verbose=False)self.rfc.fit(X_train, y_train)logging.info("training complete...")def info(self):response = udf_pb2.Response()response.info.wants = udf_pb2.STREAMresponse.info.provides = udf_pb2.STREAMreturn responsedef init(self, init_req):response = udf_pb2.Response()response.init.success = Truereturn responsedef begin_batch(self, begin_req):raise Exception("not supported")def point(self, point):"""Store and processing of the point:param point: the body of the point received:type point: udf_pb2.Point"""start_time = time.time()self.response = udf_pb2.Response()logging.info(f"processing, start time: {start_time}")field_name = ['X_Minimum', 'X_Maximum', 'Y_Minimum', 'Y_Maximum', 'Pixels_Areas','X_Perimeter',  'Y_Perimeter', 'Sum_of_Luminosity', 'Minimum_of_Luminosity', 'Maximum_of_Luminosity', 'Length_of_Conveyer', 'TypeOfSteel_A300', 'TypeOfSteel_A400', 'Steel_Plate_Thickness', 'Edges_Index',	'Empty_Index', 'Square_Index', 'Outside_X_Index', 'Edges_X_Index',	'Edges_Y_Index', 'Outside_Global_Index', 'LogOfAreas', 'Log_X_Index', 'Log_Y_Index', 'Orientation_Index', 'Luminosity_Index', 'SigmoidOfAreas']df = pd.DataFrame(columns = field_name)point_msg = []for i in range(len(field_name)):point_msg.append(point.fieldsDouble[field_name[i]])a_msg = pd.Series(point_msg, index = df.columns)df = df.append(a_msg, ignore_index=True)prediction = self.rfc.predict(df)[0]# Simulate result: 0 - good, 1 - badsimulate_result = 0if prediction in ['Z_Scratch', 'K_Scatch']:simulate_result = 1stop_time = time.time()infer_time_ms = stop_time - start_timelogging.info(f"predition: {prediction}, infer time: {infer_time_ms} ms")self.response.point.CopyFrom(point)self.response.point.fieldsString['category'] = predictionself.response.point.fieldsInt['result'] = simulate_resultself.response.point.fieldsDouble['infer_time_ms'] = infer_time_msself._agent.write_response(self.response, True)def end_batch(self, batch_meta):raise Exception("not supported")def snapshot(self):response = udf_pb2.Response()response.snapshot.snapshot = bytes('', 'utf-8')return responsedef restore(self, restore_req):response = udf_pb2.Response()response.restore.success = Falseresponse.restore.error = bytes('not implemented', 'utf-8')return responseif __name__ == '__main__':# Create an agentagent = Agent()# Create a handler and pass it an agent so it can write pointsh = RfcHandler(agent)# Set the handler on the agentagent.handler = h# Anything printed to STDERR from a UDF process gets captured# into the Kapacitor logs.agent.start()agent.wait()

以上python脚本即是实现随机森林算法的UDF,UDF的实现有其固定的模板,官方参考如下:https://docs.influxdata.com/kapacitor/v1.5/guides/anomaly_detection/

算法详情大家可以浏览上面的代码,简要说明如下:

  1. 在方法__init__()中,实现了机器学习的训练过程。
  2. 在方法info()中,标明了采用流处理的方式。
  3. 既然是采用流处理的方式,那么用于批处理的两个方式begin_batch()end_batch()就可以实现了。
  4. point(self, point)方式是进行数据处理的地方,其中的"point"参数代表的就是数据表中的一笔数据。

值得注意的是,由于UDF中的log不会输出到命令行窗口,所以这里我们进行了配置(python脚本中的11-15行),将log输出到了文件中,方便后续调试程序。

logging.basicConfig(level=logging.INFO,filemode='w',filename='/EII/sockets/kapacitor.log',format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()
  • 接下来,需要将UDF添加到Kapacitor的配置文件"Kapacitor/kapacitor_devmode.conf"中,打开该文件,在[udf.functions]字段下 (大概在532行附近),将"steel_plate_fault_classifier.py"添加进去,参考如下写法:
# Configuration for UDFs (User Defined Functions)[udf.functions][udf.functions.go_point_classifier]socket = "/tmp/point_classifier"timeout = "20s"[udf.functions.rfc]prog = "python3"args = ["-u", "/EII/udfs/rfc_classifier.py"]timeout = "60s"[udf.functions.rfc.env]PYTHONPATH = "/go/src/github.com/influxdata/kapacitor/udf/agent/py/:/EII/.local/lib/python3.9/site-packages/:/opt/conda/envs/env/lib/python3.9/site-packages/"[udf.functions.steel_plate_fault_classifier]prog = "python3"args = ["-u", "/EII/udfs/steel_plate_fault_classifier.py"]timeout = "60s"[udf.functions.steel_plate_fault_classifier.env]PYTHONPATH = "/go/src/github.com/influxdata/kapacitor/udf/agent/py/:/EII/.local/lib/python3.9/site-packages/:/opt/conda/envs/env/lib/python3.9/site-packages/"
  • 然后我们需要编写一个TICKScript来调用该UDF。在"Kapacitor/tick_scripts"目录下,创建一个新的脚本"steel_plate.tick",并添加如下内容:
dbrp "datain"."autogen"var data0 = stream|from().database('datain').retentionPolicy('autogen').measurement('steel_plate_data')@steel_plate_fault_classifier()|influxDBOut().buffer(0).database('datain').measurement('steel_plate_result').retentionPolicy('autogen')

有了编写温度监控案例TICKScript的经验,那么这里就比较好理解了。"steel_plate.tick"脚本实现了从数据库"datain"的数据表"steel_plate_data"中读取数据,然后经过UDF函数"steel_plate_fault_classifier"的处理,最后将处理结果写回数据库"datain"的数据表"steel_plate_result"中。注意这里调用UDF的写法,即是这样:@steel_plate_fault_classifier()

TICKScript中调用的UDF函数名"steel_plate_fault_classifier",即是文件"Kapacitor/kapacitor_devmode.conf"中定义的[udf.functions.steel_plate_fault_classifier]

  • 接着,需要在"Kapacitor/config.json"目录中添加"steel_plate.tick"脚本,使得Kapacitor模块在启动时,能将"steel_plate.tick" enable起来。打开"Kapacitor/config.json"文件,在其中添加一个新的task引用"steel_plate.tick"脚本,参考如下:
{"config": {"cert_type": ["zmq", "pem"],"influxdb": {},"task": [{"task_name": "go_point_classifier","tick_script": "go_point_classifier.tick","udfs": [{"type": "go","name": "go_classifier"}]},{"task_name": "temp_filter","tick_script": "temp_filter.tick"},{"task_name": "steel_plate_classifier","tick_script": "steel_plate.tick"}]},"interfaces": {}
}
  • 最后,我们将修改过的文件所在的目录都挂载到容器内,这样可以避免后续编译Kapacitor模块docker容器的过程。打开文件"Kapacitor/docker-compose.yml",在其中的volumes字段添加挂载信息,参考如下:
volumes:- "vol_temp_kapacitor:/tmp/"- "vol_eii_socket:${SOCKET_DIR}"- "vol_dev_shm:/dev/shm"- ./Certificates/Kapacitor:/run/secrets/Kapacitor:ro- ./Certificates/rootca/cacert.pem:/run/secrets/rootca/cacert.pem:ro- ../Kapacitor/tick_scripts:/EII/tick_scripts- ../Kapacitor/udfs:/EII/udfs- ../Kapacitor/training_data_sets:/EII/training_data_sets- ../Kapacitor/config:/EII/config

以上我们就完成了Kapacitor模块的配置。

2.2.5 启动案例

所有模块配置完成后,我们来启动EII软件栈。

  • 首先,在目录"IEdgeInsights/build/usecases"下,检查模块清单文件"test-mqtt.yml",包含如下模块信息:
AppContexts:
- ConfigMgrAgent
- tools/mqtt/broker
- tools/mqtt/publisher
- Telegraf
- InfluxDBConnector
- Grafana
- Kapacitor
  • 然后,编译配置文件。
$ cd IEdgeInsights/build
$ sudo -E python3 builder.py -f usecases/test-mqtt.yml
  • MQTT发布模块需要重新编译docker镜像,我们执行如下命令:
$ docker-compose -f docker-compose-build.yml build ia_mqtt_publisher
  • 启动EII软件栈。
$ ./eii_start.sh

2.2.6 检查运行结果

  • 首先,检查各个模块运行状态,查看容器状态是否健康。
$ docker ps
  • 查看MQTT发布模块,数据是否发送成功。执行如下命令,查看MQTT模块的log:
$ docker logs -f ia_mqtt_publisher

若看到如下log信息在滚动,则说明数据发送成功。

MQTT Topic: test/steel_plate_data
Interval: 1.0
Filename: ./csv_files/steel_plate_test_data.csv
Len: 27Waiting 5s to send data...
1 rows served in 6.005375146865845
2 rows served in 7.00670051574707
3 rows served in 8.00806999206543
4 rows served in 9.009141206741333
5 rows served in 10.010363101959229
6 rows served in 11.011662006378174
7 rows served in 12.012709617614746
8 rows served in 13.0132315158844
9 rows served in 14.014494895935059
  • 查看Kapacitor模块,是否成功从InfluxDB读取数据,并处理成功。查看Kapacitor模块中UDF输出了log。
$ docker exec -it ia_kapacitor tail -f /EII/sockets/kapacitor.log

若看到如下log信息在滚动,则说明UDF处理成功。

2022-09-24 01:50:58,202 INFO:root: processing, start time: 1663984258.2023537
2022-09-24 01:50:58,214 INFO:root: predition: Other_Faults, infer time: 0.012151002883911133 ms
2022-09-24 01:50:59,203 INFO:root: processing, start time: 1663984259.2034917
2022-09-24 01:50:59,215 INFO:root: predition: Bumps, infer time: 0.011562347412109375 ms
2022-09-24 01:51:00,223 INFO:root: processing, start time: 1663984260.2233331
2022-09-24 01:51:00,241 INFO:root: predition: Other_Faults, infer time: 0.018273353576660156 ms
2022-09-24 01:51:01,206 INFO:root: processing, start time: 1663984261.206068
2022-09-24 01:51:01,218 INFO:root: predition: Other_Faults, infer time: 0.012890100479125977 ms
2022-09-24 01:51:02,207 INFO:root: processing, start time: 1663984262.2077074
2022-09-24 01:51:02,218 INFO:root: predition: K_Scatch, infer time: 0.01125192642211914 ms

2.2.7 可视化展示

最后,我们通过Grafana界面,来进行数据可视化。

  • 打开浏览器,输入网址localhost:3000进入Grafana界面。

  • 笔者制作了一个仪表盘(Dashboard),保存成json文件,大家可以下载下来,直接导入到Grafana中。

Steel Plate Grafana Dashboard下载链接:https://pan.quark.cn/s/2520bed09d1f

  • Grafana界面的操作就又是一个大话题了,笔者这里就不详细介绍导入方法了,大家可以先自行探索一下。笔者会另写文章介绍Grafana的使用。

  • Dashboard导入成功后,大家将会看到如下界面,且数据会不断刷新。
    在这里插入图片描述

  • 关闭EII软件栈的命令。

$ docker-compose down

后记
经过EII系列(1)-(5)篇的内容,基本上把EII时序栈相关的介绍完了,大家实操过程中如果遇到问题,欢迎留言交流。

这篇关于(5)EII Kapacitor模块详解(附机器学习案例实操)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/254256

相关文章

详解Vue如何使用xlsx库导出Excel文件

《详解Vue如何使用xlsx库导出Excel文件》第三方库xlsx提供了强大的功能来处理Excel文件,它可以简化导出Excel文件这个过程,本文将为大家详细介绍一下它的具体使用,需要的小伙伴可以了解... 目录1. 安装依赖2. 创建vue组件3. 解释代码在Vue.js项目中导出Excel文件,使用第三

SQL注入漏洞扫描之sqlmap详解

《SQL注入漏洞扫描之sqlmap详解》SQLMap是一款自动执行SQL注入的审计工具,支持多种SQL注入技术,包括布尔型盲注、时间型盲注、报错型注入、联合查询注入和堆叠查询注入... 目录what支持类型how---less-1为例1.检测网站是否存在sql注入漏洞的注入点2.列举可用数据库3.列举数据库

Linux之软件包管理器yum详解

《Linux之软件包管理器yum详解》文章介绍了现代类Unix操作系统中软件包管理和包存储库的工作原理,以及如何使用包管理器如yum来安装、更新和卸载软件,文章还介绍了如何配置yum源,更新系统软件包... 目录软件包yumyum语法yum常用命令yum源配置文件介绍更新yum源查看已经安装软件的方法总结软

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

Java访问修饰符public、private、protected及默认访问权限详解

《Java访问修饰符public、private、protected及默认访问权限详解》:本文主要介绍Java访问修饰符public、private、protected及默认访问权限的相关资料,每... 目录前言1. public 访问修饰符特点:示例:适用场景:2. private 访问修饰符特点:示例:

python管理工具之conda安装部署及使用详解

《python管理工具之conda安装部署及使用详解》这篇文章详细介绍了如何安装和使用conda来管理Python环境,它涵盖了从安装部署、镜像源配置到具体的conda使用方法,包括创建、激活、安装包... 目录pytpshheraerUhon管理工具:conda部署+使用一、安装部署1、 下载2、 安装3

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

mac中资源库在哪? macOS资源库文件夹详解

《mac中资源库在哪?macOS资源库文件夹详解》经常使用Mac电脑的用户会发现,找不到Mac电脑的资源库,我们怎么打开资源库并使用呢?下面我们就来看看macOS资源库文件夹详解... 在 MACOS 系统中,「资源库」文件夹是用来存放操作系统和 App 设置的核心位置。虽然平时我们很少直接跟它打交道,但了