ThingsBoard MQTT 连接认证过程 源码分析+图例

2024-06-03 03:44

本文主要是介绍ThingsBoard MQTT 连接认证过程 源码分析+图例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

整个连接过程如图所示:

 高清图片链接

1、环境准备

  • thingsboard3.5.1 源码启动。(不懂怎么启动的,大家可以看我的博文ThingsBoard3.5.1源码启动)
  • MQTTX 客户端(用来连接 thingsboard MQTT)
  • 默认配置。queue.type=in-memory,cache.type=caffeine

因为我们的目的,是快速了解 thingsboard 的启动过程,所以所有的配置全部采用默认的方式。默认消息队列采用内存队列ConcurrentHashMap,缓存也采用内存缓存caffeine。

使用 customerA 用户账号密码登录,使用设备A1 AccessToken 连接。

2、源码分析

2.1 连接消息生产

2.1.1 入口

大家知道MQTT是基于TCP协议之上的轻量级通信协议,而TCP协议是面向连接、请求响应的通信协议。所以在 thingsboard 这一侧必然有一个服务器实现,用来等待客户端的连接。这个实现就是MqttTransportService

thingsboard 采用 netty 来实现一个MQTT server。

org.thingsboard.server.transport.mqtt.MqttTransportService@PostConstructpublic void init() throws Exception {log.info("Setting resource leak detector level to {}", leakDetectorLevel);ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));log.info("Starting MQTT transport...");bossGroup = new NioEventLoopGroup(bossGroupThreadCount);workerGroup = new NioEventLoopGroup(workerGroupThreadCount);ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, false)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);serverChannel = b.bind(host, port).sync().channel();if (sslEnabled) {b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, true)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);sslServerChannel = b.bind(sslHost, sslPort).sync().channel();}log.info("Mqtt transport started!");}

其中,关系到 netty server 性能的 bossGroupThreadCount,workerGroupThreadCount

thingsboard 提取出两个参数变量 

NETTY_BOSS_GROUP_THREADS

NETTY_WORKER_GROUP_THREADS

方便用户根据自己的设备台数、部署架构,来优化自己的 netty 性能。

 netty server 的请求处理过程如下图所示,圆圈为具体实现类,方框为方法。

在 MqttTransportHandler#processMqttMsg 方法中,因为我们的消息类型是连接,所以我们会进入 processConnect 方法。

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {if (msg.fixedHeader() == null) {log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());ctx.close();return;}deviceSessionCtx.setChannel(ctx);if (CONNECT.equals(msg.fixedHeader().messageType())) {processConnect(ctx, (MqttConnectMessage) msg);} else if (deviceSessionCtx.isProvisionOnly()) {processProvisionSessionMsg(ctx, msg);} else {enqueueRegularSessionMsg(ctx, msg);}}

 在 MqttTransportHandler#processConnect 方法中,由于采用 AccessToken 的授权方式,所以会进入 processAuthTokenConnect

MqttTransportHandler#processAuthTokenConnect 方法中,获取我们在MQTTX填的用户名、密码,然后委托给 DefaultTransportService#process 处理

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());String userName = msg.payload().userName();String clientId = msg.payload().clientIdentifier();deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {deviceSessionCtx.setProvisionOnly(true);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));} else {X509Certificate cert;if (sslHandler != null && (cert = getX509Certificate()) != null) {processX509CertConnect(ctx, cert, msg);} else {processAuthTokenConnect(ctx, msg);}}}private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {String userName = connectMessage.payload().userName();log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, userName);TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder().setClientId(connectMessage.payload().clientIdentifier());if (userName != null) {request.setUserName(userName);}byte[] passwordBytes = connectMessage.payload().passwordInBytes();if (passwordBytes != null) {String password = new String(passwordBytes, CharsetUtil.UTF_8);request.setPassword(password);}transportService.process(DeviceTransportType.MQTT, request.build(),new TransportServiceCallback<>() {@Overridepublic void onSuccess(ValidateDeviceCredentialsResponse msg) {onValidateDeviceResponse(msg, ctx, connectMessage);}@Overridepublic void onError(Throwable e) {log.trace("[{}] Failed to process credentials: {}", address, userName, e);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));ctx.close();}});}

