本文主要是介绍RabbitMQ消息中间件技术精讲-深入RabbitMQ高级特性-100%投递成功-幂等性概念-TTL队列/消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
RabbitMQ的高级特性和实际场景应用,包括消息如何保障 100% 的投递成功 ?幂等性概念详解,在海量订单产生的业务高峰期,如何避免消息的重复消费问题?Confirm确认消息、Return返回消息,自定义消费者,消息的ACK与重回队列,消息的限流,TTL消息,死信队列等 ...
消息如何保障100?投递成功?
幂等性概念详解
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
Confirm确认消息、 Return返回消息
自定义消费者
消息的ACK与重回队列
消息的限流
TTL消息
死信队列
消息如何保障100%投递成功?
什么是生产端的可靠性投递?
保障消息的成功发出
保障MQ节点的成功接收
发送端收到MQ节点( Broker)确认应答
完善的消息进行补偿机制
生产端-可靠性投递
BAT/TMD互联网大厂的解决方案:
消息落库,对消息状态进行打标----数据中定义不同状态
消息的延迟投递,做二次确认,回调检查
消息落库,对消息状态进行打标(需要对数据库持久化两次,消息状态的修改)
BIZ DB(业务数据库)
MSG DB(消息数据库)
- step1业务入库和消息入库
- step2:step1成功,生产端的Sender进行消息发送(消息状态初始值为0)
- step3:Broker(Server)收到消息并发送应答给生产端的Confirm Listener
- step4:Confirm Listener异步监听Broker的应答消息并进行判断
假设step2通过,在step3回送响应时,网络突然出现了闪断,导致生产端的Listener收不到这条消息的confirm应答,消息的状态始终为0 - step5:分布式定时任务抓取状态为0的消息
- step6:将状态为0的消息重发
- step7如果尝试了3次(可按实际情况修改)以上则将状态置为2(消息投递失败状态)
网络中断突然borker没有发送消息到producer消费者。可以设置规则,如果初始状态5分钟之类没有收到确认,分布式定时任务处理,找出这部分数据处理,重试次数限制,重试几次,不成功,设置某个状态
保障MQ我们思考如果第一种可靠性投递,在高并发的场景下是否适合?
消息的延迟投递,做二次确认,回调检查
保证MQ我们思考如果第一种可靠性投递,在高并发的场景下是否合适
在第一种情况下,高并发的情境下,数据库的两次写操作和读取操作会存在数据库IO瓶颈
解决方案
消息延迟投递,做二次确认,回调检查
目的是为了减少像方案一中的数据库操作
Upstream service:上游服务,可能为生产端
Downstream service:下游服务,可能为消费端
MQ Broker:可能为集群
Callback service:回调服务,监听confirm消息,独立的服务
下述直接假定上述可能
- step1 业务数据入库,成功后生产端发送消息到Broker
- step2 消息发送成功之后,生产端发送一条延迟消息(Second Send Delay Check),需要设置延迟时间
- step3 消息队列进行指定队列的监听,对收到的消息进行处理
- step4 消费端处理完毕之后,发送Confirm(不是ACK)到Broker
- step5 Callback service是一个单独的服务,其实它扮演了方案一的存储消息的DB角色,它通过Broker去监听消费端发送的Confirm消息,如果收到消息,那么将消息持久化到DB当中.
- step6 一定延迟时间之后再次发送消息给Broker,然后还是Callback Service去监听延迟消息所对应的队列.收到之后去检查MSG DB中是否有这条消息,如果存在,通过.不存在或者是消毒费失败了,那么Callback Service就需要主动发起RPC通信给上游服务,告诉它延迟投递的这条消息没有找到,需要重新发送,生产端收到信息后就会重新查询业务消息然后将消息发送出去,循环第一步。
方案二主要目的是为了减少对数据库的操作,提高并发量。 而不是关心消息是不是能够100%的投递成功.
幂等性概念
幂等性是什么?----不管执行多少次,结果都不变,不重复消费
我们可以借鉴数据库的乐观锁机制:
比如我们执行一条更新库存的SQL语句
UPDATE T_REPS SET COUNT = COUNT-1, VERSION =
VERSION +1
WHERE VERSION =1
消费端-幂等性保障
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
唯一ID+指纹码机制,利用数据库主键去重
利用 Redis的原子性去实现
乐观锁方案
借鉴数据库的乐观锁机制,如:
根据version版本,也就是在操作库存前先获取当前商品的version版本号,然后操作的时候带上此version号。我们梳理下,我们第一次操作库存时,得到version为1,调用库存服务version变成了2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传如的version还是1,再执行上面的sql语句时,就不会执行;因为version已经变为2了,where条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。
唯一ID+指纹码机制,利用数据库主键去重
- 唯一ID就是业务表的唯一的主键,如商品ID
- 指纹码就是为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间戳+业务编号的方式。
上面的sql语句:
- 返回如果为0 表示没有操作过,那业务操作后就可以insert into t_check(唯一ID+指纹码)
- 返回如果大于0 表示操作过,就直接返回
好处:实现简单
坏处:高并发下数据库瓶颈
解决方案:根据ID进行分库分表进行算法路由
Redis原子操作
利用redis的原子操作,做个操作完成的标记。这个性能就比较好。但会遇到一些问题。
第一:我们是否需要把业务结果进行数据落库,如果落库,关键解决的问题时数据库和redis操作如何做到原子性?
这个意思就是库存减1了,但redis进行操作完成标记时,失败了怎么办?也就是一定要保证落库和redis 要么一起成功,要么一起失败
第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步策略?
这个意思就是库存减1,不落库,直接先操作redis操作完成标记,然后由另外的同步服务进行库存落库,这个就是增加了系统复杂性,而且同步策略如何设置
Confirm确认消息
理解 Confirm消息确认机制
消息的确认,是指生产者投递消息后,如果 Broker收到消息,则会给我们生产者一个应答。
生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker,这种方式也是消息的可靠性投递的核心保障!
如何实现 Confirm确认消息?
第一步:在 channel上开启确认模式: channel. confirmSelect0
第二步:在 channel上添加监听: add Confirmlistener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志
等后续处理!
public class Consumer {public static void main(String[] args) throws Exception {//1 创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2 获取C onnectionConnection connection = connectionFactory.newConnection();//3 通过Connection创建一个新的ChannelChannel channel = connection.createChannel();String exchangeName = "test_confirm_exchange";String routingKey = "confirm.#";String queueName = "test_confirm_queue";//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Keychannel.exchangeDeclare(exchangeName, "topic", true);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);//5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, queueingConsumer);while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端: " + msg);}}
}
public class Producer {public static void main(String[] args) throws Exception {//1 创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2 获取C onnectionConnection connection = connectionFactory.newConnection();//3 通过Connection创建一个新的ChannelChannel channel = connection.createChannel();//4 指定我们的消息投递模式: 消息的确认模式 channel.confirmSelect();String exchangeName = "test_confirm_exchange";String routingKey = "confirm.save";//5 发送一条消息String msg = "Hello RabbitMQ Send confirm message!";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());//6 添加一个确认监听channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.err.println("-------no ack!-----------");}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.err.println("-------ack!-----------");}});}
}
Return消息机制
Return listener用于处理一些不可路由的消息!
我们的消息生产者,通过指定一个 Exchange和 Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作
但是在某些情况下,如果我们在发送消息的时候,当前的 exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return listener!
在基础AP中有一个关键的配置项:
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,那么 broke端自动删除该消息!
public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_return_exchange";String routingKey = "return.#";String queueName = "test_return_queue";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, queueingConsumer);while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费者: " + msg);}}
}
public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_return_exchange";String routingKey = "return.save";String routingKeyError = "abc.save";String msg = "Hello RabbitMQ Return Message";channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange,String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("---------handle return----------");System.err.println("replyCode: " + replyCode);System.err.println("replyText: " + replyText);System.err.println("exchange: " + exchange);System.err.println("routingKey: " + routingKey);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}});channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());}
}
消费端自定义监听
我们一般就是在代码中编写 While循环,进行 consumer. nextDelivery方法进行获取下一条消息,然后进行消费处理!
但是我们使用自定义的 Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!
public class MyConsumer extends DefaultConsumer {public MyConsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}}
public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_consumer_exchange";String routingKey = "consumer.save";String msg = "Hello RabbitMQ Consumer Message";for(int i = 0; i < 5; i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}
public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_consumer_exchange";String routingKey = "consumer.#";String queueName = "test_consumer_queue";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);channel.basicConsume(queueName, true, new MyConsumer(channel));}
}
消费端限流
什么是消费端的限流?
假设一个场景,首先,我们 RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:
巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
RabbitMQ提供了一种qoS(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume或者 channe设置Qos的值)未被确认前,不进行消费新的消息。
void BasicQos(uint prefetchSize, ushort prefetch Count, bool global)
参数解释:
prefetchSize: 0
prefetchCount:会告诉 RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该 consumer将bock
掉,直到有消息aCk
global: true\false是否将上面设置应用于 channel
简单点说,就是上面限制是 channe级别的还是 consumer级别
注意
prefetchSize和 global这两项, rabbitmq没有实现,暂且不研究
prefetch count在 no ask= false的情况下生效,即在自动应答的
情况下这两个值是不生效的。
public class MyConsumer extends DefaultConsumer {private Channel channel;public MyConsumer(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}}
public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_qos_exchange";String queueName = "test_qos_queue";String routingKey = "qos.#";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);// 1 限流方式 第一件事就是 autoAck设置为 falsechannel.basicQos(0, 1, false);channel.basicConsume(queueName, false, new MyConsumer(channel));}
}
public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_qos_exchange";String routingKey = "qos.save";String msg = "Hello RabbitMQ QOS Message";for(int i = 0; i < 5; i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}
消费端ACK与重回队列
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!
如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功
消费端重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker
一般我们在实际应用中,都会关闭重回队列,也就是设置为 False
TTL队列/消息
TTL
TTL是 Time To live的缩写,也就是生存时间
RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要
超过了队列的超时时间配置,那么消息会自动的清除
死信队列
死信队列:DLX,Dead- Letter- Exchange
利用DLX,当消息在一个队列中变成死信( dead message)之后,
它能被重新 publish到另一个 Exchange,这个 Exchange就是DLX
消息变成死信有一下几种情况
消息被拒绝( basic reject/ basic nack)并且 requeue= false
消息TTL过期
队列达到最大长度
DLX也是一个正常的 Exchange,和一般的 Exchange没有区别,它能
在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时, RabbitMQ就会自动的将这个消息重新发布到
设置的 Exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补 RabbitMQ
30以前支持的 immediate参数的功能。
死信队列设置
首先需要设置死信队列的 exchange和 queue,然后进行绑定
Exchange: dlx. exchange
Queue: dlxqueue
Routingkey: #
然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可: arguments,put(" x-dead- -letterexchange",
dlx exchange")
public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 这就是一个普通的交换机 和 队列 以及路由String exchangeName = "test_dlx_exchange";String routingKey = "dlx.#";String queueName = "test_dlx_queue";channel.exchangeDeclare(exchangeName, "topic", true, false, null);Map<String, Object> agruments = new HashMap<String, Object>();agruments.put("x-dead-letter-exchange", "dlx.exchange");// 这个agruments属性,要设置到声明队列上channel.queueDeclare(queueName, true, false, false, agruments);channel.queueBind(queueName, exchangeName, routingKey);// 要进行死信队列的声明:channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);channel.queueDeclare("dlx.queue", true, false, false, null);channel.queueBind("dlx.queue", "dlx.exchange", "#");channel.basicConsume(queueName, true, new MyConsumer(channel));}
}
public class MyConsumer extends DefaultConsumer {public MyConsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}}
public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_dlx_exchange";String routingKey = "dlx.save";String msg = "Hello RabbitMQ DLX Message";for(int i = 0; i < 1; i++) {AMQP.BasicProperties properties =new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").expiration("10000").build();channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());}}
}
这篇关于RabbitMQ消息中间件技术精讲-深入RabbitMQ高级特性-100%投递成功-幂等性概念-TTL队列/消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!