EMQ X+TDengine 搭建 MQTT 物联网可视化平台

2024-03-01 12:10

本文主要是介绍EMQ X+TDengine 搭建 MQTT 物联网可视化平台,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

物联网数据采集涉及到大量设备接入、海量的时序数据传输,EMQ X MQTT 服务器 与 TDengine 大数据平台的组合技术栈完全能够胜任场景中的海量时间序列监测数据的传输、存储和计算。

数据入库后,往往需要其他方式如数据可视化系统将数据按照规则统计、展现出来,实现数据的监控、指标统计等业务需求,以便充分发挥数据的价值,TDengine 搭配开源软件 Grafana 可以快速搭建物联网数据可视化平台。

上述整套方案无需代码开发,涉及的产品均能提供开源软件、企业服务、云端 SaaS 服务不同层次的交付模式,能够根据项目需求实现免费版或企业版私有化落地以及云端部署。

在这里插入图片描述

方案介绍

EMQ X 简介

EMQ X 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级连接和分布式集群架构,发布订阅模式的开源 MQTT 消息服务器。EMQ X 内置了大量开箱即用的功能,其 开源版 EMQ X Broker企业版 EMQ X Enterprise 均支持通过规则引擎将设备消息存储到 TDengine。

TDengine 是什么

TDengine 是涛思数据专为物联网、车联网、工业互联网、IT 运维等设计和优化的大数据平台。除核心的快 10 倍以上的时序数据库功能外,还提供缓存、数据订阅、流式计算等功能,最大程度减少研发和运维的复杂度,且核心代码,包括集群功能全部开源。

TDengine 提供社区版、企业版和云服务版,安装/使用教程详见 TDengine 使用文档。

Grafana 简介

Grafana 是一个跨平台、开源的度量分析和可视化工具,可以查询处理各类数据源中的数据,进行可视化的展示。它可以快速灵活创建的客户端图表,面板插件有许多不同方式的可视化指标和日志,官方库中具有丰富的仪表盘插件,比如热图、折线图、图表等多种展示方式;支持 Graphite,TDengine、InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch和 KairosDB 等数据源,支持数据项独立/混合查询展示;可以创建自定义告警规则并通知到其他消息处理服务或组件中。

业务场景

本文模拟物联网环境数据采集场景,假设现有一定数据的环境数据采集点,所有采集点数据均通过 MQTT 协议 传输至采集平台(MQTT Publish),主题设计如下:

sensor/data

传感器发送的数据格式为 JSON,数据包括传感器采集的温度、湿度、噪声音量、PM10、PM2.5、二氧化硫、二氧化氮、一氧化碳、传感器 ID、区域、采集时间等数据。

{"temperature": 30,"humidity" : 20,"volume": 44.5,"PM10": 23,"pm25": 61,"SO2": 14,"NO2": 4,"CO": 5,"id": "10-c6-1f-1a-1f-47","area": 1,"ts": 1596157444170
}

现在需要实时存储以便在后续任意时间查看数据,提出以下的需求:

  • 每个设备按照每 5 秒钟一次的频率进行数据上报,数据库需存储每条数据以供后续回溯分析;
  • 通过可视化系统查看 任意区域、任意时间区间内 的指标数据,如平均值、最大值、最小值。

环境准备

本文所用各个组件均有 Docker 镜像,除 EMQ X 需要修改少数配置为了便于操作使用下载安装外,TDengine 与 Grafana 均使用 Docker 搭建。

安装包资源与使用教程参照各自官网:

  • EMQ X:EMQ 官网 https://www.emqx.io/cn/
  • TDengine:涛思数据官网 https://www.taosdata.com/cn/
  • Grafana:Grafana 官网 https://grafana.com/

安装 EMQ X

如果您是 EMQ X 新手用户,推荐通过 EMQ X 文档 快速上手

访问 EMQ X 下载 页面下载适合您操作系统的安装包,本文截稿时 EMQ X 开源版最新版本为 v4.1.1,下载 zip 包的启动步骤如下 :

## 解压下载好的安装包
unzip emqx-macosx-v4.1.1.zip
cd emqx## 以 console 模式启动 EMQ X 方便调试
./bin/emqx console

启动成功后浏览器访问 http://127.0.0.1:18083 访问 EMQ X 管理控制台 Dashboard,使用 admin public 默认用户名密码完成初次登录。

