RabbitMQ 延时消息队列

2024-05-30 18:48
文章标签 队列 消息 rabbitmq 延时

本文主要是介绍RabbitMQ 延时消息队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

https://www.cnblogs.com/xiaoxing/p/9250823.html

一、简述

二、示例demo

  • 单个延迟队列
  • 多个延迟队列 

、简述

延时消息在日常随处可见:

1、订单创建10min之后不发起支付,自动取消。

2、30min定时推送一次邮件信息。

最常用到方式为定时任务轮训,数据量小的时候使用没什么问题 而当有千万甚至上亿的数据量时就会出现数据读取的瓶颈,此时全表扫面进行处理一定是下下策。但是也有比较讨巧的方式,分享公司内部订单拆分的例子:

由于线上每天订单量50万+的增长量,单表早已无法吃撑这个增长的速度。采取的方式为订单归档:线上热数据保留2-3天的数据,其余都归档进入历史订单表中,这样热数据在200万以内。
订单超过10min不支付即取消的功能,可以采取简单的扫表形式而不会出现数据读取性能的问题。

 这样的方式很简单,但需要跟业务进行沟通妥协,本文会讲另一种方式即RabbitMQ延迟队列。RabbitMQ实际并没有直接实现延时队列,但可利用RabbitMQ提供的属性来模拟延时队列,甚至已经有的配套的插件rabbitmq_delayed_message_exchange 下面先介绍使用到的RabbitMQ的属性。

1、消息的Time To Live (TTL) 

x-message-ttl:消息过期时间,超过过期时间之后即变为死信(Dead-letter)不会再被消费者消费。

设置消息TTL有两种方式:

  • 创建队列时指定x-message-ttl,此时队列所有的消息具有统一过期时间。
  • 发送消息为每个消息设置 expiration,此时消息之间过期时间不同。 

如果两者都设置,过期时间取两者最小。如果设置TTL为0即表示除非立马能发送到队列,否则直接丢弃该消息。利用TTL为0的特性再结合死信转发器可以替代RabbitMQ 3.0的immediate参数。

2、队列的TTL

x-expires: RabbitMQ会确保时间达到后将队列删除,但是并不保障这个动作有多及时。队列过期代表着处于未使用状态,即

  • 队列无任何消费者
  • 队列没有被重新声明
  • 队列在过期未调用Basic.Get命令获取消息

3、x-dead-letter-exchange(RabbitMQ文档):死信转发器(转发器类型)当消息达到过期时间未被消费则会由该exchange按照配置的x-dead-letter-routing-key转发到指定队列,最后被消费者消费,如果未配置x-dead-letter-routing-key则会按照原队列的key进行转发。

4、队列的消息在以下几种情况会变成死信(Dead-letter)

  • 设置的x-message-ttl或者expiration到期,即消息过期
  • 消息被消费者拒绝(调用Basic.Reject / Basic.Nack)且 requeue参数设置为false
  • 队列达到最大长度

 

二、示例demo

  • 单个延迟队列

RabbitMQ延时队列逻辑:

 

 

  

1、exchange_delay_begin:缓冲队列exchange交换器,用于将消息转发至缓存消息队列 queue_delay_begin 。

2、exchange_delay_done:死信(dead-letter)队列exchange交换器,用于将队列 queue_delay_begin 转发到死信队列。

3、queue_delay_begin:缓冲消息队列,等待消息过期。

4、queue_delay_done:死信消息队列,消费者能够真正消费信息。

 spring-rabbitmq.xml :

复制代码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-3.1.xsdhttp://www.springframework.org/schema/txhttp://www.springframework.org/schema/tx/spring-tx.xsdhttp://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop.xsdhttp://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd"><!--配置connection-factory,指定连接rabbit server参数 --><rabbit:connection-factory id="connectionFactory"  username="guest" password="guest" host="127.0.0.1" port="5672" publisher-confirms="true"/><!-- 延时队列 --><rabbit:direct-exchange id="exchange_delay_begin" name="exchange_delay_begin"  durable="false" auto-delete="false" ><rabbit:bindings><rabbit:binding queue="queue_delay_begin" key="delay" /></rabbit:bindings></rabbit:direct-exchange><rabbit:queue name="queue_delay_begin" durable="false"><rabbit:queue-arguments><!--  队列过期时间 --><entry key="x-message-ttl" value="30000" value-type="java.lang.Long" /><entry key="x-dead-letter-exchange" value="exchange_delay_done" /><entry key="x-dead-letter-routing-key" value="delay" /></rabbit:queue-arguments></rabbit:queue><rabbit:direct-exchange id="exchange_delay_done" name="exchange_delay_done"  durable="false" auto-delete="false" ><rabbit:bindings><rabbit:binding queue="queue_delay_done" key="delay" /><!--  binding key 相同为 【delay】exchange转发消息到多个队列 --><!--<rabbit:binding queue="queue_delay_done_two" key="delay" />--></rabbit:bindings></rabbit:direct-exchange><rabbit:queue name="queue_delay_done" durable="false"/><rabbit:template id="delayMsgTwoTemplate" connection-factory="connectionFactory" /><bean id="messageConsumer" class="com.nancy.rabbitmq.demo.MessageConsumer"></bean><!-- 消息接收者 --><rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" ><rabbit:listener queues="queue_delay_done" ref="messageConsumer" /></rabbit:listener-container>
</beans>