2.1.2 DefaultTransportService

 一路跟下去

DefaultTbQueueRequestTemplate#sendToRequestTemplate 方法会调用 TbQueueProducer接口 send 方法,往主题 tb_transport.api.requests 发送消息。TbQueueProducer实现类是InMemoryTbQueueProducer

void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);if (messagesStats != null) {messagesStats.incrementTotal();}// 将消息发送给消息队列topic是tb_transport.api.requestsrequestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {@Overridepublic void onSuccess(TbQueueMsgMetadata metadata) {if (messagesStats != null) {messagesStats.incrementSuccessful();}log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);}@Overridepublic void onFailure(Throwable t) {if (messagesStats != null) {messagesStats.incrementFailed();}pendingRequests.remove(requestId);future.setException(t);}});}
1、TbQueueProducer 接口的实现类有很多个,具体发送消息的实现类是哪一个呢?
因为我们使用内存队列方式启动,所以实现类是 InMemoryTbQueueProducer

2、怎么确定发送的主题是 tb_transport.api.requests

主题是通过 requestTemplate获取的

而 requestTemplate又是 DefaultTbQueueRequestTemplate的一个属性,通过 Builder 构建器注入进来的。

对于DefaultTbQueueRequestTemplate的初始化,thingsboard 提供了很多基于不同种消息队列的实现方式。我们现在所用的是内存队列,所以进入InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory中,对于DefaultTbQueueRequestTemplate.requestTemplate

的初始化,使用的是TbQueueTransportApiSettings的配置。

requestsTopic 读取的,就是 tb_transport.api.requests 这一主题。

3、更进一步

认真分析初始化过程,得出下面请求主题的初始化图。

2.1.3 InMemoryTbQueueProducer

InMemoryTbQueueProducer#send 调用 DefaultInMemoryStorage#put 方法 

DefaultInMemoryStorage 往自己持有的 ConcurrentHashMap 中存放消息,

key 是主题 tb_transport.api.requests,value 是存放有消息的 LinkedBlockingQueue 内存队列

2.1.4 一个更抽象的发送模型

TbQueueProducer 往队列 queue 发送消息,主题 tb_transport.api.requests,而不管这个消息的实现是内存队列、kafka、RabbitMQ、ServiceBus 等。TbQueueConsumer 从queue中消费消息。至此,生产连接请求消息的过程结束。

2.2 消费消息

2.2.1 InMemoryTbQueueConsumer

我们知道现在消息生产者接口 TbQueueProducer 的实现类是 InMemoryTbQueueProducer,则它必然有一个消息消费者实现接口 TbQueueConsumer,消费者实现类是 InMemoryTbQueueConsumer

 InMemoryTbQueueConsumer 中对于消息的消费只有把消息从 ConcurrentHashMap 拉取出来的逻辑,而没有具体处理的逻辑,则处理的逻辑,是存在于调用这个 poll 方法的地方。

org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer@Overridepublic List<T> poll(long durationInMillis) {if (subscribed) {@SuppressWarnings("unchecked")List<T> messages = partitions.stream().map(tpi -> {try {return storage.get(tpi.getFullTopicName());} catch (InterruptedException e) {if (!stopped) {log.error("Queue was interrupted.", e);}return Collections.emptyList();}}).flatMap(List::stream).map(msg -> (T) msg).collect(Collectors.toList());if (messages.size() > 0) {return messages;}try {Thread.sleep(durationInMillis);} catch (InterruptedException e) {if (!stopped) {log.error("Failed to sleep.", e);}}}return Collections.emptyList();}

poll 方法的调用端,全局是搜不到的。

 我们可以探究一下它的构造方法,看看谁初始化了它,则谁就有可能调用它的 poll 方法。排除掉它自己,有两个类初始化了 InMemoryTbQueueConsumer,分别是 InMemoryMonolithQueueFactory 和 InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory 订阅的主题,是 tb_transport.api.responses 不是我们要找的 tb_transport.api.requests,忽略。

2.2.2 InMemoryMonolithQueueFactory

我们先来看一下 InMemoryMonolithQueueFactoryInMemoryMonolithQueueFactory 里面有一个方法,传入的 TbQueueTransportApiSettings,刚好就是我们请求消息的主题配置类。