EMQ X 企业版 4.1.2 提供了原生 TDengine 写入插件,性能更好、使用更方便,请移步规则引擎-写入数据到 TDengine查看

安装 TDengine

为了方便测试使用通过 Docker 进行安装(需映射网络端口),也可以使用安装包的方式进行安装:

## 拉取并启动容器
docker run -d --name tdengine -p 6030-6041:6030-6041 tdengine/tdengine:latest## 启动后检查容器运行状态
docker ps -a

安装 Grafana

使用以下命令通过 Docker 安装并启动 Grafana:

docker run -d --name=grafana -p 3000:3000 grafana/grafana

启动成功后浏览器访问 http://127.0.0.1:3000 访问 Grafana 可视化面板,使用 admin admin 默认用户名密码完成初次登录,登录后按照提示修改密码使用新密码登录进入主界面:

配置 EMQ X 存储数据到 TDengine

TDengine 创建数据库与数据表

进入TDengine Docker 容器:

docker exec -it tdengine bash

创建 test 数据库:

taos
create database test;

创建 sensor_data 表,关于 TDengine 数据结构以及 SQL 命令参见 TAOS SQL:

use test;
CREATE TABLE sensor_data (ts timestamp,temperature float,humidity float,volume float,PM10 float,pm25 float,SO2 float,NO2 float,CO float,sensor_id NCHAR(255), area TINYINT,coll_time timestamp
);

配置 EMQ X 规则引擎

打开 EMQ X Dashboared,进入 规则引擎 -> 规则 页面,点击 创建 按钮进入创建页面。

规则 SQL

规则 SQL 用于 EMQ X 消息以及事件筛选,以下 SQL 表示从 sensor/data 主题筛选出 payload 数据:

SELECTpayload
FROM"sensor/data"

使用 SQL 测试功能 ,输入测试数据进行筛选结果测试,测试有结果且输出内容如下,标明 SQL 编写正确:

{"payload": "{\"temperature\":30,\"humidity\":20,\"volume\":44.5,\"PM10\":23,\"pm2.5\":61,\"SO2\":14,\"NO2\":4,\"CO\":5,\"id\":\"10-c6-1f-1a-1f-47\",\"area\":1,\"ts\":1596157444170}"
}

image20200731104046137.png

响应动作

为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API。通过 RESTful Connector 提供了最简单的连接方式,即使用 HTTP 请求携带认证信息与要执行的 SQL 操作 TDengine。

使用 EMQ X 开源版中的 发送到 Web 服务 即可通过 RESTful Connector 写入数据到 TDengine。即将到来的 EMQ X 企业版 4.1.1 版本将提供原生更高性能的写入 Connector。

发送到 Web 服务需要两个数据,一个是关联资源,另一个是消息内容模板。

  • 关联资源:HTTP 服务器配置信息,此处为 TDengine 的 RESTful Connector
  • 消息内容模板:此处为携带数据的 INSERT SQL,注意我们应当在 SQL 中指定数据库名,字符类型也要用单引号括起来, 消息内容模板为:
INSERT INTO test.sensor_data VALUES(now,${payload.temperature},${payload.humidity},${payload.volume},${payload.PM10},${payload.pm25},${payload.SO2},${payload.NO2},${payload.CO},'${payload.id}',${payload.area},${payload.ts}
)

image20200731145609393.png

创建过程

点击响应动作下的 添加 按钮,在弹出框内选择 发送数据到 Web 服务,点击 新建资源 新建一个 WebHook 资源。

image20200731104403456.png

资源类型选择 Webhook,请求 URL 填写 http://127.0.0.1:6041/rest/sql,请求方法选择 POST, 还需添加 Authorization 请求头作为认证信息

Authorization 的值为 Basic + TDengine 的 {username}:{password} 经过 Base64 编码之后的字符串, 例如 root:taosdata 编码后实际填入的值为:Basic cm9vdDp0YW9zZGF0YQ==

在响应动作创建页面选择新建的资源,并填入消息模板内容即可。

image20200804110459517.png

生成模拟数据

以下脚本模拟了 10000 个设备在过去 24 小时内、每隔 5 秒钟上报一条模拟数据并发送到 EMQ X 的场景。

  • 总数据量: 24 * 3600 / 5 * 100 = 172 万条
  • 消息 TPS: 20

读者安装 Node.js ,按需修改配置参数后可以通过以下命令启动:

npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org
node mock.js

