RabbitMQ之Plugins插件----AMQP对接MQTT

2024-03-24 11:28

本文主要是介绍RabbitMQ之Plugins插件----AMQP对接MQTT,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.启用插件

rabbitmq-plugins enable rabbitmq_mqtt

2.检查是否启动成功,打开rabbitmq后台

3.概念:
AMQP是由交换器和queue队列组成的消息队列机制,MQTT是由订阅主题组成的消息机制

1.MQTT创建连接时会向rabbitmq创建一个自己的queue,默认使用amq.topic交换器,由mqtt-subscription-***-qos1组成queue名。其中qos概念自行了解。

2.AMQP连接消费也需要绑定到MQTT对应的交换器和队列中,其中routingKey则使用MQTT订阅主题。

上代码:

//****************************************************************** */
//MQTTconst mqtt = require("mqtt")
const client = mqtt.connect({url: "mqtt://127.0.0.1:1883",clientId: 'nickchen111', // 客户端ID,此ID可自定义。queue后缀username: 'guest',   // 用户名password: 'guest'
}) // 连接到mqtt服务器var quque = "device_aaa"
var quque_rpc = "device_aaa_rpc";
client.subscribe(quque_rpc, { qos: 1 })var status = false;
var count = 0;
var error = 0;
setInterval(() => {if (status) {count += 1;var msg = Math.ceil(Math.random() * 40)console.log("MQTT发送消息:" + msg)client.publish(quque, msg.toString(), { qos: 0, retain: true })}
}, 3000);client.on("message", function (top, message) {if (top == "device_aaa") {error += 1;}console.log("MQTT接收消息:" + top + "  : " + message.toString() + "*******丢失返回消息共:[" + error + "]条")if (JSON.parse(message.toString()).command == "stop") {status = false;}if (JSON.parse(message.toString()).command == "start") {status = true;}
})//****************************************************************** */
//AMQP
const amqp = require('amqplib');async function consumer() {// 创建链接对象const connection = await amqp.connect('amqp://localhost:5672');// 获取通道const channel = await connection.createChannel();// 声明参数const exchangeName = 'amq.topic';const queueName = 'mqtt-subscription-nickchen111qos1';const routingKey = 'device_aaa';// // 声明一个交换机// await channel.assertExchange(exchangeName, 'amq.topic', { durable: true });//消息持久化// // 声明一个队列await channel.assertQueue(queueName, { autoDelete: true, durable: true });// 绑定关系(队列、交换机、路由键)await channel.bindQueue(queueName, exchangeName, routingKey);var data = await channel.publish(exchangeName, "device_aaa_rpc", Buffer.from(JSON.stringify({ command: "start" })))// 消费await channel.consume(queueName, msg => {console.log('Consumer:', msg.content.toString());channel.ack(msg);},{ noAck: false });//开启通知console.log('消费端启动成功!');
}
consumer();

        

这篇关于RabbitMQ之Plugins插件----AMQP对接MQTT的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/841450

相关文章

Springboot使用RabbitMQ实现关闭超时订单(示例详解)

《Springboot使用RabbitMQ实现关闭超时订单(示例详解)》介绍了如何在SpringBoot项目中使用RabbitMQ实现订单的延时处理和超时关闭,通过配置RabbitMQ的交换机、队列和... 目录1.maven中引入rabbitmq的依赖:2.application.yml中进行rabbit

IDEA常用插件之代码扫描SonarLint详解

《IDEA常用插件之代码扫描SonarLint详解》SonarLint是一款用于代码扫描的插件,可以帮助查找隐藏的bug,下载并安装插件后,右键点击项目并选择“Analyze”、“Analyzewit... 目录SonajavascriptrLint 查找隐藏的bug下载安装插件扫描代码查看结果总结Sona

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

Maven(插件配置和生命周期的绑定)

1.这篇文章很好,介绍的maven插件的。 2.maven的source插件为例,可以把源代码打成包。 Goals Overview就可以查看该插件下面所有的目标。 这里我们要使用的是source:jar-no-fork。 3.查看source插件的example,然后配置到riil-collect.xml中。  <build>   <plugins>    <pl

jenkins 插件执行shell命令时,提示“Command not found”处理方法

首先提示找不到“Command not found,可能我们第一反应是查看目标机器是否已支持该命令,不过如果相信能找到这里来的朋友估计遇到的跟我一样,其实目标机器是没有问题的通过一些远程工具执行shell命令是可以执行。奇怪的就是通过jenkinsSSH插件无法执行,经一番折腾各种搜索发现是jenkins没有加载/etc/profile导致。 【解决办法】: 需要在jenkins调用shell脚

Jenkins 插件 地址证书报错问题解决思路

问题提示摘要: SunCertPathBuilderException: unable to find valid certification path to requested target...... 网上很多的解决方式是更新站点的地址,我这里修改了一个日本的地址(清华镜像也好),其实发现是解决不了上述的报错问题的,其实,最终拉去插件的时候,会提示证书的问题,几经周折找到了其中一遍博文

如何更优雅地对接第三方API

如何更优雅地对接第三方API 本文所有示例完整代码地址:https://github.com/yu-linfeng/BlogRepositories/tree/master/repositories/third 我们在日常开发过程中,有不少场景会对接第三方的API,例如第三方账号登录,第三方服务等等。第三方服务会提供API或者SDK,我依稀记得早些年Maven还没那么广泛使用,通常要对接第三方

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

eclipse安装subversion(SVN)版本控制插件

陈科肇 查看插件更新站点 网址:http://subclipse.tigris.org/servlets/ProjectProcess?pageID=p4wYuA 网站截图: 根据自己的eclipse版本,选择需要的更新站点. 使用eclipse集成subservion插件 Help > Install New Software…> 等待下载安装插件…