RabbitMQ 实验消费原始队列消息, 拒绝(reject)投递死信交换机过程

本文主要是介绍RabbitMQ 实验消费原始队列消息, 拒绝(reject)投递死信交换机过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

如果你想通过 RabbitMQ 的死信队列功能实现消费者拒绝消息投递到死信交换机的行为,你可以按照以下步骤操作:

  1. 创建原始队列,并将其绑定到一个交换机上:
export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=passwordrabbitmqadmin list exchanges
rabbitmqadmin declare queue name=my_queue durable=true
rabbitmqadmin declare exchange name=my_exchange type=direct
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
  1. 创建死信交换机和死信队列,并将死信队列绑定到死信交换机:
rabbitmqadmin declare exchange name=my_dlx type=direct
rabbitmqadmin declare queue name=my_dlq durable=true
rabbitmqadmin declare binding source=my_dlx destination=my_dlq routing_key=dlx_routing_key
  1. 设置原始队列的参数,包括死信交换机和死信路由键:
rabbitmqadmin declare queue name=my_queue durable=true arguments='{"x-dead-letter-exchange":"my_dlx", "x-dead-letter-routing-key":"dlx_routing_key"}'
  1. 编写消费者代码,在消费者代码中拒绝投递消息到死信交换机:
//Consumer.ts
import * as amqp from 'amqplib';async function main() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();const queue = 'my_queue';await channel.consume(queue, async (msg) => {if (msg !== null) {try {await new Promise(resolve => {setTimeout(()=>{resolve(true)},3000)})// Process the messageconsole.log("Received message:", msg.content.toString());// Simulate rejection of message delivery to dead letter exchangethrow new Error("Failed to process message, rejecting delivery to dead letter exchange");} catch (error) {// Reject the message delivery to dead letter exchangechannel.reject(msg, false);console.error("Error processing message:", error.message);}}});console.log('Waiting for messages...');
}main().catch(console.error);

在这里插入图片描述

在这个示例中,当消费者处理消息时发生错误时,它会将消息的投递拒绝到死信交换机。这样,这些被拒绝的消息将被重新路由到死信队列 my_dlq 中。

记得根据你的具体需求修改队列、交换机和消费者的配置,确保它们符合你的预期行为。

rabbitmqadmin purge queue name=my_dlq #删除队列中的所有消息

//Producer.ts 
import * as amqp from 'amqplib';async function sendMsg() {try {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel(); // 创建通道const delayQueue = 'my_queue'; // 延迟队列名称// 每秒发送一条消息for (let i = 0; i < 100; i++) {await new Promise(resolve => {setTimeout(() => {resolve(true)}, 1000)})const message = 'Hello RabbitMQ!';channel.sendToQueue(delayQueue, Buffer.from(message));console.log('消息已发送:', message);}} catch (error) {console.error('错误:', error);}
}sendMsg()

这篇关于RabbitMQ 实验消费原始队列消息, 拒绝(reject)投递死信交换机过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/858124

相关文章

redis群集简单部署过程

《redis群集简单部署过程》文章介绍了Redis,一个高性能的键值存储系统,其支持多种数据结构和命令,它还讨论了Redis的服务器端架构、数据存储和获取、协议和命令、高可用性方案、缓存机制以及监控和... 目录Redis介绍1. 基本概念2. 服务器端3. 存储和获取数据4. 协议和命令5. 高可用性6.

PLsql Oracle 下载安装图文过程详解

《PLsqlOracle下载安装图文过程详解》PL/SQLDeveloper是一款用于开发Oracle数据库的集成开发环境,可以通过官网下载安装配置,并通过配置tnsnames.ora文件及环境变... 目录一、PL/SQL Developer 简介二、PL/SQL Developer 安装及配置详解1.下

在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程

《在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程》本文介绍了在Java中使用ModelMapper库简化Shapefile属性转JavaBean的过程,对比... 目录前言一、原始的处理办法1、使用Set方法来转换2、使用构造方法转换二、基于ModelMapper

springboot启动流程过程

《springboot启动流程过程》SpringBoot简化了Spring框架的使用,通过创建`SpringApplication`对象,判断应用类型并设置初始化器和监听器,在`run`方法中,读取配... 目录springboot启动流程springboot程序启动入口1.创建SpringApplicat

本地搭建DeepSeek-R1、WebUI的完整过程及访问

《本地搭建DeepSeek-R1、WebUI的完整过程及访问》:本文主要介绍本地搭建DeepSeek-R1、WebUI的完整过程及访问的相关资料,DeepSeek-R1是一个开源的人工智能平台,主... 目录背景       搭建准备基础概念搭建过程访问对话测试总结背景       最近几年,人工智能技术

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

使用C/C++调用libcurl调试消息的方式

《使用C/C++调用libcurl调试消息的方式》在使用C/C++调用libcurl进行HTTP请求时,有时我们需要查看请求的/应答消息的内容(包括请求头和请求体)以方便调试,libcurl提供了多种... 目录1. libcurl 调试工具简介2. 输出请求消息使用 CURLOPT_VERBOSE使用 C

Linux部署jar包过程

《Linux部署jar包过程》文章介绍了在Linux系统上部署Java(jar)包时需要注意的几个关键点,包括统一JDK版本、添加打包插件、修改数据库密码以及正确执行jar包的方法... 目录linux部署jar包1.统一jdk版本2.打包插件依赖3.修改密码4.执行jar包总结Linux部署jar包部署

微服务架构之使用RabbitMQ进行异步处理方式

《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发... 目录一.什么是RabbitMQ?二.异步调用处理逻辑:三.RabbitMQ的基本使用1.安装2.架构

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf