springboot整合mqtt向EMQX发送信息

2024-05-29 08:08

本文主要是介绍springboot整合mqtt向EMQX发送信息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 spingboot整合mqtt

原理:

 二 操作案例

2.1 工程结构

 2.2 配置pom文件

<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version><scope>test</scope></dependency><!-- mqtt --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.16</version></dependency><!-- springBoot的启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.0.1.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-web</artifactId><version>5.1.5.RELEASE</version></dependency>

 2.3 配置application配置文件

server:port: 8081
spring:mqtt:username: admin							# 账号password: public						# 密码host-url: tcp://172.16.71.150:1883					# mqtt连接tcp地址client-id: mq-dky-0813						# 客户端Id,每个启动的id要不同default-topic: mq-dky-guolu					# 默认主题timeout: 100							# 超时时间keepalive: 100

2.4 读取配置文件,初始客户端

package com.ljf.mqtt.demo.config;import com.ljf.mqtt.demo.client.MqttPushClient;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** @ClassName: MqttConfig* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:43:39 * @Version: V1.0**/
@Component
@ConfigurationProperties("spring.mqtt")
@Setter
@Getter
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;@Beanpublic MqttPushClient getMqttPushClient() {mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);// 以/#结尾表示订阅所有以test开头的主题mqttPushClient.subscribe(defaultTopic, 0);return mqttPushClient;}
}

2.4 订阅推送客户端

package com.ljf.mqtt.demo.client;import com.ljf.mqtt.demo.listener.PushCallback;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: MqttPushClient* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:48:38 * @Version: V1.0**/
@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host      ip+端口* @param clientID  客户端Id* @param username  用户名* @param password  密码* @param timeout   超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(pushCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布** @param qos         连接方式* @param retained    是否保留* @param topic       主题* @param pushMessage 消息体*/public void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);if (null == mTopic) {logger.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();} catch (MqttPersistenceException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("==============开始订阅主题=========" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}
}

2.5 定制监听订阅者

package com.ljf.mqtt.demo.listener;import com.ljf.mqtt.demo.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: PushCallback* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:52:20 * @Version: V1.0**/
@Component
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;@Overridepublic void connectionLost(Throwable throwable) {// 连接丢失后,一般在这里面进行重连logger.info("连接断开,可以做重连");if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息会执行到这里面logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}
}

2.6 发布数据

package com.ljf.mqtt.demo.controller;import com.ljf.mqtt.demo.client.MqttPushClient;
import com.ljf.mqtt.demo.utils.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @ClassName: PullController* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:56:18 * @Version: V1.0**/
@RestController
@RequestMapping("/")
public class PullController {@Autowiredprivate MqttPushClient mqttPushClient;/*** @author liujianfu* @description    测试发布主题* @date 2021/8/16 15:04* @param []* @return RUtils*/@GetMapping(value = "/publishTopic")public R publishTopic(String sendMessage) {System.out.println("message:"+sendMessage);sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}";mqttPushClient.publish(0,false,"mq-dky-guolu",sendMessage);return R.ok("OK");}
}

2.7 发布数据

1.发布数据:

 2.订阅消费数据

 3.emqx页面

4.在页面进行模拟

连接

订阅

 推送:

java代码客户端:

这篇关于springboot整合mqtt向EMQX发送信息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013103

相关文章

Java Stream流与使用操作指南

《JavaStream流与使用操作指南》Stream不是数据结构,而是一种高级的数据处理工具,允许你以声明式的方式处理数据集合,类似于SQL语句操作数据库,本文给大家介绍JavaStream流与使用... 目录一、什么是stream流二、创建stream流1.单列集合创建stream流2.双列集合创建str

springboot集成easypoi导出word换行处理过程

《springboot集成easypoi导出word换行处理过程》SpringBoot集成Easypoi导出Word时,换行符n失效显示为空格,解决方法包括生成段落或替换模板中n为回车,同时需确... 目录项目场景问题描述解决方案第一种:生成段落的方式第二种:替换模板的情况,换行符替换成回车总结项目场景s

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配

SpringBoot中@Value注入静态变量方式

《SpringBoot中@Value注入静态变量方式》SpringBoot中静态变量无法直接用@Value注入,需通过setter方法,@Value(${})从属性文件获取值,@Value(#{})用... 目录项目场景解决方案注解说明1、@Value("${}")使用示例2、@Value("#{}"php

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

线上Java OOM问题定位与解决方案超详细解析

《线上JavaOOM问题定位与解决方案超详细解析》OOM是JVM抛出的错误,表示内存分配失败,:本文主要介绍线上JavaOOM问题定位与解决方案的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一、OOM问题核心认知1.1 OOM定义与技术定位1.2 OOM常见类型及技术特征二、OOM问题定位工具

基于 Cursor 开发 Spring Boot 项目详细攻略

《基于Cursor开发SpringBoot项目详细攻略》Cursor是集成GPT4、Claude3.5等LLM的VSCode类AI编程工具,支持SpringBoot项目开发全流程,涵盖环境配... 目录cursor是什么?基于 Cursor 开发 Spring Boot 项目完整指南1. 环境准备2. 创建

Spring Security简介、使用与最佳实践

《SpringSecurity简介、使用与最佳实践》SpringSecurity是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架,本文给大家介绍SpringSec... 目录一、如何理解 Spring Security?—— 核心思想二、如何在 Java 项目中使用?——

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

springboot中使用okhttp3的小结

《springboot中使用okhttp3的小结》OkHttp3是一个JavaHTTP客户端,可以处理各种请求类型,比如GET、POST、PUT等,并且支持高效的HTTP连接池、请求和响应缓存、以及异... 在 Spring Boot 项目中使用 OkHttp3 进行 HTTP 请求是一个高效且流行的方式。