附:模拟生成数据并发送到 EMQ X 代码,请根据集群性能调整相关参数

// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')const EMQX_SERVER = 'mqtt://localhost:1883'
const CLIENT_NUM = 100
const STEP = 5000 // 模拟采集时间间隔 ms
const AWAIT = 5000 // 每次发送完后休眠时间,防止消息速率过快 ms
const CLIENT_POOL = []startMock()function sleep(timer = 100) {return new Promise(resolve => {setTimeout(resolve, timer)})
}async function startMock() {const now = Date.now()for (let i = 0; i < CLIENT_NUM; i++) {const client = await createClient(`mock_client_${i}`)CLIENT_POOL.push(client)}// last 24h every 5sconst last = 24 * 3600 * 1000for (let ts = now - last; ts <= now; ts += STEP) {for (const client of CLIENT_POOL) {const mockData = generateMockData()const data = {...mockData,id: client.options.clientId,ts,}client.publish('sensor/data', JSON.stringify(data))}const dateStr = new Date(ts).toLocaleTimeString()console.log(`${dateStr} send success.`)await sleep(AWAIT)}console.log(`Done, use ${(Date.now() - now) / 1000}s`)
}/*** Init a virtual mqtt client* @param {string} clientId ClientID*/
function createClient(clientId) {return new Promise((resolve, reject) => {const client = mqtt.connect(EMQX_SERVER, {clientId,})client.on('connect', () => {console.log(`client ${clientId} connected`)resolve(client)})client.on('reconnect', () => {console.log('reconnect')})client.on('error', (e) => {console.error(e)reject(e)})})
}/**
* Generate mock data
*/
function generateMockData() {return {"temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),"humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),"volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),"PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),"pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),"SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),"NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),"CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),"area": Mock.Random.integer(0, 20),"ts": 1596157444170,}
}

可视化配置

组件安装完成,模拟数据写入成功后,按照 Grafana 可视化界面的操作指引,完成业务所需数据可视化配置。

添加数据源(Add data source)

Grafana 的 TDengine 数据源需要手动安装插件,具体安装范式以 TDengine 文档为准。

添加数据源,即显示的数据源信息。选取 TDengine 类型数据源,输入连接参数进行配置,默认情况下,关键配置信息如下:

image20200804110612868.png

添加仪表盘(New Dashboard)

在 EMQ X Sample 仓库获取Grafana 仪表盘导出文件导入即可查看图表示例。

添加好数据源后,添加需要显示的数据仪表盘信息。仪表盘为多个可视化面板的集合,点击 New Dashboard 后,选择 + Query 通过查询来添加数据面板。

创建面板需要四个步骤,分别是 Queries(查询)Visualization(可视化)General(图表配置)Alert(告警) ,创建时间

平均值面板

使用 Grafana 的可视化查询构建工具,查询出所有设备的平均值。

以下 SQL 按照指定时间段($form $to)、指定时间间隔($interval),查询出数据中关键指标的平均值:

select avg(temperature), avg(humidity), avg(volume), avg(PM10), avg(pm25), avg(SO2), avg(NO2), avg(CO)  from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)

Visualization 默认不做更改, General 里面修改面板名称为 历史平均值,如果需要对业务进行监控告警,可以在 Alert 里编排告警规则,此处仅做可视化展示,不使用此功能。

image20200803091833280.png

完成创建后,点击左上角返回按钮,该 Dashboard 里成功添加一个数据面板。点击顶部导航栏 保存 图标,输入 Dashboard 名称完成 Dashboard 的创建。

最大值、最小值面板

继续点击 Dashboard 的 Add panel 按钮,添加最大值、最小值图表。操作步骤同添加平均值,仅对查询中 SELECT 统计方法字段做出调整,调整为 AVG 函数为 MAXMIN

select max(temperature), max(humidity), max(volume), max(PM10), max(pm25), max(SO2), max(NO2), max(CO), min(temperature), min(humidity), min(volume), min(PM10), min(pm25), min(SO2), min(NO2), min(CO)  from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)

image20200803093314019.png

仪表盘效果

保存仪表盘,拖拽调整每个数据面板大小、位置,最终得到一个视觉效果较好的数据仪表盘。仪表盘右上角可以选择时间区间、自动刷新时间,此时设备持续发送数据采集数据,仪表盘数据值会有所变动,实现了比较好的可视化效果。

FAQ

