springboot整合mqtt向EMQX发送信息

2024-05-29 08:08

本文主要是介绍springboot整合mqtt向EMQX发送信息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 spingboot整合mqtt

原理:

 二 操作案例

2.1 工程结构

 2.2 配置pom文件

<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version><scope>test</scope></dependency><!-- mqtt --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.16</version></dependency><!-- springBoot的启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.0.1.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-web</artifactId><version>5.1.5.RELEASE</version></dependency>

 2.3 配置application配置文件

server:port: 8081
spring:mqtt:username: admin							# 账号password: public						# 密码host-url: tcp://172.16.71.150:1883					# mqtt连接tcp地址client-id: mq-dky-0813						# 客户端Id,每个启动的id要不同default-topic: mq-dky-guolu					# 默认主题timeout: 100							# 超时时间keepalive: 100

2.4 读取配置文件,初始客户端

package com.ljf.mqtt.demo.config;import com.ljf.mqtt.demo.client.MqttPushClient;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** @ClassName: MqttConfig* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:43:39 * @Version: V1.0**/
@Component
@ConfigurationProperties("spring.mqtt")
@Setter
@Getter
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;@Beanpublic MqttPushClient getMqttPushClient() {mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);// 以/#结尾表示订阅所有以test开头的主题mqttPushClient.subscribe(defaultTopic, 0);return mqttPushClient;}
}

2.4 订阅推送客户端

package com.ljf.mqtt.demo.client;import com.ljf.mqtt.demo.listener.PushCallback;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: MqttPushClient* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:48:38 * @Version: V1.0**/
@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host      ip+端口* @param clientID  客户端Id* @param username  用户名* @param password  密码* @param timeout   超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(pushCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布** @param qos         连接方式* @param retained    是否保留* @param topic       主题* @param pushMessage 消息体*/public void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);if (null == mTopic) {logger.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();} catch (MqttPersistenceException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("==============开始订阅主题=========" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}
}

2.5 定制监听订阅者

package com.ljf.mqtt.demo.listener;import com.ljf.mqtt.demo.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: PushCallback* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:52:20 * @Version: V1.0**/
@Component
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;@Overridepublic void connectionLost(Throwable throwable) {// 连接丢失后,一般在这里面进行重连logger.info("连接断开,可以做重连");if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息会执行到这里面logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}
}

2.6 发布数据

package com.ljf.mqtt.demo.controller;import com.ljf.mqtt.demo.client.MqttPushClient;
import com.ljf.mqtt.demo.utils.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @ClassName: PullController* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:56:18 * @Version: V1.0**/
@RestController
@RequestMapping("/")
public class PullController {@Autowiredprivate MqttPushClient mqttPushClient;/*** @author liujianfu* @description    测试发布主题* @date 2021/8/16 15:04* @param []* @return RUtils*/@GetMapping(value = "/publishTopic")public R publishTopic(String sendMessage) {System.out.println("message:"+sendMessage);sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}";mqttPushClient.publish(0,false,"mq-dky-guolu",sendMessage);return R.ok("OK");}
}

2.7 发布数据

1.发布数据:

 2.订阅消费数据

 3.emqx页面

4.在页面进行模拟

连接

订阅

 推送:

java代码客户端:

这篇关于springboot整合mqtt向EMQX发送信息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013103

相关文章

Java中Map的五种遍历方式实现与对比

《Java中Map的五种遍历方式实现与对比》其实Map遍历藏着多种玩法,有的优雅简洁,有的性能拉满,今天咱们盘一盘这些进阶偏基础的遍历方式,告别重复又臃肿的代码,感兴趣的小伙伴可以了解下... 目录一、先搞懂:Map遍历的核心目标二、几种遍历方式的对比1. 传统EntrySet遍历(最通用)2. Lambd

Spring Boot 中 RestTemplate 的核心用法指南

《SpringBoot中RestTemplate的核心用法指南》本文详细介绍了RestTemplate的使用,包括基础用法、进阶配置技巧、实战案例以及最佳实践建议,通过一个腾讯地图路线规划的案... 目录一、环境准备二、基础用法全解析1. GET 请求的三种姿势2. POST 请求深度实践三、进阶配置技巧1

springboot+redis实现订单过期(超时取消)功能的方法详解

《springboot+redis实现订单过期(超时取消)功能的方法详解》在SpringBoot中使用Redis实现订单过期(超时取消)功能,有多种成熟方案,本文为大家整理了几个详细方法,文中的示例代... 目录一、Redis键过期回调方案(推荐)1. 配置Redis监听器2. 监听键过期事件3. Redi

Spring Boot 处理带文件表单的方式汇总

《SpringBoot处理带文件表单的方式汇总》本文详细介绍了六种处理文件上传的方式,包括@RequestParam、@RequestPart、@ModelAttribute、@ModelAttr... 目录方式 1:@RequestParam接收文件后端代码前端代码特点方式 2:@RequestPart接

SpringBoot整合Zuul全过程

《SpringBoot整合Zuul全过程》Zuul网关是微服务架构中的重要组件,具备统一入口、鉴权校验、动态路由等功能,它通过配置文件进行灵活的路由和过滤器设置,支持Hystrix进行容错处理,还提供... 目录Zuul网关的作用Zuul网关的应用1、网关访问方式2、网关依赖注入3、网关启动器4、网关全局变

SpringBoot全局异常拦截与自定义错误页面实现过程解读

《SpringBoot全局异常拦截与自定义错误页面实现过程解读》本文介绍了SpringBoot中全局异常拦截与自定义错误页面的实现方法,包括异常的分类、SpringBoot默认异常处理机制、全局异常拦... 目录一、引言二、Spring Boot异常处理基础2.1 异常的分类2.2 Spring Boot默

基于SpringBoot实现分布式锁的三种方法

《基于SpringBoot实现分布式锁的三种方法》这篇文章主要为大家详细介绍了基于SpringBoot实现分布式锁的三种方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、基于Redis原生命令实现分布式锁1. 基础版Redis分布式锁2. 可重入锁实现二、使用Redisso

SpringBoot的全局异常拦截实践过程

《SpringBoot的全局异常拦截实践过程》SpringBoot中使用@ControllerAdvice和@ExceptionHandler实现全局异常拦截,@RestControllerAdvic... 目录@RestControllerAdvice@ResponseStatus(...)@Except

Springboot配置文件相关语法及读取方式详解

《Springboot配置文件相关语法及读取方式详解》本文主要介绍了SpringBoot中的两种配置文件形式,即.properties文件和.yml/.yaml文件,详细讲解了这两种文件的语法和读取方... 目录配置文件的形式语法1、key-value形式2、数组形式读取方式1、通过@value注解2、通过

Java 接口定义变量的示例代码

《Java接口定义变量的示例代码》文章介绍了Java接口中的变量和方法,接口中的变量必须是publicstaticfinal的,用于定义常量,而方法默认是publicabstract的,必须由实现类... 在 Java 中,接口是一种抽象类型,用于定义类必须实现的方法。接口可以包含常量和方法,但不能包含实例