本文主要是介绍阿里云物联网mqtt使用及与springboot集成mqtt订阅功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
阿里云物联网mqtt使用及与springboot集成mqtt订阅功能详细教程
- 前言
- 目录
- 阿里云创建MQTT服务
- 服务器及设备间的通信
- springboot集成mqtt
前言
最近做了一个PLC与阿里云mqtt通讯,阿里云再与java后端通信的demo,记录下阿里云的使用及调试和springboot如何集成mqtt的功能,因网上重复的资料太多,大大降低了搜索效率,以及关于阿里云的资料较少,故自己写一篇文章记录下自己的学习的过程,方便以后再用,也给需要做类似项目的同学一个参考。这是本人第一次写文章,本人也是JAVA初学者,如有不对的地方还请各位指正。
目录
阿里云创建MQTT服务
先找到产品与服务,然后选择物联网平台,找到公共实例,创建一个产品。
创建产品
然后在左侧下拉栏找到设备管理,在设备管理下拉栏找到设备,然后添加设备。添加的设备为未激活状态,第一次连接后激活
一般至少需要添加2个设备,一个是服务器的设备,一个是硬件设备,多个硬件设备则添加多个设备 即可。
之后点击设备的查看,找到DeviceSecret点击查看设备证书,这个是用来生成mqtt登录信息的关键,包含ip,端口号(这个一般固定1883),id,用户名,密码。
之后就是生成登录信息,激活设备,以及调试了。以上操作具体可参考阿里云上的相关开发文档,对于生成登录信息和如何调试及调试工具可参考此文章:https://www.maxlicheng.com/embedded/iot/337.html
对于生成登录信息这里附上一个小工具,不懂如何生成登录信息的同学可以下载后打开html网页,把对应的信息复制粘贴到该网页即可
百度云链接:https://pan.baidu.com/s/1itmwuuy-RfpbJYt0pP9Bsw
提取码:2fg9
网页及使用方法如下:
计算出的链接域名最后会包含端口号,使用的时候分开即可,若要使用ip登录,将域名链接用DNS解析后即可得出ip地址
服务器及设备间的通信
首先,先设定订阅的话题和发布的话题,在产品的Topic类列表中找到自定义topic并创建自己的topic。
定义topic类,设备操作权限选订阅和发布,topic类为你的话题名
然后会得到类似如下的topic
将${deviceName}换成你的设备名,其余保持不变就是你订阅和发布的topic名。
因阿里云单个设备只能订阅和发布自己的topic,所以需要添加阿里云的topic转发功能。
在左侧下拉栏找到规则引擎,然后选云产品流转,创建规则,然后编写SQL
设置要转发的topic(即之前设置的硬件设备发布的topic),相关SQL编写可以参考阿里云的开发文档,阿里云可以帮你在一定程度上处理完数据后再转发给服务器(个人觉得挺好用的)。若不需要处理数据则字段那填 * ,条件不填即可,这样就是数据不做处理直接透传。
然后找到转发数据处,添加操作
同上面差不多,选择产品,转发到的设备,对应话题即可
回到之前的界面启动即可
springboot集成mqtt
因业务比较简单,这里只做了订阅topic的功能,没有发布topic的功能
maven依赖,主要是最后两个
<!--mqtt依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
Springboot对应的代码,此代码只是mqtt连接及订阅消息的代码,还需读者自行编写处理收到消息部分的解析代码及相关操作,如存入数据库
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;@Configuration
@IntegrationComponentScan
public class MqttService {//此处填DNS解析后对应的ip地址private String hostUrl = "tcp://ip地址:1883";//剩下部分填写其余对应的信息即可private String username = "填入信息";private String password = "填入信息";private String clientId = "填入信息";private String recvTopic ="填入信息";@Beanpublic MqttConnectOptions getMqttConnectOptions(){MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();mqttConnectOptions.setCleanSession(true);mqttConnectOptions.setConnectionTimeout(10);mqttConnectOptions.setKeepAliveInterval(60);mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttConnectOptions.setServerURIs(new String[]{hostUrl});return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), recvTopic);adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {//此处添加处理方法//传入的消息是用message.getPayload().toString()得到//传入的topic名字用message.getHeaders().get("mqtt_receivedTopic").toString()得到}};}
这篇关于阿里云物联网mqtt使用及与springboot集成mqtt订阅功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!