RabbitMQ之Plugins插件----AMQP对接MQTT

2024-03-24 11:28

本文主要是介绍RabbitMQ之Plugins插件----AMQP对接MQTT,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.启用插件

rabbitmq-plugins enable rabbitmq_mqtt

2.检查是否启动成功,打开rabbitmq后台

3.概念:
AMQP是由交换器和queue队列组成的消息队列机制,MQTT是由订阅主题组成的消息机制

1.MQTT创建连接时会向rabbitmq创建一个自己的queue,默认使用amq.topic交换器,由mqtt-subscription-***-qos1组成queue名。其中qos概念自行了解。

2.AMQP连接消费也需要绑定到MQTT对应的交换器和队列中,其中routingKey则使用MQTT订阅主题。

上代码:

//****************************************************************** */
//MQTTconst mqtt = require("mqtt")
const client = mqtt.connect({url: "mqtt://127.0.0.1:1883",clientId: 'nickchen111', // 客户端ID,此ID可自定义。queue后缀username: 'guest',   // 用户名password: 'guest'
}) // 连接到mqtt服务器var quque = "device_aaa"
var quque_rpc = "device_aaa_rpc";
client.subscribe(quque_rpc, { qos: 1 })var status = false;
var count = 0;
var error = 0;
setInterval(() => {if (status) {count += 1;var msg = Math.ceil(Math.random() * 40)console.log("MQTT发送消息:" + msg)client.publish(quque, msg.toString(), { qos: 0, retain: true })}
}, 3000);client.on("message", function (top, message) {if (top == "device_aaa") {error += 1;}console.log("MQTT接收消息:" + top + "  : " + message.toString() + "*******丢失返回消息共:[" + error + "]条")if (JSON.parse(message.toString()).command == "stop") {status = false;}if (JSON.parse(message.toString()).command == "start") {status = true;}
})//****************************************************************** */
//AMQP
const amqp = require('amqplib');async function consumer() {// 创建链接对象const connection = await amqp.connect('amqp://localhost:5672');// 获取通道const channel = await connection.createChannel();// 声明参数const exchangeName = 'amq.topic';const queueName = 'mqtt-subscription-nickchen111qos1';const routingKey = 'device_aaa';// // 声明一个交换机// await channel.assertExchange(exchangeName, 'amq.topic', { durable: true });//消息持久化// // 声明一个队列await channel.assertQueue(queueName, { autoDelete: true, durable: true });// 绑定关系(队列、交换机、路由键)await channel.bindQueue(queueName, exchangeName, routingKey);var data = await channel.publish(exchangeName, "device_aaa_rpc", Buffer.from(JSON.stringify({ command: "start" })))// 消费await channel.consume(queueName, msg => {console.log('Consumer:', msg.content.toString());channel.ack(msg);},{ noAck: false });//开启通知console.log('消费端启动成功!');
}
consumer();

        

这篇关于RabbitMQ之Plugins插件----AMQP对接MQTT的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/841450

相关文章

Loaded plugins: fastestmirror, langpacks

https://blog.csdn.net/weixin_37282478/article/details/82152239

WordPress网创自动采集并发布插件

网创教程:WordPress插件网创自动采集并发布 阅读更新:随机添加文章的阅读数量,购买数量,喜欢数量。 使用插件注意事项 如果遇到404错误,请先检查并调整网站的伪静态设置,这是最常见的问题。需要定制化服务,请随时联系我。 本次更新内容 我们进行了多项更新和优化,主要包括: 界面设置:用户现在可以更便捷地设置文章分类和发布金额。代码优化:改进了采集和发布代码,提高了插件的稳定

vscode-创建vue3项目-修改暗黑主题-常见错误-element插件标签-用法涉及问题

文章目录 1.vscode创建运行编译vue3项目2.添加项目资源3.添加element-plus元素4.修改为暗黑主题4.1.在main.js主文件中引入暗黑样式4.2.添加自定义样式文件4.3.html页面html标签添加样式 5.常见错误5.1.未使用变量5.2.关闭typescript检查5.3.调试器支持5.4.允许未到达代码和未定义代码 6.element常用标签6.1.下拉列表

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

ROS2从入门到精通4-4:局部控制插件开发案例(以PID算法为例)

目录 0 专栏介绍1 控制插件编写模板1.1 构造控制插件类1.2 注册并导出插件1.3 编译与使用插件 2 基于PID的路径跟踪原理3 控制插件开发案例(PID算法)常见问题 0 专栏介绍 本专栏旨在通过对ROS2的系统学习,掌握ROS2底层基本分布式原理,并具有机器人建模和应用ROS2进行实际项目的开发和调试的工程能力。 🚀详情:《ROS2从入门到精通》 1 控制插

uniapp 使用uview 插件

看创建项目版本vue2 、 vue3 Button 按钮 | uView 2.0 - 全面兼容 nvue 的 uni-app 生态框架 - uni-app UI 框架 1.  npm install uview-ui@2.0.36 2. // main.js,注意要在use方法之后执行import uView from 'uview-ui'Vue.use(uView)// 如此

MQTT之CONNECT控制报文详解

目录 1.  MQTT协议规范 2.  名词解释 3.  CONNECT控制报文详解 3.1  固定报头 Fixed header 3.2  可变报头 Variable header 3.2.1  协议名 Protocol Name 3.2.2  协议级别 Protocol Level 3.2.3  连接标志 Connect Flags 3.2.4  保持连接 Keep

RabbitMQ实践——临时队列

临时队列是一种自动删除队列。当这个队列被创建后,如果没有消费者监听,则会一直存在,还可以不断向其发布消息。但是一旦的消费者开始监听,然后断开监听后,它就会被自动删除。 新建自动删除队列 我们创建一个名字叫queue.auto.delete的临时队列 绑定 我们直接使用默认交换器,所以不用创建新的交换器,也不用建立绑定关系。 实验 发布消息 我们在后台管理页面的默认交换器下向这个队列

MyBatis系列之分页插件及问题

概述 无论是C端产品页面,还是后台系统页面,不可能一次性将全部数据加载出来。后台系统一般都是PC端登录,用Table组件(如Ant Design Table)渲染展示数据,可点击列表的下一页(或指定某一页)查看数据。C端产品如App,在下滑时可查看更多数据,看起来像是一次性加载数据,实际上也是分批请求后台系统获取数据。而这,就是分页功能。 如果没有使用Hibernate或MyBatis这样的O

Spring 集成 RabbitMQ 与其概念,消息持久化,ACK机制

目录 RabbitMQ 概念exchange交换机机制 什么是交换机binding?Direct Exchange交换机Topic Exchange交换机Fanout Exchange交换机Header Exchange交换机RabbitMQ 的 Hello - Demo(springboot实现)RabbitMQ 的 Hello Demo(spring xml实现)RabbitMQ 在生产环境