ThingsBoard知识点(Transports和Rule Engine)

2024-03-07 13:04

本文主要是介绍ThingsBoard知识点(Transports和Rule Engine),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ThingsBoard整体架构图

开发环境(本地机器环境)

1、JDK 11

2、Maven 3.8.6

3、PostgreSQL 15.3

下载依赖:

mvn clean install -DskipTests

代码问题:

1、org.thingsboard.server.transport.mqtt.MqttTransportHandler存在提示“ Cannot resolve symbol ‘TransportProtos’”

解决方式:

步骤一:打开idea菜单: File->setting->plugins 搜索 proto*, 安装 

protocol Buffer Linter插件, 并重启

步骤二:

打开idea 的路径,找到 bin 下的 idea.properties 配置文件, 记事本打开,搜索 

idea.max.intellisense.filesize将值修改更大一些, 再重启idea

idea.max.intellisense.filesize=10000

1.访问地址

http://127.0.0.1:8080/login

默认系统用户名和密码

sysadmin@thingsboard.org

sysadmin

租户管理员

tenement@thingsboard.org

tenement

MQTT数据报从Transport至RuleEngine过程

MqttTransportService类

==》通过@PostConstruct注解在项目启动后进入init()方法

==》里面绑定了MqttTransportServerInitializer类即mqtt服务初始化

==》MqttTransportServerInitializer类继承ChannelInitializer类重写了initChannel方法

==》initChannel方法里绑定了MqttTransportHandler

==》进入MqttTransportHandler的channelRead方法,验证消息类型为mqtt时转入processMqttMsg方法

==》processMqttMsg里进行判断:消息类型为连接时转入processConnect,设备session为临时的转入processProvisionSessionMsg 否则转入enqueueRegularSessionMsg方法,这里先探讨转入enqueueRegularSessionMsg

==》转入enqueueRegularSessionMsg后调用processMsgQueue将消息投递到队列

==》跟进去发现里面调用了processRegularSessionMsg方法

==》processRegularSessionMsg里根据消息的类型进行转发,比如:发布,订阅,取消订阅,取消连接等等

==》跟进PUBLISH,转入processPublish方法

==》转入processDevicePublish,进入发现这里根据消息的主题进行转发,这里选择isDeviceTelemetryTopic对应的transportService.process接口实现

==》发现这里对消息封装了一下之后转入sendToRuleEngine,将消息发送到规则链

》继续跟进进入sendToRuleEngine,发现调用ruleEngineMsgProducer.send,即将消息通过生产者发送到队列这里对应多个实现,例如:inMemory,Kafka,RabbitMQ等等,默认发送到inMemory内存》

有生产者那肯定有消费者类

DefaultTbRuleEngineConsumerService类

核心消费者launchMainConsumers方法

==》launchConsumer

==》consumerLoop,发现consumerLoop是个循环,将消息取出来消费,

==》submitMessage方法,然后转入

==》forwardToRuleEngineActor,调用

==》actorContext.tell,这里开始就是Actor模型流转;

==》首先调用appActor.tell通过根appActor调用tell方法转入

==》enqueue方法,里面对消息进行了分类,分为高优先级和正常消息队列还有initActor()方法创建一系列actor,大概流程:AppActor》TenantActor》RuleChainActor》RuleNodeActor,先转入

==》tryProcessQueue方法,然后发现调用

==>processMailbox,发现这里是将之前分类的消息依次取出来然后调用actor.process(msg)方法依次向下流转处理消息

》ContextAwareActor》process==》doProcess==》…

ThingsBoard中Transports模块

Device以MQTT协议接入为例

1. 创建设备

使用租户管理员登录

首页->实体-->设备

输入基本信息,选择MQTT协议,凭证类型MQTT Basic;

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

mosquitto是基于MQTT协议衍生的一个开源的broker。

Mosquitto是一款实现了消息推送协议MQTT 3.1的开源消息代理软件提供轻量级的、支持可订阅/可发布的消息推送模式是设备与设备之间的短消息通信变得简单广泛应用于低功耗传感器、手机app消息推送是场景之一、嵌入式电脑、微型控制器等移动设备。

