本文主要是介绍ThingsBoard知识点(Transports和Rule Engine),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
ThingsBoard整体架构图
开发环境(本地机器环境)
1、JDK 11
2、Maven 3.8.6
3、PostgreSQL 15.3
下载依赖:
mvn clean install -DskipTests
代码问题:
1、org.thingsboard.server.transport.mqtt.MqttTransportHandler存在提示“ Cannot resolve symbol ‘TransportProtos’”
解决方式:
步骤一:打开idea菜单: File->setting->plugins 搜索 proto*, 安装
protocol Buffer Linter插件, 并重启
步骤二:
打开idea 的路径,找到 bin 下的 idea.properties 配置文件, 记事本打开,搜索
idea.max.intellisense.filesize将值修改更大一些, 再重启idea
idea.max.intellisense.filesize=10000
1.访问地址
http://127.0.0.1:8080/login
默认系统用户名和密码
sysadmin@thingsboard.org
sysadmin
租户管理员
tenement@thingsboard.org
tenement
MQTT数据报从Transport至RuleEngine过程
MqttTransportService类
==》通过@PostConstruct注解在项目启动后进入init()方法
==》里面绑定了MqttTransportServerInitializer类即mqtt服务初始化
==》MqttTransportServerInitializer类继承ChannelInitializer类重写了initChannel方法
==》initChannel方法里绑定了MqttTransportHandler
==》进入MqttTransportHandler的channelRead方法,验证消息类型为mqtt时转入processMqttMsg方法
==》processMqttMsg里进行判断:消息类型为连接时转入processConnect,设备session为临时的转入processProvisionSessionMsg 否则转入enqueueRegularSessionMsg方法,这里先探讨转入enqueueRegularSessionMsg
==》转入enqueueRegularSessionMsg后调用processMsgQueue将消息投递到队列
==》跟进去发现里面调用了processRegularSessionMsg方法
==》processRegularSessionMsg里根据消息的类型进行转发,比如:发布,订阅,取消订阅,取消连接等等
==》跟进PUBLISH,转入processPublish方法
==》转入processDevicePublish,进入发现这里根据消息的主题进行转发,这里选择isDeviceTelemetryTopic对应的transportService.process接口实现
==》发现这里对消息封装了一下之后转入sendToRuleEngine,将消息发送到规则链
》继续跟进进入sendToRuleEngine,发现调用ruleEngineMsgProducer.send,即将消息通过生产者发送到队列这里对应多个实现,例如:inMemory,Kafka,RabbitMQ等等,默认发送到inMemory内存》
有生产者那肯定有消费者类
DefaultTbRuleEngineConsumerService类
核心消费者launchMainConsumers方法
==》launchConsumer
==》consumerLoop,发现consumerLoop是个循环,将消息取出来消费,
==》submitMessage方法,然后转入
==》forwardToRuleEngineActor,调用
==》actorContext.tell,这里开始就是Actor模型流转;
==》首先调用appActor.tell通过根appActor调用tell方法转入
==》enqueue方法,里面对消息进行了分类,分为高优先级和正常消息队列还有initActor()方法创建一系列actor,大概流程:AppActor》TenantActor》RuleChainActor》RuleNodeActor,先转入
==》tryProcessQueue方法,然后发现调用
==>processMailbox,发现这里是将之前分类的消息依次取出来然后调用actor.process(msg)方法依次向下流转处理消息
》ContextAwareActor》process==》doProcess==》…
ThingsBoard中Transports模块
Device以MQTT协议接入为例
1. 创建设备
使用租户管理员登录
首页->实体-->设备
输入基本信息,选择MQTT协议,凭证类型MQTT Basic;
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
mosquitto是基于MQTT协议衍生的一个开源的broker。
Mosquitto是一款实现了消息推送协议MQTT 3.1的开源消息代理软件提供轻量级的、支持可订阅/可发布的消息推送模式是设备与设备之间的短消息通信变得简单广泛应用于低功耗传感器、手机app消息推送是场景之一、嵌入式电脑、微型控制器等移动设备。
ThingsBoard规则链(Rule Chain)
一. 规则引擎(Rule Engine)
ThingsBoard解释
规则引擎是一个易于使用的框架,用于构建基于事件的工作流。有3个主要组成部分:
■消息- 任何传入事件。它可以是来自设备的传入数据、设备生命周期事件、REST API 事件、RPC 请求等。
■规则节点- 对传入消息执行的功能。有许多不同的节点类型可以过滤、转换或对传入的消息执行某些操作。
■规则链- 节点通过关系相互连接,因此来自规则节点的出站消息被发送到下一个连接的规则节点。
根规则链
Input:
规则链的总数入口
将消息传到下一个规则节点。
Device Profile:
设备配置文件节点。
根据设备配置文件设置处理设备消息
根据设备配置文件中定义的警报规则创建和清除警报。输出关系类型是“已创建警报”、“已更新警报”、“已更新警报严重性”和“已清除警报”,或者如果没有警报受到影响,则只是“成功”。
Message type switch:
消息类型切换
按消息类型路由传入消息
通过相应的链发送消息类型为“Post attributes”、“Post telemetry”、“RPC Request”等的消息,否则使用其他链。
Save Attributes:
保存属性
保存属性数据
根据可配置的范围参数保存实体属性。需要消息类型为“POST_ATTRIBUTES_REQUEST”的消息。如果更新插入(更新/插入)操作成功完成,规则节点将通过成功链发送传入消息,否则,使用失败链。此外,如果复选框发送属性更新通知设置为真,规则节点会将SHARED_SCOPE和SERVER_SCOPE属性更新的“属性更新”事件放入相应的规则引擎队列。
Save Timeseries:
保存时间序列
保存时间序列数据
根据可配置的 TTL 参数保存时间序列遥测数据。需要消息类型为“POST_TELEMETRY_REQUEST”的消息。以毫秒为单位的时间戳将从 metadata.ts 中获取,否则将应用“现在”消息时间戳。如果“skipLatestPersistence”设置为 true,则允许停止更新最新 ts_kv 表中传入键的值。
启用“useServerTs”参数以使用消息处理的时间戳而不是消息中的时间戳。如果您合并来自多个来源(设备、资产等)的消息,则对各种顺序处理很有用。
在顺序处理的情况下,平台保证消息按照提交到队列的顺序进行处理。然而,由多个设备/服务器发起的消息的时间戳可能在它们被推送到队列之前很长时间是不同步的。如果新记录的时间戳比前一条记录旧,DB 层会进行某些优化以忽略“属性”和“最新值”表的更新。因此,为确保所有消息都得到正确处理,应为顺序消息处理方案启用此参数。
log:
日志
使用 JS 脚本记录传入消息以将消息转换为字符串
将带有已配置 JS 函数的传入消息转换为字符串,并将最终值记录到 Thingsboard 日志文件中。可以通过属性访问消息有效负载msg。例如'temperature = ' + msg.temperature ;。可以通过属性访问消息元数据metadata。例如'name = ' + metadata.customerName;。
Rpc CallRequest:
rpc调用请求
向设备发送 RPC 调用
期望带有“方法”和“参数”的消息。将设备的响应转发到下一个节点。如果 RPC 调用请求是由用户的 REST API 调用发起的,将立即将响应转发给用户。
二. 规则节点(Rule Node)
概念
规则节点是规则引擎的基本组件,它一次处理单个传入消息并生成一个或多个传出消息。规则节点是规则引擎的主要逻辑单元。规则节点可以过滤,丰富,转换传入消息,执行操作或与外部系统通信。
作用:规则节点可以过滤,丰富,转换传入消息,执行操作或与外部系统通信。提供节点自定义能力,实现数据的运算。
规则节点类型
1、过滤节点(用于消息过滤和路由,过滤成功走真链、错误走假链)。
1:script(脚本过滤器节点)使用javascript条件进行消息过滤(消息msg,metadata消息元数据,msgType消息类型);
2:switch(交换节点)将传入消息路由到一个或多个输出链,节点执行已配置的JavaScript函数。
过滤节点:
2、属性集节点:用于更新传入消息的元数据。
1:消息发起方用户属性(customer attributes),将消息发起方属性信息或者遥测数据加入Metadata元数据中。
2:设备属性(device attributes),将消息发起方的设备属性或者遥测数据加入metadata。
属性集节点:
3、变换节点:用户更改创立的消息字段,比如,发起方、类型、有效负载,元数据。
1:脚本转换节点(script),作用:修改消息内容(msg(消息负载),msgType(消息类型),metadata(元数据)),可增加,可改。
2:转换到电子邮件节点(to email),通过使用从消息元数据派生的值填充电子邮件字段,将消息转换为电子邮件消息。设置“ SEND_EMAIL”输出消息类型,以后可以被“ 发送电子邮件节点”接受。可以将所有电子邮件字段配置为使用元数据中的值。
4、动作节点:根据传入的消息执行各种动作。
1:create alarm(创建告警),通过过滤节点中的过滤脚本判断后,对满足条件的消息进行告警的触发;
2:log(创建日志),对于系统中的关键系统进行日志输出;
3:rpc call request(远程RPC调用),监控系统rpc请求,下发控制命令请求;
5、外部节点:提供将消息及数据路由到外部中间件,或者其他第三方云平台中。用于与外部系统进行交互。
1:kafka(kafka消息中间件),MQTT(外部MQTT代理),RabbitMQ,支持将系统中的数据发布到kafka/MQTT代理/RabbitMQ中,供第三方消费者订阅数据。
2:send email (向外部发送邮件)。
3:aws sns:将消息发布到aws sns(亚马逊简单消息通知服务,是一种发布\订阅模式的消息收发服务)。
规则节点关系
规则节点之间存在关联性每个节点都有对应关系类型,用于标识关系的逻辑标签。
当规则节点生成输出消息时,它总是将消息路由到下一个指定的节点并通过关系类型进行关联。
表示成功与否的规则节点关系是Success和Failure。
表示逻辑运算的规则节点可以是True或False。
一些特定的规则节点可能使用完全不同的关系类型例如:“Post Telemetry”、“Attributes Updated”、“Entity Created”等。
三.规则链(Rule Chain)
是什么?规则链是规则节点及其关系的逻辑组。接收来自节点的出站消息将其发送至下一个节点。
用法:租户管理员可以定义一个“ 根”规则链,还可以定义多个其他规则链。根规则链处理所有传入的消息,并将其转发到其他规则链以进行其他处理。其他规则链也可以将消息转发到不同的规则链。
这篇关于ThingsBoard知识点(Transports和Rule Engine)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!