使用 RisingWave、NATS JetStream 和 Superset 进行实时物联网监控

本文主要是介绍使用 RisingWave、NATS JetStream 和 Superset 进行实时物联网监控,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在物联网(IoT)背景下,处理实时数据会遇到一些特定的障碍,如边缘计算资源不足、网络条件限制、扩展性存在问题、设备间有多样性差异。要克服这些挑战,需要高效的边缘计算技术、强大的安全措施、标准化协议、可扩展的管理系统和先进的数据处理能力。

通过综合利用 NATS JetStream、RisingWave 和 Superset,可以构建一个强大的解决方案,用于开发可靠且可扩展的实时物联网应用。

RisingWave 是什么?

RisingWave 是与 PostgreSQL 兼容的流数据库,具有成本效益、可扩展性和真正的云原生架构。它允许用户使用 SQL 从流数据中获取实时见解,易于设置、使用和操作。

NATS JetStream 是什么?

NATS 是一种安全连接技术,设计用于在分布式系统中发现和交换信息。它可以部署在任何环境中,用于微服务、数据流和物联网等不同用例,支持边缘设备,可使用多种语言和客户端进行交互。JetStream 构建在 NATS 之上,支持消息流的持久化。

Superset 是什么?

Apache Superset 是一个现代化数据探索和数据可视化平台。它是一款开源软件,可以取代或增强许多团队的专有商业智能工具。

概述

本文将深入探讨一个物联网场景,重点关注通过物联网传感器监控温度和湿度数据。我们将探讨 NATS JetStream 如何使边缘设备能够轻松将数据流传输到 RisingWave 并进行实时处理。通过窗口操作和聚合,RisingWave 可以高效地对数据进行高级分析。最后,我们将使用 Superset 创建表、图表和集成看板,对处理和分析的数据进行可视化。

实时物联网应用开发解决方案

1. 设置 NATS JetStream

NATS 服务器经过高度优化,其二进制文件不到 20 MB,使其可以轻松在各种机器上运行。无论是在 Raspberry Pi 还是规模宏大的服务器上,也无论是在云端、本地、边缘、裸机、虚拟机还是在容器中,均可轻松运行。

您可以使用 Docker 安装 NATS JetStream,如下所示:

docker pull nats:latest

要在 Docker 上运行 NATS JetStream,可以使用 -js Flag 启动 NATS 服务器。此 Flag 可启用 JetStream 功能,使您能够充分利用其各项功能。

docker run -p 4222:4222 -ti nats:latest -js

该 Docker 命令可启动 NATS JetStream。现在,您可以通过各种语言和客户端发布和订阅信息。

在 4222 端口运行的 NATS JetStream 服务器

2. 向 JetStream 发布数据

在此示例中,我们使用 iot_data 主题将物联网数据发布到 JetStream 的 Stream event_stream 中。下面是正在发布的数据示例:

'{"device_Id":"sensor1","temperature":25,"ts":"2023-01-05 05:50:00+00:00"},
'{"device_Id":"sensor1","temperature":26,"ts":"2023-01-05 05:50:01+00:00"}'
'{"device_Id":"sensor2","humidity":60,"ts":"2023-01-05 05:50:01+00:00"}'
'{"device_Id":"sensor1","temperature":27,"ts":"2023-01-05 05:50:02+00:00"}'
'{"device_Id":"sensor2","humidity":62,"ts":"2023-01-05 05:50:02+00:00"}'

3. 从 RisingWave 摄取 JetStream 的数据

我们可以使用开源 RisingWave 或托管服务(RisingWave Cloud)来摄取和处理流数据。本文将使用 RisingWave Cloud,它能够提供良好的用户体验,简化管理和使用 RisingWave 进行物联网监控的操作。

创建 RisingWave 集群

使用免费计划在 RisingWave Cloud 中创建 RisingWave 集群。有关说明,请参阅 RisingWave Cloud 文档。

RisingWave Cloud:账户注册和登录流程

在 RisingWave 中创建 Source 以摄取数据流

在 RisingWave 中创建 Source,以便从先前设置的 iot_data 主题的 Stream event_stream 中摄取数据。在此示例中,RisingWave 充当 NATS JetStream 的 Stream 和主题的订阅者。

请注意,RisingWave 中带有连接器设置的 Source 会与 Stream 建立连接,但不会持久化流数据。

CREATE SOURCE iot_source(device_Id VARCHAR,temperature VARCHAR,humidity VARCHAR,ts TIMESTAMPTZ
)
WITH (connector='nats',server_url='nats://8.210.9.253:4222',subject='iot_data',stream='event_stream',connect_mode='plain'
)FORMAT PLAIN ENCODE JSON;