ThingsBoard规则链(Rule Chain)

一. 规则引擎(Rule Engine)

ThingsBoard解释

规则引擎是一个易于使用的框架,用于构建基于事件的工作流。有3个主要组成部分:

■消息- 任何传入事件。它可以是来自设备的传入数据、设备生命周期事件、REST API 事件、RPC 请求等。

■规则节点- 对传入消息执行的功能。有许多不同的节点类型可以过滤、转换或对传入的消息执行某些操作。

■规则链- 节点通过关系相互连接,因此来自规则节点的出站消息被发送到下一个连接的规则节点。

根规则链

Input:

规则链的总数入口

将消息传到下一个规则节点。

Device Profile:

设备配置文件节点。

根据设备配置文件设置处理设备消息

根据设备配置文件中定义的警报规则创建和清除警报。输出关系类型是“已创建警报”、“已更新警报”、“已更新警报严重性”和“已清除警报”,或者如果没有警报受到影响,则只是“成功”。

Message type switch:

消息类型切换

按消息类型路由传入消息

通过相应的链发送消息类型为“Post attributes”、“Post telemetry”、“RPC Request”等的消息,否则使用其他链。

Save Attributes:

保存属性

保存属性数据

根据可配置的范围参数保存实体属性。需要消息类型为“POST_ATTRIBUTES_REQUEST”的消息。如果更新插入(更新/插入)操作成功完成,规则节点将通过成功链发送传入消息,否则,使用失败链。此外,如果复选框发送属性更新通知设置为真,规则节点会将SHARED_SCOPE和SERVER_SCOPE属性更新的“属性更新”事件放入相应的规则引擎队列。

Save Timeseries:

保存时间序列

保存时间序列数据

根据可配置的 TTL 参数保存时间序列遥测数据。需要消息类型为“POST_TELEMETRY_REQUEST”的消息。以毫秒为单位的时间戳将从 metadata.ts 中获取,否则将应用“现在”消息时间戳。如果“skipLatestPersistence”设置为 true,则允许停止更新最新 ts_kv 表中传入键的值。

启用“useServerTs”参数以使用消息处理的时间戳而不是消息中的时间戳。如果您合并来自多个来源(设备、资产等)的消息,则对各种顺序处理很有用。

在顺序处理的情况下,平台保证消息按照提交到队列的顺序进行处理。然而,由多个设备/服务器发起的消息的时间戳可能在它们被推送到队列之前很长时间是不同步的。如果新记录的时间戳比前一条记录旧,DB 层会进行某些优化以忽略“属性”和“最新值”表的更新。因此,为确保所有消息都得到正确处理,应为顺序消息处理方案启用此参数。

log:

日志

使用 JS 脚本记录传入消息以将消息转换为字符串

将带有已配置 JS 函数的传入消息转换为字符串,并将最终值记录到 Thingsboard 日志文件中。可以通过属性访问消息有效负载msg。例如'temperature = ' + msg.temperature ;。可以通过属性访问消息元数据metadata。例如'name = ' + metadata.customerName;。

Rpc CallRequest:

rpc调用请求

向设备发送 RPC 调用

期望带有“方法”和“参数”的消息。将设备的响应转发到下一个节点。如果 RPC 调用请求是由用户的 REST API 调用发起的,将立即将响应转发给用户。

二. 规则节点(Rule Node)

概念

规则节点是规则引擎的基本组件,它一次处理单个传入消息并生成一个或多个传出消息。规则节点是规则引擎的主要逻辑单元。规则节点可以过滤,丰富,转换传入消息,执行操作或与外部系统通信。

作用:规则节点可以过滤,丰富,转换传入消息,执行操作或与外部系统通信。提供节点自定义能力,实现数据的运算。

规则节点类型

1、过滤节点(用于消息过滤和路由,过滤成功走真链、错误走假链)。

