RabbitMQ消息中间件技术精讲-深入RabbitMQ高级特性-100%投递成功-幂等性概念-TTL队列/消息

本文主要是介绍RabbitMQ消息中间件技术精讲-深入RabbitMQ高级特性-100%投递成功-幂等性概念-TTL队列/消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

       RabbitMQ的高级特性和实际场景应用,包括消息如何保障 100% 的投递成功 ?幂等性概念详解,在海量订单产生的业务高峰期,如何避免消息的重复消费问题?Confirm确认消息、Return返回消息,自定义消费者,消息的ACK与重回队列,消息的限流,TTL消息,死信队列等 ...

消息如何保障100?投递成功?

幂等性概念详解
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
Confirm确认消息、 Return返回消息

自定义消费者
消息的ACK与重回队列
消息的限流

TTL消息
死信队列

消息如何保障100%投递成功?

什么是生产端的可靠性投递?

保障消息的成功发出
保障MQ节点的成功接收
发送端收到MQ节点( Broker)确认应答

完善的消息进行补偿机制

生产端-可靠性投递

BAT/TMD互联网大厂的解决方案:

消息落库,对消息状态进行打标----数据中定义不同状态
消息的延迟投递,做二次确认,回调检查

消息落库,对消息状态进行打标(需要对数据库持久化两次,消息状态的修改)

BIZ DB(业务数据库)
MSG DB(消息数据库)

  • step1业务入库和消息入库
  • step2:step1成功,生产端的Sender进行消息发送(消息状态初始值为0)
  • step3:Broker(Server)收到消息并发送应答给生产端的Confirm Listener
  • step4:Confirm Listener异步监听Broker的应答消息并进行判断
    假设step2通过,在step3回送响应时,网络突然出现了闪断,导致生产端的Listener收不到这条消息的confirm应答,消息的状态始终为0
  • step5:分布式定时任务抓取状态为0的消息
  • step6:将状态为0的消息重发
  • step7如果尝试了3次(可按实际情况修改)以上则将状态置为2(消息投递失败状态)


 

网络中断突然borker没有发送消息到producer消费者。可以设置规则,如果初始状态5分钟之类没有收到确认,分布式定时任务处理,找出这部分数据处理,重试次数限制,重试几次,不成功,设置某个状态

 

保障MQ我们思考如果第一种可靠性投递,在高并发的场景下是否适合?
消息的延迟投递,做二次确认,回调检查

保证MQ我们思考如果第一种可靠性投递,在高并发的场景下是否合适

在第一种情况下,高并发的情境下,数据库的两次写操作和读取操作会存在数据库IO瓶颈

解决方案
消息延迟投递,做二次确认,回调检查

目的是为了减少像方案一中的数据库操作

Upstream service:上游服务,可能为生产端
Downstream service:下游服务,可能为消费端
MQ Broker:可能为集群
Callback service:回调服务,监听confirm消息,独立的服务
下述直接假定上述可能

  • step1 业务数据入库,成功后生产端发送消息到Broker
  • step2 消息发送成功之后,生产端发送一条延迟消息(Second Send Delay Check),需要设置延迟时间
  • step3 消息队列进行指定队列的监听,对收到的消息进行处理
  • step4 消费端处理完毕之后,发送Confirm(不是ACK)到Broker
  • step5 Callback service是一个单独的服务,其实它扮演了方案一的存储消息的DB角色,它通过Broker去监听消费端发送的Confirm消息,如果收到消息,那么将消息持久化到DB当中.
  • step6 一定延迟时间之后再次发送消息给Broker,然后还是Callback Service去监听延迟消息所对应的队列.收到之后去检查MSG DB中是否有这条消息,如果存在,通过.不存在或者是消毒费失败了,那么Callback Service就需要主动发起RPC通信给上游服务,告诉它延迟投递的这条消息没有找到,需要重新发送,生产端收到信息后就会重新查询业务消息然后将消息发送出去,循环第一步。

方案二主要目的是为了减少对数据库的操作,提高并发量。 而不是关心消息是不是能够100%的投递成功.

幂等性概念

 

幂等性是什么?----不管执行多少次,结果都不变,不重复消费
我们可以借鉴数据库的乐观锁机制:
比如我们执行一条更新库存的SQL语句
UPDATE T_REPS SET COUNT = COUNT-1, VERSION =
VERSION +1
WHERE VERSION =1


消费端-幂等性保障

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
唯一ID+指纹码机制,利用数据库主键去重
利用 Redis的原子性去实现

 

乐观锁方案

借鉴数据库的乐观锁机制,如:

 

 