4. 在 RisingWave 中进行分析

现在,我们根据名为 iot_source 的 Source 创建一个名为 iot_mv 的物化视图,用于存储传入的数据并进行分析。

CREATE MATERIALIZED VIEW iot_mv AS
SELECT device_Id, temperature,humidity,ts 
FROM iot_source;

可以使用以下 SQL 语句查询结果。

SELECT device_Id, temperature,ts 
from iot_mv
WHERE deviceId ='sensor1'
limit 5;

下面是一个结果示例。

device_id  | temperature |               ts               
----------+-------------+-------------------------------sensor1  |          25 | 2023-01-05 05:50:00+00:00sensor1  |          26 | 2023-01-05 05:50:01+00:00sensor1  |          27 | 2023-01-05 05:50:03+00:00sensor1  |          28 | 2023-01-05 05:50:05+00:00sensor1  |          29 | 2023-01-05 05:50:07+00:00

可以使用以下 SQL 语句查询结果。

SELECT device_Id, humidity,ts 
from iot_mv
WHERE deviceId ='sensor2'
limit 5;
| device_id | humidity |                    ts                    
|----------|----------|------------------------------------------
| sensor2  |    60    | 2023-01-05 05:50:02+00:00 
| sensor2  |    62    | 2023-01-05 05:50:04+00:00 
| sensor2  |    65    | 2023-01-05 05:50:06+00:00 
| sensor2  |    68    | 2023-01-05 05:50:08+00:00 
| sensor2  |    70    | 2023-01-05 05:50:10+00:00

下面的语句可创建一个名为 avg_temperature_mv 的物化视图,用于根据时间戳 ts 计算指定设备 sensor1 在 1 分钟 Tumbling 窗口内的平均温度。结果包括设备 ID、平均温度、窗口开始和窗口结束的列。

CREATE MATERIALIZED VIEW avg_temperature_mv AS
SELECT device_Id, AVG(temperature) AS avg_temperature
window_start, window_end
FROM TUMBLE (iot_mv, ts, INTERVAL '1 MINUTES')
WHERE device_Id ='sensor1'
GROUP BY device_Id,window_start, window_end;

可以使用以下 SQL 语句查询结果。

SELECT * FROM avg_temperature_mv LIMIT 5;

下面是一个结果示例。

| device_id | avg_temperature  |        window_start        |          window_end           
|----------|------------------|----------------------------|--------------------------
| sensor1  |        41        | 2023-01-05T05:56:00Z       | 2023-01-05T05:57:00Z 
| sensor1  |        40        | 2023-01-05T05:50:00Z       | 2023-01-05T05:51:00Z 
| sensor1  |        38        | 2023-01-05T05:55:00Z       | 2023-01-05T05:56:00Z 
| sensor1  |        35        | 2023-01-05T05:54:00Z       | 2023-01-05T05:55:00Z 
| sensor1  |        55        | 2023-01-05T06:01:00Z       | 2023-01-05T06:02:00Z

同样,下面的语句可创建一个名为 avg_humidity_mv 的物化视图,用于根据时间戳 ts 计算指定设备 sensor2 在 1 分钟 Tumbling 窗口内的平均湿度。结果包括设备 ID、平均湿度、窗口开始和窗口结束的列。

CREATE MATERIALIZED VIEW avg_humidity_mv AS
SELECT device_Id, AVG(humidity) AS avg_humidity
window_start, window_end
FROM TUMBLE (iot_mv, ts, INTERVAL '1 MINUTES')
WHERE device_Id ='sensor2'
GROUP BY device_Id,window_start, window_end;

可以使用以下 SQL 语句查询结果。

SELECT * FROM avg_humidity_mv LIMIT 5;

下面是一个结果示例。

| device_Id | avg_humidity |        window_start         |          window_end           
|----------|--------------|-----------------------------|-------------------------------
| sensor2  |   112.33     | 2023-01-05T05:58:00Z | 2023-01-05T05:59:00Z |
| sensor2  |      75      | 2023-01-05T05:53:00Z | 2023-01-05T05:54:00Z |
| sensor2  |      90      | 2023-01-05T05:55:00Z | 2023-01-05T05:56:00Z |
| sensor2  |      95      | 2023-01-05T05:50:00Z | 2023-01-05T05:51:00Z |
| sensor2  |     105      | 2023-01-05T05:57:00Z | 2023-01-05T05:58:00Z |