复制代码

 DelayMessageProducer.java

复制代码

@Service
public class DelayMessageProducer {@Resource(name="delayMsgTwoTemplate")private AmqpTemplate delayMsgTwoTemplate;public void delayMsgTwo(String exchange, String routingKey, Object msg) {delayMsgTwoTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(String.valueOf(10000));return message;}});}
}

复制代码

 MessageConsumer.java

复制代码

public class MessageConsumer implements MessageListener {@Overridepublic void onMessage(Message message) {System.out.println("consumer receive message 22------->:{}"+ message);}
}

复制代码

 application.xml 

复制代码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"><import resource="spring-rabbitmq.xml" /><!-- 扫描指定package注释的注册为Spring Beans --><context:component-scan base-package="com.nancy.rabbitmq" /><!-- 激活annotation功能 --><context:annotation-config /><!-- 激活annotation功能 --><context:spring-configured />
</beans>

复制代码

DelayQueueTest.java

复制代码

public class DelayQueueTest {private ApplicationContext context = null;@org.junit.Beforepublic void setUp() throws Exception {context = new ClassPathXmlApplicationContext("rabbitmq/application.xml");}@Testpublic void delayQueueTest() throws Exception {DelayMessageProducer messageProducer = context.getBean(DelayMessageProducer.class);int a = 10;while (a > 0) {System.out.println("send "+ a);messageProducer.delayMsgTwo("exchange_delay_begin","delay", "hello world delay2 :" + a--);try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("sended ");Thread.sleep(1000*60);}
}

复制代码