Q: 为什么 Grafana 中没有图标数据?

  • 请拖动时间范围,检查、确保所选时段内有数据

Q: EMQ X 开源版和 EMQ X 企业版写入 TDengine 功能上有什么区别?

  • 开源版使用 Webhook + TDengine RESTful Connector,两边都有一定的性能损耗,最大写入速度约为 700 条/秒
  • 企业版使用 EMQ X 原生插件,能够做到 20,000 条/秒写入

Q: 规则执行了,但是写入不了数据?

  • 请检查认证信息是否配置正确,请求头、连接地址、端口等信息是否匹配 TDengin 版本

总结

至此我们借助 EMQ X + TDengine 完成了物联网数据传输、存储、展现整个流程的系统搭建,读者可以了解到 EMQ X 丰富的拓展能力与 TDengine 完备的大数据平台特性在物联网数据采集中的应用。深入学习掌握 Grafana 的其他功能后,用户可以定制出更完善的数据可视化乃至监控告警系统。

image20200803093438116.png

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.io/cn/blog/emqx-tdengine-grafana

这篇关于EMQ X+TDengine 搭建 MQTT 物联网可视化平台的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/762404

相关文章

流媒体平台/视频监控/安防视频汇聚EasyCVR播放暂停后视频画面黑屏是什么原因?

视频智能分析/视频监控/安防监控综合管理系统EasyCVR视频汇聚融合平台,是TSINGSEE青犀视频垂直深耕音视频流媒体技术、AI智能技术领域的杰出成果。该平台以其强大的视频处理、汇聚与融合能力,在构建全栈视频监控系统中展现出了独特的优势。视频监控管理系统EasyCVR平台内置了强大的视频解码、转码、压缩等技术,能够处理多种视频流格式,并以多种格式(RTMP、RTSP、HTTP-FLV、WebS

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

如何解决线上平台抽佣高 线下门店客流少的痛点!

目前,许多传统零售店铺正遭遇客源下降的难题。尽管广告推广能带来一定的客流,但其费用昂贵。鉴于此,众多零售商纷纷选择加入像美团、饿了么和抖音这样的大型在线平台,但这些平台的高佣金率导致了利润的大幅缩水。在这样的市场环境下,商家之间的合作网络逐渐成为一种有效的解决方案,通过资源和客户基础的共享,实现共同的利益增长。 以最近在上海兴起的一个跨行业合作平台为例,该平台融合了环保消费积分系统,在短

Android平台播放RTSP流的几种方案探究(VLC VS ExoPlayer VS SmartPlayer)

技术背景 好多开发者需要遴选Android平台RTSP直播播放器的时候,不知道如何选的好,本文针对常用的方案,做个大概的说明: 1. 使用VLC for Android VLC Media Player(VLC多媒体播放器),最初命名为VideoLAN客户端,是VideoLAN品牌产品,是VideoLAN计划的多媒体播放器。它支持众多音频与视频解码器及文件格式,并支持DVD影音光盘,VCD影

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

【IPV6从入门到起飞】5-1 IPV6+Home Assistant(搭建基本环境)

【IPV6从入门到起飞】5-1 IPV6+Home Assistant #搭建基本环境 1 背景2 docker下载 hass3 创建容器4 浏览器访问 hass5 手机APP远程访问hass6 更多玩法 1 背景 既然电脑可以IPV6入站,手机流量可以访问IPV6网络的服务,为什么不在电脑搭建Home Assistant(hass),来控制你的设备呢?@智能家居 @万物互联

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

K8S(Kubernetes)开源的容器编排平台安装步骤详解

K8S(Kubernetes)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。以下是K8S容器编排平台的安装步骤、使用方式及特点的概述: 安装步骤: 安装Docker:K8S需要基于Docker来运行容器化应用程序。首先要在所有节点上安装Docker引擎。 安装Kubernetes Master:在集群中选择一台主机作为Master节点,安装K8S的控制平面组件,如AP

什么是 TDengine?

TDengine 是一款专为物联网、工业互联网等场景设计并优化的大数据平台,其核心模块是高性能、集群开源、云原生、极简的时序数据库。它能安全高效地将大量设备、数据采集器每天产生的高达 TB 甚至 PB 级的数据进行汇聚、存储、分析和分发,对业务运行状态进行实时监测、预警,提供实时的商业洞察。 1.高性能         充分利用时序大数据的特点,TDengine 设计了新颖的存