消费者 ack 以及 生产者 confirm

2024-06-24 07:48

本文主要是介绍消费者 ack 以及 生产者 confirm,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RabbitMQ

https://my.oschina.net/u/3523423/blog/1620885

这篇文章主要讲 RabbitMQ 中 消费者 ack 以及 生产者 confirms。

如上图,生产者把消息发送到 RabbitMQ,然后 RabbitMQ 再把消息投递到消费者。

生产者和 RabbitMQ,以及 RabbitMQ 和消费者都是通过 TCP 连接,但是他们之间是通过信道(Channel)传递数据的。多个线程共享一个连接,但是每个线程拥有独自的信道。

消费者 ack

  • 问题:怎么保证 RabbitMQ 投递的消息被成功投递到了消费者?

    RabbitMQ 投递的消息,刚投递一半,产生了网络抖动,就有可能到不了消费者。

  • 解决办法:

    RabbitMQ 对消费者说:“如果你成功接收到了消息,给我说确认收到了,不然我就当你没有收到,我还会重新投递”

在 RabbitMQ 中,有两种 acknowledgement 模式。

自动 acknowledgement 模式

这也称作发后即忘模式

在这种模式下,RabbitMQ 投递了消息,在投递成功之前,如果消费者的 TCP 连接 或者 channel 关闭了,这条消息就会丢失。

会有丢失消息问题。

手动 acknowledgement 模式

在这种模式下,RabbitMQ 投递了消息,在投递成功之前,如果消费者的 TCP 连接 或者 channel 关闭了,导致这条消息没有被 acked,RabbitMQ 会自动把当前消息重新入队,再次投递。

会有重复投递消息的问题,所以消费者得准备好处理重复消息的问题,就是所谓的:幂等性。

为了启用 手动 ack 模式,消费者需要实现 ChannelAwareMessageListener 接口。

@Component
public class Consumer implements ChannelAwareMessageListener {@Autowiredprivate MessageConverter messageConverter;@Overridepublic void onMessage(Message message, Channel channel) throws Exception {MessageProperties messageProperties = message.getMessageProperties();// 代表投递的标识符,唯一标识了当前信道上的投递,通过 deliveryTag ,消费者就可以告诉 RabbitMQ 确认收到了当前消息,见下面的方法long deliveryTag = messageProperties.getDeliveryTag();// 如果是重复投递的消息,redelivered 为 trueBoolean redelivered = messageProperties.getRedelivered();// 获取生产者发送的原始消息Object originalMessage = messageConverter.fromMessage(message);Console.log("consume message = {} , deliveryTag = {} , redelivered = {}", originalMessage, deliveryTag, redelivered);// 代表消费者确认收到当前消息,第二个参数表示一次是否 ack 多条消息channel.basicAck(deliveryTag, false);// 代表消费者拒绝一条或者多条消息,第二个参数表示一次是否拒绝多条消息,第三个参数表示是否把当前消息重新入队
//        channel.basicNack(deliveryTag, false, false);// 代表消费者拒绝当前消息,第二个参数表示是否把当前消息重新入队
//        channel.basicReject(deliveryTag,false);}
}
  • channel.basicAck

    代表消费者确认收到当前消息,语义上表示消费者成功处理了当前消息。

  • channel.basicNack

    代表消费者拒绝一条或者多条消息。basicNack 算是 basicReject 的一个扩展,因为 basicReject 不能一次拒绝多条消息。

  • channel.basicReject

    代表消费者拒绝这条消息,语义上表示消费者没有处理当前消息。

    对于 basicNack 和 basicReject ,如果参数 boolean requeue 传入 false,消息还是会从队列里面删除。这三个方法只是语义上的不同。

  • deliveryTag

    deliveryTag 是 64 bit long 值,从 1 开始,不停的递增 1。不同的 channel 有独立的 deliveryTag。比如有两个消费者,你会发现,都是从 1 开始递增,互不影响。

由于上面创建的消费者,没有指明监听那个队列,所以还需要创建一个 MessageListenerContainer

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, ChannelAwareMessageListener listener) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 指定消费者container.setMessageListener(listener);// 指定监听的队列container.setQueueNames(QUEUE_NAME);// 设置消费者的 ack 模式为手动确认模式container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setPrefetchCount(300);return container;
}

这样就开启了消费者手动 ack 模式。

注意

如果开启了消费者手动 ack 模式,但是又没有调用手动确认方法(比如:channel.basicAck),那问题就大了,RabbitMQ 会在当前 channel 上一直阻塞,等待消费者 ack。