5. 在 Apache Superset 中可视化数据

我们将配置 Superset,以便从 RisingWave 读取数据并进行可视化。

将 RisingWave 连接到 Superset

可以在 Apache Superset 中将 RisingWave 作为数据源,使用 RisingWave 中的表和物化视图进行可视化和创建看板。要了解该过程,请按照 配置 Superset 从 RisingWave 读取数据 一文中的说明进行操作。

成功将 RisingWave 连接到 Apache Superset 后,我们可将 RisingWave 中的物化视图添加为数据集,以创建表、各种图表和综合看板。

使用 Apache Superset 可视化数据:表、图表和看板

此表由 iot_mv 数据集生成,显示温度传感器 ID、温度读数以及每个读数相应的时间戳等信息。

温度传感器表: 温度传感器 ID、温度读数和时间戳

此表也由 iot_mv 数据集生成,显示湿度传感器 ID、湿度读数以及每个读数相应的时间戳等详细信息。它全面展示了在 iot_mv 物化视图中捕获和存储的湿度数据。

湿度传感器表: 湿度传感器 ID、湿度读数和时间戳

此条形图由 avg_temperature_mv 数据集生成,显示了温度传感器在预定义的 1 分钟时间窗口内获取的平均温度。

平均温度传感器图表:显示温度传感器在 1 分钟窗口内获取的平均温度值

此折线图由 avg_humidity_mv 数据集生成,显示了湿度传感器在指定的 1 分钟时间窗口内获取的平均湿度。

平均湿度传感器图表:显示湿度传感器在 1 分钟窗口内获取的平均湿度值

此综合看板呈现了一系列图表,有助于全面实时监控物联网设备。通过对每个相应时间戳的温度和湿度传感器读数进行深入分析,获取有价值的见解,使用户能够做出明智的决策,并实现对工业物联网设备的有效监控。

物联网设备实时监控看板:基于温度和湿度传感器

总结

本文逐步介绍了如何利用 NATS JetStream、RisingWave 和 Superset 构建实时物联网监控解决方案。以上三个系统的设置过程简单省力,资源效率高且具有强大的可扩展性,是实时物联网应用的理想组合。通过三者的无缝集成,不到一小时即可创建一个实时物联网看板。简而言之,这展示了物联网设备背景下 NATS JetStream、RisingWave 和 Apache Superset 在工业流程中的无缝集成,并通过可视化和看板实现了实时分析和监控。

这篇关于使用 RisingWave、NATS JetStream 和 Superset 进行实时物联网监控的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/876050

相关文章

Python调用Orator ORM进行数据库操作

《Python调用OratorORM进行数据库操作》OratorORM是一个功能丰富且灵活的PythonORM库,旨在简化数据库操作,它支持多种数据库并提供了简洁且直观的API,下面我们就... 目录Orator ORM 主要特点安装使用示例总结Orator ORM 是一个功能丰富且灵活的 python O

Nginx设置连接超时并进行测试的方法步骤

《Nginx设置连接超时并进行测试的方法步骤》在高并发场景下,如果客户端与服务器的连接长时间未响应,会占用大量的系统资源,影响其他正常请求的处理效率,为了解决这个问题,可以通过设置Nginx的连接... 目录设置连接超时目的操作步骤测试连接超时测试方法:总结:设置连接超时目的设置客户端与服务器之间的连接

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Python使用国内镜像加速pip安装的方法讲解

《Python使用国内镜像加速pip安装的方法讲解》在Python开发中,pip是一个非常重要的工具,用于安装和管理Python的第三方库,然而,在国内使用pip安装依赖时,往往会因为网络问题而导致速... 目录一、pip 工具简介1. 什么是 pip?2. 什么是 -i 参数?二、国内镜像源的选择三、如何

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

Linux使用nload监控网络流量的方法

《Linux使用nload监控网络流量的方法》Linux中的nload命令是一个用于实时监控网络流量的工具,它提供了传入和传出流量的可视化表示,帮助用户一目了然地了解网络活动,本文给大家介绍了Linu... 目录简介安装示例用法基础用法指定网络接口限制显示特定流量类型指定刷新率设置流量速率的显示单位监控多个

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

C++ Primer 多维数组的使用

《C++Primer多维数组的使用》本文主要介绍了多维数组在C++语言中的定义、初始化、下标引用以及使用范围for语句处理多维数组的方法,具有一定的参考价值,感兴趣的可以了解一下... 目录多维数组多维数组的初始化多维数组的下标引用使用范围for语句处理多维数组指针和多维数组多维数组严格来说,C++语言没