org.thingsboard.server.queue.provider.InMemoryMonolithQueueFactory
@Overridepublic TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());}org.thingsboard.server.queue.settings.TbQueueTransportApiSettings 
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {// tb_transport.api.requests@Value("${queue.transport_api.requests_topic}")private String requestsTopic;
}

查看对方法 createTransportApiRequestConsumer 的调用,找到一个非具体队列实现的调用类TbCoreTransportApiService

2.2.3 TbCoreTransportApiService

TbCoreTransportApiService 初始化 init 方法,会创建 TbQueueConsumer——也就是具体的实现类 InMemoryTbQueueConsumer 注入到 DefaultTbQueueResponseTemplate.requestTemplate,然后执行 DefaultTbQueueResponseTemplate#init() 方法。

 2.2.4 DefaultTbQueueResponseTemplate

至此,我们找到了 InMemoryTbQueueConsumer#poll 调用的地方。

继续往下,看看对于消息 requests,是怎么消费的。 

 2.2.5 DefaultTransportApiService

 通过 AccessToken 查找到设备的授权 DeviceCredentials (即device_credentials表记录)然后构造 DeviceInfo 返回给设备端。

org.thingsboard.server.service.transport.DefaultTransportApiService// credentialsId 就是 AccessToken
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);if (credentials != null && credentials.getCredentialsType() == credentialsType) {return getDeviceInfo(credentials);} else {return getEmptyTransportApiResponseFuture();}}

 

2.2.6 消费消息流程图

3、总结

这篇关于ThingsBoard MQTT 连接认证过程 源码分析+图例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1025932

相关文章

Spring boot整合dubbo+zookeeper的详细过程

《Springboot整合dubbo+zookeeper的详细过程》本文讲解SpringBoot整合Dubbo与Zookeeper实现API、Provider、Consumer模式,包含依赖配置、... 目录Spring boot整合dubbo+zookeeper1.创建父工程2.父工程引入依赖3.创建ap

C#连接SQL server数据库命令的基本步骤

《C#连接SQLserver数据库命令的基本步骤》文章讲解了连接SQLServer数据库的步骤,包括引入命名空间、构建连接字符串、使用SqlConnection和SqlCommand执行SQL操作,... 目录建议配合使用:如何下载和安装SQL server数据库-CSDN博客1. 引入必要的命名空间2.

Linux下进程的CPU配置与线程绑定过程

《Linux下进程的CPU配置与线程绑定过程》本文介绍Linux系统中基于进程和线程的CPU配置方法,通过taskset命令和pthread库调整亲和力,将进程/线程绑定到特定CPU核心以优化资源分配... 目录1 基于进程的CPU配置1.1 对CPU亲和力的配置1.2 绑定进程到指定CPU核上运行2 基于

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

Android kotlin中 Channel 和 Flow 的区别和选择使用场景分析

《Androidkotlin中Channel和Flow的区别和选择使用场景分析》Kotlin协程中,Flow是冷数据流,按需触发,适合响应式数据处理;Channel是热数据流,持续发送,支持... 目录一、基本概念界定FlowChannel二、核心特性对比数据生产触发条件生产与消费的关系背压处理机制生命周期

Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式

《Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式》本文详细介绍如何使用Java通过JDBC连接MySQL数据库,包括下载驱动、配置Eclipse环境、检测数据库连接等关键步骤,... 目录一、下载驱动包二、放jar包三、检测数据库连接JavaJava 如何使用 JDBC 连接 mys

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java进程异常故障定位及排查过程

《Java进程异常故障定位及排查过程》:本文主要介绍Java进程异常故障定位及排查过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、故障发现与初步判断1. 监控系统告警2. 日志初步分析二、核心排查工具与步骤1. 进程状态检查2. CPU 飙升问题3. 内存

SpringBoot整合liteflow的详细过程

《SpringBoot整合liteflow的详细过程》:本文主要介绍SpringBoot整合liteflow的详细过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋...  liteflow 是什么? 能做什么?总之一句话:能帮你规范写代码逻辑 ,编排并解耦业务逻辑,代码