1:script(脚本过滤器节点)使用javascript条件进行消息过滤(消息msg,metadata消息元数据,msgType消息类型);

2:switch(交换节点)将传入消息路由到一个或多个输出链,节点执行已配置的JavaScript函数。

过滤节点:

2、属性集节点:用于更新传入消息的元数据。

1:消息发起方用户属性(customer attributes),将消息发起方属性信息或者遥测数据加入Metadata元数据中。

2:设备属性(device attributes),将消息发起方的设备属性或者遥测数据加入metadata。

属性集节点:

3、变换节点:用户更改创立的消息字段,比如,发起方、类型、有效负载,元数据。

1:脚本转换节点(script),作用:修改消息内容(msg(消息负载),msgType(消息类型),metadata(元数据)),可增加,可改。

2:转换到电子邮件节点(to email),通过使用从消息元数据派生的值填充电子邮件字段,将消息转换为电子邮件消息。设置“ SEND_EMAIL”输出消息类型,以后可以被“ 发送电子邮件节点”接受。可以将所有电子邮件字段配置为使用元数据中的值。

4、动作节点:根据传入的消息执行各种动作。

1:create alarm(创建告警),通过过滤节点中的过滤脚本判断后,对满足条件的消息进行告警的触发;

2:log(创建日志),对于系统中的关键系统进行日志输出;

3:rpc call request(远程RPC调用),监控系统rpc请求,下发控制命令请求;

5、外部节点:提供将消息及数据路由到外部中间件,或者其他第三方云平台中。用于与外部系统进行交互。

1:kafka(kafka消息中间件),MQTT(外部MQTT代理),RabbitMQ,支持将系统中的数据发布到kafka/MQTT代理/RabbitMQ中,供第三方消费者订阅数据。

2:send email (向外部发送邮件)。

3:aws sns:将消息发布到aws sns(亚马逊简单消息通知服务,是一种发布\订阅模式的消息收发服务)。

规则节点关系

规则节点之间存在关联性每个节点都有对应关系类型,用于标识关系的逻辑标签。

当规则节点生成输出消息时,它总是将消息路由到下一个指定的节点并通过关系类型进行关联。

表示成功与否的规则节点关系是Success和Failure。

表示逻辑运算的规则节点可以是True或False。

一些特定的规则节点可能使用完全不同的关系类型例如:“Post Telemetry”、“Attributes Updated”、“Entity Created”等。

三.规则链(Rule Chain)

是什么?规则链是规则节点及其关系的逻辑组。接收来自节点的出站消息将其发送至下一个节点。

用法:租户管理员可以定义一个“ 根”规则链,还可以定义多个其他规则链。根规则链处理所有传入的消息,并将其转发到其他规则链以进行其他处理。其他规则链也可以将消息转发到不同的规则链。

这篇关于ThingsBoard知识点(Transports和Rule Engine)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/783614

相关文章

基本知识点

1、c++的输入加上ios::sync_with_stdio(false);  等价于 c的输入,读取速度会加快(但是在字符串的题里面和容易出现问题) 2、lower_bound()和upper_bound() iterator lower_bound( const key_type &key ): 返回一个迭代器,指向键值>= key的第一个元素。 iterator upper_bou

两个月冲刺软考——访问位与修改位的题型(淘汰哪一页);内聚的类型;关于码制的知识点;地址映射的相关内容

1.访问位与修改位的题型(淘汰哪一页) 访问位:为1时表示在内存期间被访问过,为0时表示未被访问;修改位:为1时表示该页面自从被装入内存后被修改过,为0时表示未修改过。 置换页面时,最先置换访问位和修改位为00的,其次是01(没被访问但被修改过)的,之后是10(被访问了但没被修改过),最后是11。 2.内聚的类型 功能内聚:完成一个单一功能,各个部分协同工作,缺一不可。 顺序内聚:

STL经典案例(四)——实验室预约综合管理系统(项目涉及知识点很全面,内容有点多,耐心看完会有收获的!)