 运行结果: 发送消息 10s之后, 消费监听到消息 消费。

复制代码

send 10
send 9
send 8
send 7
send 6
send 5
send 4
send 3
send 2
send 1
sended 
consumer receive message 22------->:{}(Body:'hello world delay2 :10' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :9' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :8' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :7' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :6' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :5' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :4' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=7, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :3' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=8, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :2' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=9, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :1' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=10, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])

复制代码

  

  • 多个延迟队列

实际的业务需求中会出现不同的时间延迟,此时可设置多个队列以达到不同的延迟效果。例如5个队列 common-queue_5s、common-queue_15s、common-queue_30s、common-queue_45s、common-queue_50s达到不同的延迟效果,整体的结构如下:

 

 上述bindingKey的值有所简化,但对路由结构图无影响。这里使用一个死信转发器(转发器类型)通过绑定不同的key路由到不同的死信队列。也可以死信转发器和死信队列一对一绑定 即 成对出现(dlx_exchange_5s 和 dead-letter-queue_5s)

这里贴出部分xml部分配置:

复制代码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"><rabbit:connection-factoryid="connectionFactory"host="${rabbit.host}"port="${rabbit.port}"username="${rabbit.username}"password="${rabbit.password}"publisher-confirms="true"/><rabbit:admin connection-factory="connectionFactory" ignore-declaration-exceptions="true" /><!-- 正常队列 --><!-- 5s过期 --><rabbit:queue name="common-queue_5s"><rabbit:queue-arguments><entry key="x-message-ttl" value="5000" value-type="java.lang.Long" /><entry key="x-dead-letter-exchange" value="dlx-exchange" /><entry key="x-dead-letter-routing-key" value="dead-letter-queue_5s" /></rabbit:queue-arguments></rabbit:queue><!-- 15s过期 --><rabbit:queue name="common-queue_15s"><rabbit:queue-arguments><entry key="x-message-ttl" value="15000" value-type="java.lang.Long" /><entry key="x-dead-letter-exchange" value="dlx-exchange" /><entry key="x-dead-letter-routing-key" value="dead-letter-queue_15s" /></rabbit:queue-arguments></rabbit:queue><!-- 30s过期 --><rabbit:queue name="common-queue_30s"><rabbit:queue-arguments><entry key="x-message-ttl" value="30000" value-type="java.lang.Long" /><entry key="x-dead-letter-exchange" value="dlx-exchange" /><entry key="x-dead-letter-routing-key" value="dead-letter-queue_30s" /></rabbit:queue-arguments></rabbit:queue><!-- 45s过期 --><rabbit:queue name="common-queue_45s"><rabbit:queue-arguments><entry key="x-message-ttl" value="45000" value-type="java.lang.Long" /><entry key="x-dead-letter-exchange" value="dlx-exchange" /><entry key="x-dead-letter-routing-key" value="dead-letter-queue_45s" /></rabbit:queue-arguments></rabbit:queue><!-- 50s过期 --><rabbit:queue name="common-queue_50s"><rabbit:queue-arguments><entry key="x-message-ttl" value="50000" value-type="java.lang.Long" /><entry key="x-dead-letter-exchange" value="dlx-exchange" /><entry key="x-dead-letter-routing-key" value="dead-letter-queue_50s" /></rabbit:queue-arguments></rabbit:queue><!-- 正常路由 --><rabbit:direct-exchange name="common-exchange" durable="false" id="common-exchange"><rabbit:bindings><rabbit:binding queue="common-queue_5s" /><rabbit:binding queue="common-queue_15s" /><rabbit:binding queue="common-queue_30s" /><rabbit:binding queue="common-queue_45s" /><rabbit:binding queue="common-queue_50s" /></rabbit:bindings></rabbit:direct-exchange><!-- 死信队列 --><rabbit:queue name="dead-letter-queue_5s" /><rabbit:queue name="dead-letter-queue_15s" /><rabbit:queue name="dead-letter-queue_30s" /><rabbit:queue name="dead-letter-queue_45s" /><rabbit:queue name="dead-letter-queue_50s" /><rabbit:direct-exchange name="dlx-exchange" durable="false" id="dlx-exchange"><rabbit:bindings><rabbit:binding queue="dead-letter-queue_5s" /><rabbit:binding queue="dead-letter-queue_15s" /><rabbit:binding queue="dead-letter-queue_30s" /><rabbit:binding queue="dead-letter-queue_45s" /><rabbit:binding queue="dead-letter-queue_50s" /></rabbit:bindings></rabbit:direct-exchange><!-- 配置consumer, 监听的类和queue的对应关系 --><rabbit:listener-containerconnection-factory="connectionFactory" acknowledge="manual" ><rabbit:listener queues="dead-letter-queue_5s" ref="receiveConfirmTestListener" /><rabbit:listener queues="dead-letter-queue_15s" ref="receiveConfirmTestListener" /><rabbit:listener queues="dead-letter-queue_30s" ref="receiveConfirmTestListener" /><rabbit:listener queues="dead-letter-queue_45s" ref="receiveConfirmTestListener" /><rabbit:listener queues="dead-letter-queue_50s" ref="receiveConfirmTestListener" /></rabbit:listener-container></beans>

复制代码

 junit测试:

复制代码

@RunWith(SpringJUnit4ClassRunner.class)  
@ContextConfiguration(locations = {"classpath:application-context.xml"})  
public class TestDeadLetter {  @Autowired  private DeadLetterPublishService publishService;  @Testpublic void testALL() throws InterruptedException{String message = "currentTime:" + System.currentTimeMillis();System.out.println("test1---message: "+ message);publishService.send("common-exchange","common-queue_5s", message);publishService.send("common-exchange","common-queue_15s", message);publishService.send("common-exchange","common-queue_30s", message);publishService.send("common-exchange","common-queue_45s", message);publishService.send("common-exchange","common-queue_50s", message);Thread.sleep(100000);}} 

复制代码

 最后运行结果:消息实际发送时间点 和 消息被延迟消费时间点无限接近 五个消息分别延迟大约 5s 15s 30s 45s 50s  但做不到精确一致。

复制代码

test1---message: currentTime:1566920053524
// 。。。。
1566920058551 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:18 CST 2019, routing-keys=[common-queue_5s], queue=common-queue_5s}], x-first-death-reason=expired, x-first-death-queue=common-queue_5s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_5s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920068578 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:28 CST 2019, routing-keys=[common-queue_15s], queue=common-queue_15s}], x-first-death-reason=expired, x-first-death-queue=common-queue_15s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_15s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920083550 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:43 CST 2019, routing-keys=[common-queue_30s], queue=common-queue_30s}], x-first-death-reason=expired, x-first-death-queue=common-queue_30s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_30s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920098549 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:58 CST 2019, routing-keys=[common-queue_45s], queue=common-queue_45s}], x-first-death-reason=expired, x-first-death-queue=common-queue_45s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_45s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920103551 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:35:03 CST 2019, routing-keys=[common-queue_50s], queue=common-queue_50s}], x-first-death-reason=expired, x-first-death-queue=common-queue_50s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_50s, deliveryTag=1, messageCount=0]:currentTime:1566920053524

复制代码

 

  

 

这篇关于RabbitMQ 延时消息队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1016721

相关文章

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

消息认证码解析

1. 什么是消息认证码         消息认证码(Message Authentication Code)是一种确认完整性并进行认证的技术,取三个单词的首字母,简称为MAC。         消息认证码的输入包括任意长度的消息和一个发送者与接收者之间共享的密钥,它可以输出固定长度的数据,这个数据称为MAC值。         根据任意长度的消息输出固定长度的数据,这一点和单向散列函数很类似

【数据结构与算法 经典例题】使用队列实现栈(图文详解)

💓 博客主页:倔强的石头的CSDN主页               📝Gitee主页:倔强的石头的gitee主页    ⏩ 文章专栏:《数据结构与算法 经典例题》C语言                                   期待您的关注 ​​ 目录  一、问题描述 二、前置知识 三、解题思路 四、C语言实现代码 🍃队列实现代码:

RabbitMQ实践——临时队列

临时队列是一种自动删除队列。当这个队列被创建后,如果没有消费者监听,则会一直存在,还可以不断向其发布消息。但是一旦的消费者开始监听,然后断开监听后,它就会被自动删除。 新建自动删除队列 我们创建一个名字叫queue.auto.delete的临时队列 绑定 我们直接使用默认交换器,所以不用创建新的交换器,也不用建立绑定关系。 实验 发布消息 我们在后台管理页面的默认交换器下向这个队列

Java并发编程—阻塞队列源码分析

在前面几篇文章中,我们讨论了同步容器(Hashtable、Vector),也讨论了并发容器(ConcurrentHashMap、CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便。今天我们来讨论另外一类容器:阻塞队列。   在前面我们接触的队列都是非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了D

剑指offer—编程题7(用两个栈实现一个队列)

题目:用两个栈实现一个队列。队列的声明如下,请实现它的两个函数appendTail 和deleteHead,分别完成在队列尾部插入结点和在队列头部删除结点的功能。 代码如下: [java]  view plain copy print ? public class Test07 {       /**       * 用两个栈模拟的队列       *

rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费

业务描述 由于业务需要这样一种场景,将消息按照id(业务id)尾号发送到对应的queue中,并启动10个消费者(单jvm,10个消费者组),从对应的queue中集群消费,如下图1所示(假设有两个broker组成的集群):  producer如何实现 producer只需发送消息时调用如下方法即可 /*** 发送有序消息** @param messageMap 消息数据* @param

Spring 集成 RabbitMQ 与其概念,消息持久化,ACK机制

目录 RabbitMQ 概念exchange交换机机制 什么是交换机binding?Direct Exchange交换机Topic Exchange交换机Fanout Exchange交换机Header Exchange交换机RabbitMQ 的 Hello - Demo(springboot实现)RabbitMQ 的 Hello Demo(spring xml实现)RabbitMQ 在生产环境

Spring boot+RabbitMQ环境

消息队列在目前分布式系统下具备非常重要的地位,如下的场景是比较适合消息队列的: 跨系统的调用,异步性质的调用最佳。高并发问题,利用队列串行特点。订阅模式,数据被未知数量的消费者订阅,比如某种数据的变更会影响多个系统的数据,订单数据就是比较好理解的。 之前有一个场景是商品数据在修改后需要推送到elasticsearch中,由于修改产品的并发量以及数据量均不大,所以对于消息未做持久化,而且为了快速

SpringBoot中如何监听两个不同源的RabbitMQ消息队列

spring-boot如何配置监听两个不同的RabbitMQ 由于前段时间在公司开发过程中碰到了一个问题,需要同时监听两个不同的rabbitMq,但是之前没有同时监听两个RabbitMq的情况,因此在同事的帮助下,成功实现了监听多个MQ。下面我给大家一步一步讲解下,也为自己做个笔记; 详细步骤: 1. application.properties 文件配置: u.rabbitmq.ad