生产者 confirms

  • 问题:怎么保证生产者发送的消息被 RabbitMQ 成功接收?

    生产者发送的消息,刚发送一半,产生了网络抖动,就有可能到不了 RabbitMQ。

  • 解决办法:

    生产者对 RabbitMQ 说:“如果你成功接收到了消息,给我说确认收到了,不然我就当你没有收到”

自定义消息元数据

/*** 自定义消息元数据*/
@NoArgsConstructor
@Data
public class RabbitMetaMessage implements Serializable{/*** 是否是 returnCallback*/private boolean returnCallback;/*** 承载原始消息数据数据*/private Object payload;public RabbitMetaMessage(Object payload) {this.payload = payload;}
}
  • returnCallback 标记当前消息是否触发了 returnCallback(后面会解释)
  • payload 保存原始消息数据

生产者

先把消息存储到 redis,再发送到 rabbitmq

@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate DefaultKeyGenerator keyGenerator;@GetMapping("/sendMessage")public Object sendMessage() {new Thread(() -> {HashOperations hashOperations = redisTemplate.opsForHash();for (int i = 0; i < 1; i++) {String id = keyGenerator.generateKey() + "";String value = "message " + i;RabbitMetaMessage rabbitMetaMessage = new RabbitMetaMessage(value);// 先把消息存储到 redishashOperations.put(RedisConfig.RETRY_KEY, id, rabbitMetaMessage);Console.log("send message = {}", value);// 再发送到 rabbitmqrabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, value, (message) -> {message.getMessageProperties().setMessageId(id);return message;}, new CorrelationData(id));}}).start();return "ok";}}

配置 ConnectionFactory

@Bean
public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.238.132", 5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 设置 生产者 confirmsconnectionFactory.setPublisherConfirms(true);// 设置 生产者 ReturnsconnectionFactory.setPublisherReturns(true);return connectionFactory;
}

配置 RabbitTemplate

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 必须设置为 true,不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调// 而且 ReturnCallback 比 ConfirmCallback 先回调,意思就是 ReturnCallback 执行完了才会执行 ConfirmCallbackrabbitTemplate.setMandatory(true);// 设置 ConfirmCallback 回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {Console.log("ConfirmCallback , correlationData = {} , ack = {} , cause = {} ", correlationData, ack, cause);// 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false// 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意)if (ack) {String messageId = correlationData.getId();RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);Console.log("rabbitMetaMessage = {}", rabbitMetaMessage);if (!rabbitMetaMessage.isReturnCallback()) {// 到这一步才能完全保证消息成功发送到了 rabbitmq// 删除 redis 里面的消息redisTemplate.opsForHash().delete(RedisConfig.RETRY_KEY, messageId);}}});// 设置 ReturnCallback 回调// 如果发送到交换器成功,但是没有匹配的队列,就会触发这个回调rabbitTemplate.setReturnCallback((message, replyCode, replyText,exchange, routingKey) -> {Console.log("ReturnCallback unroutable messages, message = {} , replyCode = {} , replyText = {} , exchange = {} , routingKey = {} ", message, replyCode, replyText, exchange, routingKey);// 从 redis 取出消息,设置 returnCallback 设置为 trueString messageId = message.getMessageProperties().getMessageId();RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);rabbitMetaMessage.setReturnCallback(true);redisTemplate.opsForHash().put(RedisConfig.RETRY_KEY, messageId, rabbitMetaMessage);});return rabbitTemplate;
}

ReturnCallback 回调

必须 rabbitTemplate.setMandatory(true),不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调。而且 ReturnCallback 比 ConfirmCallback 先回调。

如何模拟 发送到交换器成功,但是没有匹配的队列,先把项目启动,然后再把队列解绑,再发送消息,就会触发 ReturnCallback 回调,而且发现消息也丢失了,没有到任何队列。

这样就解绑了。

运行项目,然后打开浏览器,输入 http://localhost:9999/sendMessage

控制台打出如下日志

这样就触发了 ReturnCallback 回调 ,从 redis 取出消息,设置 returnCallback 设置为 true。你会发现 ConfirmCallback 的 ack 返回值还是 true。

ConfirmCallback 回调

这里有个需要注意的地方,如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意,就像上面那种情况!!!)。所以不能单靠这个来判断消息真的发送成功了。这个时候会先触发 ReturnCallback 回调,我们把 returnCallback 设置为 true,所以还得判断 returnCallback 是否为 true,如果为 ture,表示消息发送不成功,false 才能完全保证消息成功发送到了 rabbitmq。

如何模拟 ack 返回值为 false,先把项目启动,然后再把交换器删除,就会发现 ConfirmCallback 的 ack 为 false。

运行项目,然后打开浏览器,输入 http://localhost:9999/sendMessage

控制台打出如下日志

你会发现 ConfirmCallback 的 ack 返回值才是 false。

注意

不能单单依靠 ConfirmCallback 的 ack 返回值为 true,就断定当前消息发送成功了。

源码地址

  • GitHub

参考资料

Consumer Acknowledgements and Publisher Confirms

结语

由于本人知识和能力有限,文中如有没说清楚的地方,希望大家能在评论区指出,以帮助我将博文写得更好。

这篇关于消费者 ack 以及 生产者 confirm的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1089553

相关文章

java线程深度解析(五)——并发模型(生产者-消费者)

http://blog.csdn.net/Daybreak1209/article/details/51378055 三、生产者-消费者模式     在经典的多线程模式中,生产者-消费者为多线程间协作提供了良好的解决方案。基本原理是两类线程,即若干个生产者和若干个消费者,生产者负责提交用户请求任务(到内存缓冲区),消费者线程负责处理任务(从内存缓冲区中取任务进行处理),两类线程之

数字经济时代,零售企业如何实现以消费者为中心的数字化转型?

在数字经济时代,零售企业正面临着前所未有的挑战与机遇。随着消费者行为的数字化和多样化,传统的零售模式已难以满足市场需求。为了在激烈的市场竞争中立于不败之地,零售企业必须实现以消费者为中心的数字化转型。这一转型不仅仅是技术的升级,更是一场涉及企业战略、组织结构、运营模式和人才管理的深刻变革。本文将探讨零售企业在数字化转型过程中遇到的难点,并提出相应的解决策略,通过实际案例分析,展示如何通过综合措施进

生产者消费者模型(能看懂文字就能明白系列)

系列文章目录 能看懂文字就能明白系列 C语言笔记传送门 Java笔记传送门 🌟 个人主页:古德猫宁- 🌈 信念如阳光,照亮前行的每一步 前言 本节目标: 理解什么是阻塞队列,阻塞队列与普通队列的区别理解什么是生产者消费者模型生产者消费者模型的主要作用 一、阻塞队列 阻塞独立是一个特殊的队列,它具有以下特点: 线程安全带有阻塞特性:即如果队列为空,这时继续出队列的话,

三个同步与互斥问题之生产者与消费者

#include<stdio.h> #include<pthread.h> pthread_mutex_t  mutex; #define Max 10 pthread_cond_t pro; pthread_cond_t con; int buffer=0;//全局变量----一开始为0,只有生产者可以执行 void deal_produce(

序列号SYN+确认号ACK

处于对于wireshark中的SYN和ACK如何计算出来的疑惑 找的这篇译文! From:  http://blog.csdn.net/a19881029/article/details/38091243 原文见:http://packetlife.net/blog/2010/jun/7/understanding-tcp-sequence-acknowledgment-numbers/

编写一个生产者消费者模式的JAVA工程

编写一个生产者消费者模式的JAVA工程; 要求: 1)符合生产者消费者模式,避免出现资源访问冲突; 2)输出生产和消费的执行过程; 3)分别统计生产者和消费者的执行时长和等待时长(目前还不知道怎么搞,其他的参考http://blog.csdn.net/monkey_d_meng/article/details/6251879) 创建类Storage,作为仓库 import java.ut

Kafka【十二】消费者拉取主题分区的分配策略

【1】消费者组、leader和follower 消费者想要拉取主题分区的数据,首先必须要加入到一个组中。 但是一个组中有多个消费者的话,那么每一个消费者该如何消费呢,是不是像图中一样的消费策略呢?如果是的话,那假设消费者组中只有2个消费者或有4个消费者,和分区的数量不匹配,怎么办? 所以这里,我们需要学习Kafka中基本的消费者组中的消费者和分区之间的分配规则: 同一个消费者组的消费者都订

Kafka【十三】消费者消费消息的偏移量

偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据。如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。 【1】起始偏移量 在消费者的配置中,我们可以增加偏移量相关参数auto.offset.re

生产者-消费者,使用C++11的版本

前言 multi-threading以及lambda是C++11的重要升级,下面的经典的生产者-消费者的代码,既使用了C++11的multi-threading相关的库, 又使用了lambda。代码中有注释,应该比较详细。 Talk is cheap show me the code #include <iostream> #include <queue>#inc

java线程 yield,sleep,join,synchronized wait notify notifyAll,ReentrantLock lock condition, 生产者消费者

yield,sleep,join yield,join,sleep,join是Thread中的方法,不需要 在synchronized 代码块中调用,和synchronized 没关系,也不会释放锁。 Thread.sleep(100);Thread.yield();Thread t;t.join(); (1)yield()不一定保证让出cpu yield()只是使当前线程重新回