Vert.x框架实现MQTT服务,代码实现及其详解

2024-02-07 08:12

本文主要是介绍Vert.x框架实现MQTT服务,代码实现及其详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简单的总结

先做一个简单的近日总结吧
最近在学习怎么用vertx框架实现一个mqtt服务和物联网平台进一步的搭建
这两天又折腾了笔记软件,想从语雀迁到obsidian,也花了不少时间,想着最近都没怎么写过博客,就写一写自己这几天学习vertx框架实现mqtt服务的收获和学习到东西

Vert.x MQTT

MQTT是基于发布/订阅(publish/subscribe)模式的”轻量级“通信协议,该协议构建于TCP/IP协议之上,由于其低开销,低带宽占用的特点,非常适合在物联网设备通信方面使用。
关于MQTT相关的基础知识我不再赘述,如果还不清楚的同学可以看看这个文档,快速了解一下相关知识。MQTT 协议入门:基础知识和快速教程

而Vert.x MQTT是一个使用Vert.x实现MQTT协议中客户端与服务端的组件,里边有一些基础的api,可以利用api写出自己想要的服务。以下是其官方文档,可以参考一下,虽然是汉化过的文档,但是我看的过程还是比较困难,可能是作者太菜了。Vert.x MQTT

这里我只用了Vertx MQTT的服务端组件实现了MQTT服务,客户端部分采用了MQTTX创建虚拟设备。MQTTX是一款客户端工具,可以用于创建虚拟设备快速进行测试,MQTT 客户端工具演示 | EMQX 文档

代码实现

先分方法放出来,最后再放完整代码

start()方法

代码的入口,创建了一个MQTT服务器并进行了监听。在接收到来自客户端的连接请求时,会调用不同的处理方法来处理订阅、退订、收到消息和断开连接等操作。

  @Overridepublic void start() throws Exception {MqttServer mqttServer = MqttServer.create(vertx);topicSubscribers = new HashMap<>();subscriptions=new HashMap<>();mqttServer.endpointHandler(endpoint -> {//使用lambda表达式实现了endpointHandler方法,传入参数endpoint;System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());if (endpoint.auth() != null) {System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");}System.out.println("[properties = " + endpoint.connectProperties() + "]");// accept connection from the remote clientendpoint.accept(false);SubscribeHandle(endpoint);UnSubscribeHandle(endpoint);ReceiveHandle(endpoint);DisConnectHandle(endpoint);});mqttServer.listen(ar -> {if (ar.succeeded()) {System.out.println("MQTT server is listening on port " + ar.result().actualPort());} else {System.out.println("Error on starting the server");ar.cause().printStackTrace();}});}

DisConnectHandle方法

用于处理设备断开连接的情况。当设备发送断开连接消息时,会从订阅关系列表中删除该设备,并从订阅列表中删除该设备的所有订阅。

  private void DisConnectHandle(MqttEndpoint endpoint) {endpoint.disconnectMessageHandler(disconnectMessage -> {System.out.println("Received disconnect from client, reason code = " + disconnectMessage.code());System.out.println("Client "+endpoint.auth().getUsername()+"disconnect with");//对两个映射订阅关系的列表进行更新for (String topic:subscriptions.get(endpoint)){topicSubscribers.get(topic).remove(endpoint);System.out.println("["+topic+"]");}subscriptions.remove(endpoint);});}

SubscribeHandle方法

用于处理订阅消息。当设备发送订阅消息时,会检查订阅的主题是否合法。如果主题合法,则将设备添加到订阅关系列表和订阅列表中,并向设备发送订阅确认消息。

    private void SubscribeHandle(MqttEndpoint endpoint) {endpoint.subscribeHandler(subscribe -> {Boolean IsValidTopic=false;//存储订阅消息中要订阅的topic的列表List<MqttTopicSubscription> topicSubscriptions = subscribe.topicSubscriptions();//存储订阅topic Qos级别的列表List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();//遍历列表for (MqttTopicSubscription s : topicSubscriptions) {//topicString topic = s.topicName();//Qos级别MqttQoS qos = s.qualityOfService();//判断topic是否合法if (!isValidTopic(topic)){//不合法则向设备发送消息endpoint.publish(topic, Buffer.buffer("非法topic,topic不可包含空格"), qos, false, false);continue;}else {IsValidTopic=true;}System.out.println("Subscription for " + topic + " with QoS " + qos);reasonCodes.add(MqttSubAckReasonCode.qosGranted(qos));//判断是否已有此topic,如果有则直接添加,没有则新建键值对if (!topicSubscribers.containsKey(topic)) {topicSubscribers.put(topic, new ArrayList<MqttEndpoint>());}topicSubscribers.get(topic).add(endpoint);//同上if (!subscriptions.containsKey(endpoint)) {subscriptions.put(endpoint, new ArrayList<String>());}subscriptions.get(endpoint).add(topic);}if(IsValidTopic){endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);}});}

UnSubscribeHandle方法

用于处理退订消息。当设备发送退订消息时,会从订阅关系列表中删除该设备,并检查是否需要删除订阅列表中的主题。

private void UnSubscribeHandle(MqttEndpoint endpoint) {endpoint.unsubscribeHandler(unsubscribe -> {//遍历要退订的topicfor (String unsubscribedTopic : unsubscribe.topics()) {topicSubscribers.get(unsubscribedTopic).remove(endpoint);//如果某topic的订阅列表为空,删除topicif (topicSubscribers.get(unsubscribedTopic).size() == 0) {topicSubscribers.remove(unsubscribedTopic);}subscriptions.get(endpoint).remove(unsubscribedTopic);//同上if (subscriptions.get(endpoint).size()==0){subscriptions.remove(endpoint);}System.out.println("unsubscribed :" + endpoint.auth().getUsername() + "for" + unsubscribedTopic);}endpoint.unsubscribeAcknowledge(unsubscribe.messageId());});}

ReceiveHandle方法

用于处理收到的消息。当设备发布消息时,会检查发布的主题是否合法。如果主题合法,则遍历订阅关系列表,将消息发布给订阅了匹配主题的设备。

private void ReceiveHandle(MqttEndpoint endpoint) {endpoint.publishHandler(publish -> {String topic = publish.topicName();Buffer payload = publish.payload();//对topic的合法性进行判断if (!isValidTopic(topic)){endpoint.publish(topic, Buffer.buffer("非法topic,topic不可包含空格"), MqttQoS.AT_MOST_ONCE, false, false);return;}//记录日志接收到设备发布的消息System.out.println("Received message [" + publish.payload().toString(Charset.defaultCharset()) + "] with QoS [" + publish.qosLevel() + "]");if (publish.qosLevel() == MqttQoS.AT_LEAST_ONCE) {endpoint.publishAcknowledge(publish.messageId());} else if (publish.qosLevel() == MqttQoS.EXACTLY_ONCE) {endpoint.publishReceived(publish.messageId());}//遍历订阅关系,进行消息发布for (Map.Entry<String, List<MqttEndpoint>> entry : topicSubscribers.entrySet()) {String subscribedTopic = entry.getKey();//被订阅的topicList<MqttEndpoint> subscribers = entry.getValue();//订阅上方topic的订阅者//判断消息发布的topic是否能和被设备订阅的topic按照规则匹配if (isTopicMatch(subscribedTopic, topic)) {//若匹配,则遍历topic订阅者列表,并进行消息发布for (MqttEndpoint subscriber : subscribers) {subscriber.publish(topic, payload, publish.qosLevel(), publish.isDup(), publish.isRetain());}}}});endpoint.publishAcknowledgeHandler(messageId -> {System.out.println("received ack for message =" + messageId);}).publishReceivedHandler(messageId -> {endpoint.publishRelease(messageId);}).publishCompletionHandler(messageId -> {System.out.println("Received ack for message =" + messageId);});endpoint.publishReleaseHandler(endpoint::publishComplete);}

isTopicMatch方法

用于判断订阅的主题和发布的主题是否匹配。它会将主题按照"/“进行分割,并进行逐层匹配,支持通配符”+“和”#"。

private boolean isTopicMatch(String subscribedTopic, String publishedTopic) {String[] publishTopicArray = publishedTopic.split("/");String[] subscribedTopicArray = subscribedTopic.split("/");//将两个要比较的topic分割//订阅的topic长度不能比发布的topic长一个以上if (subscribedTopicArray.length - 1 > publishTopicArray.length) {return false;}//如果发布的topic长度比订阅的topic长度要长//并且订阅的topic最后不是以#结尾都返回false,因为这不可能if (subscribedTopicArray.length<publishTopicArray.length){if (!subscribedTopicArray[subscribedTopicArray.length-1].equals("#")){return false;}}//对两个topic进行比较for (int i = 0; i < publishTopicArray.length && i < subscribedTopicArray.length; i++) {//如果匹配成功或者匹配到了+,进行下一层匹配if (subscribedTopicArray[i].equals(publishTopicArray[i])||subscribedTopicArray[i].equals("+")){continue;}//如果匹配到了#,直接通过if (subscribedTopicArray[i].equals("#")) {break;}return false;}return true;}

isValidTopic方法

用于判断主题是否合法。它会检查主题是否包含空格,并且要么以"/#“结尾,要么不包含”#"。

public boolean isValidTopic(String topic) {//topic 不能包含任何空格,并且要么以 /# 结尾,要么不包含 #return (!topic.matches(".*\\s+.*"))&&(topic.matches(".*(?:\\/#)?$"));}

完整代码

package com.yichen.starter;import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*
* 缺少
* */
public class MqttVertical extends AbstractVerticle {private static Map<String, List<MqttEndpoint>> topicSubscribers;//存储每个topic的订阅关系private static Map<MqttEndpoint, List<String>> subscriptions;//存储每个设备订阅的topic@Overridepublic void start() throws Exception {MqttServer mqttServer = MqttServer.create(vertx);topicSubscribers = new HashMap<>();subscriptions=new HashMap<>();mqttServer.endpointHandler(endpoint -> {//使用lambda表达式实现了endpointHandler方法,传入参数endpoint;System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());if (endpoint.auth() != null) {System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");}System.out.println("[properties = " + endpoint.connectProperties() + "]");// accept connection from the remote clientendpoint.accept(false);SubscribeHandle(endpoint);UnSubscribeHandle(endpoint);ReceiveHandle(endpoint);DisConnectHandle(endpoint);});mqttServer.listen(ar -> {if (ar.succeeded()) {System.out.println("MQTT server is listening on port " + ar.result().actualPort());} else {System.out.println("Error on starting the server");ar.cause().printStackTrace();}});}/** 设备断开处理* */private void DisConnectHandle(MqttEndpoint endpoint) {endpoint.disconnectMessageHandler(disconnectMessage -> {System.out.println("Received disconnect from client, reason code = " + disconnectMessage.code());System.out.println("Client "+endpoint.auth().getUsername()+"disconnect with");//对两个映射订阅关系的列表进行更新for (String topic:subscriptions.get(endpoint)){topicSubscribers.get(topic).remove(endpoint);System.out.println("["+topic+"]");}subscriptions.remove(endpoint);});}/**处理订阅消息* */private void SubscribeHandle(MqttEndpoint endpoint) {endpoint.subscribeHandler(subscribe -> {Boolean IsValidTopic=false;//存储订阅消息中要订阅的topic的列表List<MqttTopicSubscription> topicSubscriptions = subscribe.topicSubscriptions();//存储订阅topic Qos级别的列表List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();//遍历列表for (MqttTopicSubscription s : topicSubscriptions) {//topicString topic = s.topicName();//Qos级别MqttQoS qos = s.qualityOfService();//判断topic是否合法if (!isValidTopic(topic)){//不合法则向设备发送消息endpoint.publish(topic, Buffer.buffer("非法topic,topic不可包含空格"), qos, false, false);continue;}else {IsValidTopic=true;}System.out.println("Subscription for " + topic + " with QoS " + qos);reasonCodes.add(MqttSubAckReasonCode.qosGranted(qos));//判断是否已有此topic,如果有则直接添加,没有则新建键值对if (!topicSubscribers.containsKey(topic)) {topicSubscribers.put(topic, new ArrayList<MqttEndpoint>());}topicSubscribers.get(topic).add(endpoint);//同上if (!subscriptions.containsKey(endpoint)) {subscriptions.put(endpoint, new ArrayList<String>());}subscriptions.get(endpoint).add(topic);}if(IsValidTopic){endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);}});}/** 处理退订* */private void UnSubscribeHandle(MqttEndpoint endpoint) {endpoint.unsubscribeHandler(unsubscribe -> {//遍历要退订的topicfor (String unsubscribedTopic : unsubscribe.topics()) {topicSubscribers.get(unsubscribedTopic).remove(endpoint);//如果某topic的订阅列表为空,删除topicif (topicSubscribers.get(unsubscribedTopic).size() == 0) {topicSubscribers.remove(unsubscribedTopic);}subscriptions.get(endpoint).remove(unsubscribedTopic);//同上if (subscriptions.get(endpoint).size()==0){subscriptions.remove(endpoint);}System.out.println("unsubscribed :" + endpoint.auth().getUsername() + "for" + unsubscribedTopic);}endpoint.unsubscribeAcknowledge(unsubscribe.messageId());});}private void ReceiveHandle(MqttEndpoint endpoint) {endpoint.publishHandler(publish -> {String topic = publish.topicName();Buffer payload = publish.payload();//对topic的合法性进行判断if (!isValidTopic(topic)){endpoint.publish(topic, Buffer.buffer("非法topic,topic不可包含空格"), MqttQoS.AT_MOST_ONCE, false, false);return;}//记录日志接收到设备发布的消息System.out.println("Received message [" + publish.payload().toString(Charset.defaultCharset()) + "] with QoS [" + publish.qosLevel() + "]");if (publish.qosLevel() == MqttQoS.AT_LEAST_ONCE) {endpoint.publishAcknowledge(publish.messageId());} else if (publish.qosLevel() == MqttQoS.EXACTLY_ONCE) {endpoint.publishReceived(publish.messageId());}//遍历订阅关系,进行消息发布for (Map.Entry<String, List<MqttEndpoint>> entry : topicSubscribers.entrySet()) {String subscribedTopic = entry.getKey();//被订阅的topicList<MqttEndpoint> subscribers = entry.getValue();//订阅上方topic的订阅者//判断消息发布的topic是否能和被设备订阅的topic按照规则匹配if (isTopicMatch(subscribedTopic, topic)) {//若匹配,则遍历topic订阅者列表,并进行消息发布for (MqttEndpoint subscriber : subscribers) {subscriber.publish(topic, payload, publish.qosLevel(), publish.isDup(), publish.isRetain());}}}});endpoint.publishAcknowledgeHandler(messageId -> {System.out.println("received ack for message =" + messageId);}).publishReceivedHandler(messageId -> {endpoint.publishRelease(messageId);}).publishCompletionHandler(messageId -> {System.out.println("Received ack for message =" + messageId);});endpoint.publishReleaseHandler(endpoint::publishComplete);}
/*
* 判断topic是否匹配
* */private boolean isTopicMatch(String subscribedTopic, String publishedTopic) {String[] publishTopicArray = publishedTopic.split("/");String[] subscribedTopicArray = subscribedTopic.split("/");//将两个要比较的topic分割//订阅的topic长度不能比发布的topic长一个以上if (subscribedTopicArray.length - 1 > publishTopicArray.length) {return false;}//如果发布的topic长度比订阅的topic长度要长//并且订阅的topic最后不是以#结尾都返回false,因为这不可能if (subscribedTopicArray.length<publishTopicArray.length){if (!subscribedTopicArray[subscribedTopicArray.length-1].equals("#")){return false;}}//对两个topic进行比较for (int i = 0; i < publishTopicArray.length && i < subscribedTopicArray.length; i++) {//如果匹配成功或者匹配到了+,进行下一层匹配if (subscribedTopicArray[i].equals(publishTopicArray[i])||subscribedTopicArray[i].equals("+")){continue;}//如果匹配到了#,直接通过if (subscribedTopicArray[i].equals("#")) {break;}return false;}return true;}public boolean isValidTopic(String topic) {//topic 不能包含任何空格,并且要么以 /# 结尾,要么不包含 #return (!topic.matches(".*\\s+.*"))&&(topic.matches(".*(?:\\/#)?$"));}
}

最后

以上就是我的全部代码和相关内容解释,如果有疑问欢迎评论区交流,如果帮到你了可以点个赞

这篇关于Vert.x框架实现MQTT服务,代码实现及其详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/687088

相关文章

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

poj 1258 Agri-Net(最小生成树模板代码)

感觉用这题来当模板更适合。 题意就是给你邻接矩阵求最小生成树啦。~ prim代码:效率很高。172k...0ms。 #include<stdio.h>#include<algorithm>using namespace std;const int MaxN = 101;const int INF = 0x3f3f3f3f;int g[MaxN][MaxN];int n