项目干货满满,内容有点过多,看起来可能会有点卡。系统提示读完超过俩小时,建议分多篇发布,我觉得分篇就不完整了,失去了这个项目的灵魂 一、需求分析 高校实验室预约管理系统包括三种不同身份:管理员、实验室教师、学生 管理员:给学生和实验室教师创建账号并分发 实验室教师:审核学生的预约申请 学生:申请使用实验室 高校实验室包括:超景深实验室(可容纳10人)、大数据实验室(可容纳20人)、物联网实验

C++语法知识点合集:11.模板

文章目录 一、非类型模板参数1.非类型模板参数的基本形式2.指针作为非类型模板参数3.引用作为非类型模板参数4.非类型模板参数的限制和陷阱:5.几个问题 二、模板的特化1.概念2.函数模板特化3.类模板特化(1)全特化(2)偏特化(3)类模板特化应用示例 三、模板分离编译1.概念2.模板的分离编译 模版总结 一、非类型模板参数 模板参数分类类型形参与非类型形参 非类型模板

枚举相关知识点

1.是用户定义的数据类型,为一组相关的常量赋予有意义的名字。 2.enum常量本身带有类型信息,即Weekday.SUN类型是Weekday,编译器会自动检查出类型错误,在编译期间可检查错误。 3.enum定义的枚举类有什么特点。         a.定义的enum类型总是继承自java.lang.Enum,且不能被继承,因为enum被编译器编译为final修饰的类。         b.只能定义

【408数据结构】散列 (哈希)知识点集合复习考点题目

苏泽  “弃工从研”的路上很孤独,于是我记下了些许笔记相伴,希望能够帮助到大家    知识点 1. 散列查找 散列查找是一种高效的查找方法,它通过散列函数将关键字映射到数组的一个位置,从而实现快速查找。这种方法的时间复杂度平均为(

【反射知识点详解】

Java中的反射(Reflection)是一个非常强大的机制,它允许程序在运行时检查或修改类的行为。这种能力主要通过java.lang.reflect包中的类和接口来实现。 通过反射,Java程序可以动态地创建对象、调用方法、访问字段,以及获取类的各种信息(如构造器、方法、字段等)。 反射的用途 反射主要用于以下几种情况: 动态创建对象:通过类的Class对象动态地创建其实例。访问类的字段

2024年AMC10美国数学竞赛倒计时两个月:吃透1250道真题和知识点(持续)

根据通知,2024年AMC10美国数学竞赛的报名还有两周,正式比赛还有两个月就要开始了。计划参赛的孩子们要记好时间,认真备考,最后冲刺再提高成绩。 那么如何备考2024年AMC10美国数学竞赛呢?做真题,吃透真题和背后的知识点是备考AMC8、AMC10有效的方法之一。通过做真题,可以帮助孩子找到真实竞赛的感觉,而且更加贴近比赛的内容,可以通过真题查漏补缺,更有针对性的补齐知识的短板。

Python知识点:如何使用Python开发桌面应用(Tkinter、PyQt)

Python 提供了多个库来开发桌面应用程序,其中最常见的两个是 Tkinter 和 PyQt。这两者各有优点,选择取决于你的需求。以下我会介绍如何使用 Tkinter 和 PyQt 开发简单的桌面应用程序。 1. 使用 Tkinter 开发桌面应用 Tkinter 是 Python 的标准库,它非常轻量级且跨平台。它适合开发简单的桌面应用,入门较容易。 安装 Tkinter Tkinte

Python知识点:如何使用Anaconda进行科学计算环境管理

使用 Anaconda 进行科学计算环境管理是一个非常强大且灵活的方式,特别适合处理 Python 和 R 语言的包管理和虚拟环境管理。Anaconda 集成了许多用于科学计算和数据分析的库,并提供了环境隔离的功能,确保不同项目之间不会发生包冲突。以下是使用 Anaconda 进行科学计算环境管理的详细步骤: 1. 安装 Anaconda 首先,你需要在本地机器上安装 Anaconda。你可以