根据version版本,也就是在操作库存前先获取当前商品的version版本号,然后操作的时候带上此version号。我们梳理下,我们第一次操作库存时,得到version为1,调用库存服务version变成了2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传如的version还是1,再执行上面的sql语句时,就不会执行;因为version已经变为2了,where条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。

唯一ID+指纹码机制,利用数据库主键去重

  • 唯一ID就是业务表的唯一的主键,如商品ID
  • 指纹码就是为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间戳+业务编号的方式。

上面的sql语句:

  • 返回如果为0 表示没有操作过,那业务操作后就可以insert into t_check(唯一ID+指纹码)
  • 返回如果大于0 表示操作过,就直接返回

好处:实现简单

坏处:高并发下数据库瓶颈

解决方案:根据ID进行分库分表进行算法路由

 

Redis原子操作

利用redis的原子操作,做个操作完成的标记。这个性能就比较好。但会遇到一些问题。

第一:我们是否需要把业务结果进行数据落库,如果落库,关键解决的问题时数据库和redis操作如何做到原子性?

这个意思就是库存减1了,但redis进行操作完成标记时,失败了怎么办?也就是一定要保证落库和redis 要么一起成功,要么一起失败

第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步策略?

这个意思就是库存减1,不落库,直接先操作redis操作完成标记,然后由另外的同步服务进行库存落库,这个就是增加了系统复杂性,而且同步策略如何设置

 

Confirm确认消息

理解 Confirm消息确认机制

消息的确认,是指生产者投递消息后,如果 Broker收到消息,则会给我们生产者一个应答。
生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker,这种方式也是消息的可靠性投递的核心保障!

如何实现 Confirm确认消息?
第一步:在 channel上开启确认模式: channel. confirmSelect0
第二步:在 channel上添加监听: add Confirmlistener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志
等后续处理!

public class Consumer {public static void main(String[] args) throws Exception {//1 创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2 获取C	onnectionConnection connection = connectionFactory.newConnection();//3 通过Connection创建一个新的ChannelChannel channel = connection.createChannel();String exchangeName = "test_confirm_exchange";String routingKey = "confirm.#";String queueName = "test_confirm_queue";//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Keychannel.exchangeDeclare(exchangeName, "topic", true);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);//5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, queueingConsumer);while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端: " + msg);}}
}

 

public class Producer {public static void main(String[] args) throws Exception {//1 创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2 获取C	onnectionConnection connection = connectionFactory.newConnection();//3 通过Connection创建一个新的ChannelChannel channel = connection.createChannel();//4 指定我们的消息投递模式: 消息的确认模式 channel.confirmSelect();String exchangeName = "test_confirm_exchange";String routingKey = "confirm.save";//5 发送一条消息String msg = "Hello RabbitMQ Send confirm message!";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());//6 添加一个确认监听channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.err.println("-------no ack!-----------");}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.err.println("-------ack!-----------");}});}
}

 

Return消息机制

Return listener用于处理一些不可路由的消息!
我们的消息生产者,通过指定一个 Exchange和 Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作

但是在某些情况下,如果我们在发送消息的时候,当前的 exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return listener!

在基础AP中有一个关键的配置项:
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,那么 broke端自动删除该消息!
 

public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_return_exchange";String routingKey = "return.#";String queueName = "test_return_queue";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, queueingConsumer);while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费者: " + msg);}}
}

 

public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_return_exchange";String routingKey = "return.save";String routingKeyError = "abc.save";String msg = "Hello RabbitMQ Return Message";channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange,String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("---------handle  return----------");System.err.println("replyCode: " + replyCode);System.err.println("replyText: " + replyText);System.err.println("exchange: " + exchange);System.err.println("routingKey: " + routingKey);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}});channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());}
}

 

消费端自定义监听
 

我们一般就是在代码中编写 While循环,进行 consumer. nextDelivery方法进行获取下一条消息,然后进行消费处理!
但是我们使用自定义的 Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!

public class MyConsumer extends DefaultConsumer {public MyConsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}}

 

public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_consumer_exchange";String routingKey = "consumer.save";String msg = "Hello RabbitMQ Consumer Message";for(int i = 0; i < 5; i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}

 

public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_consumer_exchange";String routingKey = "consumer.#";String queueName = "test_consumer_queue";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);channel.basicConsume(queueName, true, new MyConsumer(channel));}
}

 

消费端限流

什么是消费端的限流?

假设一个场景,首先,我们 RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:
巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!

RabbitMQ提供了一种qoS(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume或者 channe设置Qos的值)未被确认前,不进行消费新的消息。

void BasicQos(uint prefetchSize, ushort prefetch Count, bool global)
 

参数解释:

