手搭手RocketMQ重试机制

2024-03-17 12:28
文章标签 rocketmq 机制 搭手 重试

本文主要是介绍手搭手RocketMQ重试机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

环境介绍

技术栈

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

dynamic-datasource

3.6.1

mybatis-plus

3.5.3.2

rocketmq

4.9.4

加入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><!-- 排除logback依赖 --><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><!--Log4j2场景启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3</version><exclusions><exclusion><groupId>com.baomidou</groupId><artifactId>mybatis-plus-generator</artifactId></exclusion></exclusions></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.14</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.6.1</version></dependency><dependency><groupId>p6spy</groupId><artifactId>p6spy</artifactId><version>3.9.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version></dependency></dependencies>

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

Rocket重试机制

RocketMQ的重试机制是指:当消费者消费消息失败时,RocketMQ会在一定时间后重新将消息发送给消费者进行消费,以确保消息的可靠消费。

自动重试:Consumer在消费失败后,会在一定重试策略下定期重试消费失败的消息,直到成功或达到最大重试次数。

消息重发:如果Consumer在最大重试次数内仍然消费失败,Broker会定期扫描被标记为消费失败的消息,并将其重发给其他Consumer。

灵活的重试策略:RocketMQ提供多种重试策略来控制重试时机和频率,主要有:

生产者重试

生产者设置消息失败后重试次数

//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
producer.setRetryTimesWhenSendAsyncFailed(2);

Int 重试的次数

//重试

@Test
void retryProducerTest()throws Exception{//创建生产者DefaultMQProducer producer = new DefaultMQProducer("retryGroup");//连接namesrvproducer.setNamesrvAddr("192.168.68.133:9876");//启动producer.start();producer.setDefaultTopicQueueNums(1);//自身业务key唯一String Key = UUID.randomUUID().toString();System.out.println(Key);//重试//同步producer.setRetryTimesWhenSendFailed(3);//异步//producer.setRetryTimesWhenSendAsyncFailed(2);//创建消息Message message = new Message("retry", null,Key, "重试测试内存内容".getBytes());//发送消息producer.send(message);System.out.println("发送成功");//关闭生产者producer.shutdown();
}

消费者重试

设置自定义最大重试

consumer.setMaxReconsumeTimes(6);

死信消息(超过重试次数,并未处理的消息),放在死信主题中,%DLQ% retry

@Test
void retryConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅主题   *表示该主题的所有消息consumer.subscribe("retry","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("消息内容"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

死信处理方案

死信处理方案1、单独订阅死信主题2、监听死信主题(业务流程控制)

通过存入单独数据库表,业务发送短信等方式通知人工处理

1、单独订阅死信主题

单独订阅监听主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅死信主题   *表示该主题的所有消息consumer.subscribe("%DLQ%retryConsumerTest","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

2、监听死信主题(业务流程控制)

通过业务流程监听多个主题

//死信处理方案二、监听死信主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//设置每次重试次数consumer.setMaxReconsumeTimes(3);// 订阅需要的多个主题列表List<String> topics = Arrays.asList("retry", "TopicA", "TopicB");// 订阅主题列表中的所有主题for (String topic : topics) {consumer.subscribe(topic, "*"); // 这里的tag是用来过滤消息的,"*" 表示接收这个主题下的所有消息}//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {try{//业务代码System.out.println("业务代码");System.out.println("消息内容"+new String(messageExt.getBody()));int i =1/0;//模拟代码出错}catch (Exception e){//获取重试次数int reconsumeTimes = messageExt.getReconsumeTimes();String key = messageExt.getKeys();if (reconsumeTimes > 2){OrderLog orderLog = new OrderLog();orderLog.setType(2);orderLog.setOrderid(key);orderLog.setUsername("重试超过2次,死信消息");orderMapper.insert(orderLog);System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));//发送短信通知人工处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}else {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

这篇关于手搭手RocketMQ重试机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/818942

相关文章

Android ClassLoader加载机制详解

《AndroidClassLoader加载机制详解》Android的ClassLoader负责加载.dex文件,基于双亲委派模型,支持热修复和插件化,需注意类冲突、内存泄漏和兼容性问题,本文给大家介... 目录一、ClassLoader概述1.1 类加载的基本概念1.2 android与Java Class

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

Redis的持久化之RDB和AOF机制详解

《Redis的持久化之RDB和AOF机制详解》:本文主要介绍Redis的持久化之RDB和AOF机制,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述RDB(Redis Database)核心原理触发方式手动触发自动触发AOF(Append-Only File)核

PostgreSQL中MVCC 机制的实现

《PostgreSQL中MVCC机制的实现》本文主要介绍了PostgreSQL中MVCC机制的实现,通过多版本数据存储、快照隔离和事务ID管理实现高并发读写,具有一定的参考价值,感兴趣的可以了解一下... 目录一 MVCC 基本原理python1.1 MVCC 核心概念1.2 与传统锁机制对比二 Postg

Maven 配置中的 <mirror>绕过 HTTP 阻断机制的方法

《Maven配置中的<mirror>绕过HTTP阻断机制的方法》:本文主要介绍Maven配置中的<mirror>绕过HTTP阻断机制的方法,本文给大家分享问题原因及解决方案,感兴趣的朋友一... 目录一、问题场景:升级 Maven 后构建失败二、解决方案:通过 <mirror> 配置覆盖默认行为1. 配置示

Redis过期删除机制与内存淘汰策略的解析指南

《Redis过期删除机制与内存淘汰策略的解析指南》在使用Redis构建缓存系统时,很多开发者只设置了EXPIRE但却忽略了背后Redis的过期删除机制与内存淘汰策略,下面小编就来和大家详细介绍一下... 目录1、简述2、Redis http://www.chinasem.cn的过期删除策略(Key Expir

Go语言中Recover机制的使用

《Go语言中Recover机制的使用》Go语言的recover机制通过defer函数捕获panic,实现异常恢复与程序稳定性,具有一定的参考价值,感兴趣的可以了解一下... 目录引言Recover 的基本概念基本代码示例简单的 Recover 示例嵌套函数中的 Recover项目场景中的应用Web 服务器中

Jvm sandbox mock机制的实践过程

《Jvmsandboxmock机制的实践过程》:本文主要介绍Jvmsandboxmock机制的实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景二、定义一个损坏的钟1、 Springboot工程中创建一个Clock类2、 添加一个Controller

Dubbo之SPI机制的实现原理和优势分析

《Dubbo之SPI机制的实现原理和优势分析》:本文主要介绍Dubbo之SPI机制的实现原理和优势,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Dubbo中SPI机制的实现原理和优势JDK 中的 SPI 机制解析Dubbo 中的 SPI 机制解析总结Dubbo中