prefetchSize: 0
prefetchCount:会告诉 RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该 consumer将bock
掉,直到有消息aCk
global: true\false是否将上面设置应用于 channel
简单点说,就是上面限制是 channe级别的还是 consumer级别

注意

prefetchSize和 global这两项, rabbitmq没有实现,暂且不研究
prefetch count在 no ask= false的情况下生效,即在自动应答的
情况下这两个值是不生效的。
 

public class MyConsumer extends DefaultConsumer {private Channel channel;public MyConsumer(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}}

 

public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_qos_exchange";String queueName = "test_qos_queue";String routingKey = "qos.#";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);// 1 限流方式 第一件事就是 autoAck设置为 falsechannel.basicQos(0, 1, false);channel.basicConsume(queueName, false, new MyConsumer(channel));}
}

 

public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_qos_exchange";String routingKey = "qos.save";String msg = "Hello RabbitMQ QOS Message";for(int i = 0; i < 5; i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}

 

消费端ACK与重回队列

消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!
如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功

消费端重回队列

消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker
一般我们在实际应用中,都会关闭重回队列,也就是设置为 False

TTL队列/消息

TTL

TTL是 Time To live的缩写,也就是生存时间
RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要
超过了队列的超时时间配置,那么消息会自动的清除

 

死信队列

死信队列:DLX,Dead- Letter- Exchange

利用DLX,当消息在一个队列中变成死信( dead message)之后,
它能被重新 publish到另一个 Exchange,这个 Exchange就是DLX
消息变成死信有一下几种情况

消息被拒绝( basic reject/ basic nack)并且 requeue= false
消息TTL过期
队列达到最大长度

 

DLX也是一个正常的 Exchange,和一般的 Exchange没有区别,它能
在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时, RabbitMQ就会自动的将这个消息重新发布到
设置的 Exchange上去,进而被路由到另一个队列。

可以监听这个队列中消息做相应的处理,这个特性可以弥补 RabbitMQ
30以前支持的 immediate参数的功能。

 

死信队列设置

首先需要设置死信队列的 exchange和 queue,然后进行绑定
Exchange: dlx. exchange
Queue: dlxqueue
Routingkey: #
然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可: arguments,put(" x-dead- -letterexchange",
dlx exchange")

 

public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 这就是一个普通的交换机 和 队列 以及路由String exchangeName = "test_dlx_exchange";String routingKey = "dlx.#";String queueName = "test_dlx_queue";channel.exchangeDeclare(exchangeName, "topic", true, false, null);Map<String, Object> agruments = new HashMap<String, Object>();agruments.put("x-dead-letter-exchange", "dlx.exchange");// 这个agruments属性,要设置到声明队列上channel.queueDeclare(queueName, true, false, false, agruments);channel.queueBind(queueName, exchangeName, routingKey);// 要进行死信队列的声明:channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);channel.queueDeclare("dlx.queue", true, false, false, null);channel.queueBind("dlx.queue", "dlx.exchange", "#");channel.basicConsume(queueName, true, new MyConsumer(channel));}
}

 

public class MyConsumer extends DefaultConsumer {public MyConsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}}
public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_dlx_exchange";String routingKey = "dlx.save";String msg = "Hello RabbitMQ DLX Message";for(int i = 0; i < 1; i++) {AMQP.BasicProperties properties =new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").expiration("10000").build();channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());}}
}

 

 

 

 

 

 


 

 

 

 

 

这篇关于RabbitMQ消息中间件技术精讲-深入RabbitMQ高级特性-100%投递成功-幂等性概念-TTL队列/消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1000904

相关文章

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

新特性抢先看! Ubuntu 25.04 Beta 发布:Linux 6.14 内核

《新特性抢先看!Ubuntu25.04Beta发布:Linux6.14内核》Canonical公司近日发布了Ubuntu25.04Beta版,这一版本被赋予了一个活泼的代号——“Plu... Canonical 昨日(3 月 27 日)放出了 Beta 版 Ubuntu 25.04 系统镜像,代号“Pluc

一文带你深入了解Python中的GeneratorExit异常处理

《一文带你深入了解Python中的GeneratorExit异常处理》GeneratorExit是Python内置的异常,当生成器或协程被强制关闭时,Python解释器会向其发送这个异常,下面我们来看... 目录GeneratorExit:协程世界的死亡通知书什么是GeneratorExit实际中的问题案例

kotlin中的行为组件及高级用法

《kotlin中的行为组件及高级用法》Jetpack中的四大行为组件:WorkManager、DataBinding、Coroutines和Lifecycle,分别解决了后台任务调度、数据驱动UI、异... 目录WorkManager工作原理最佳实践Data Binding工作原理进阶技巧Coroutine

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在