手搭手RocketMQ重试机制

2024-03-17 12:28
文章标签 rocketmq 机制 搭手 重试

本文主要是介绍手搭手RocketMQ重试机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

环境介绍

技术栈

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

dynamic-datasource

3.6.1

mybatis-plus

3.5.3.2

rocketmq

4.9.4

加入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><!-- 排除logback依赖 --><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><!--Log4j2场景启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3</version><exclusions><exclusion><groupId>com.baomidou</groupId><artifactId>mybatis-plus-generator</artifactId></exclusion></exclusions></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.14</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.6.1</version></dependency><dependency><groupId>p6spy</groupId><artifactId>p6spy</artifactId><version>3.9.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version></dependency></dependencies>

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

Rocket重试机制

RocketMQ的重试机制是指:当消费者消费消息失败时,RocketMQ会在一定时间后重新将消息发送给消费者进行消费,以确保消息的可靠消费。

自动重试:Consumer在消费失败后,会在一定重试策略下定期重试消费失败的消息,直到成功或达到最大重试次数。

消息重发:如果Consumer在最大重试次数内仍然消费失败,Broker会定期扫描被标记为消费失败的消息,并将其重发给其他Consumer。

灵活的重试策略:RocketMQ提供多种重试策略来控制重试时机和频率,主要有:

生产者重试

生产者设置消息失败后重试次数

//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
producer.setRetryTimesWhenSendAsyncFailed(2);

Int 重试的次数

//重试

@Test
void retryProducerTest()throws Exception{//创建生产者DefaultMQProducer producer = new DefaultMQProducer("retryGroup");//连接namesrvproducer.setNamesrvAddr("192.168.68.133:9876");//启动producer.start();producer.setDefaultTopicQueueNums(1);//自身业务key唯一String Key = UUID.randomUUID().toString();System.out.println(Key);//重试//同步producer.setRetryTimesWhenSendFailed(3);//异步//producer.setRetryTimesWhenSendAsyncFailed(2);//创建消息Message message = new Message("retry", null,Key, "重试测试内存内容".getBytes());//发送消息producer.send(message);System.out.println("发送成功");//关闭生产者producer.shutdown();
}

消费者重试

设置自定义最大重试

consumer.setMaxReconsumeTimes(6);

死信消息(超过重试次数,并未处理的消息),放在死信主题中,%DLQ% retry

@Test
void retryConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅主题   *表示该主题的所有消息consumer.subscribe("retry","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("消息内容"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

死信处理方案

死信处理方案1、单独订阅死信主题2、监听死信主题(业务流程控制)

通过存入单独数据库表,业务发送短信等方式通知人工处理

1、单独订阅死信主题

单独订阅监听主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅死信主题   *表示该主题的所有消息consumer.subscribe("%DLQ%retryConsumerTest","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

2、监听死信主题(业务流程控制)

通过业务流程监听多个主题

//死信处理方案二、监听死信主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//设置每次重试次数consumer.setMaxReconsumeTimes(3);// 订阅需要的多个主题列表List<String> topics = Arrays.asList("retry", "TopicA", "TopicB");// 订阅主题列表中的所有主题for (String topic : topics) {consumer.subscribe(topic, "*"); // 这里的tag是用来过滤消息的,"*" 表示接收这个主题下的所有消息}//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {try{//业务代码System.out.println("业务代码");System.out.println("消息内容"+new String(messageExt.getBody()));int i =1/0;//模拟代码出错}catch (Exception e){//获取重试次数int reconsumeTimes = messageExt.getReconsumeTimes();String key = messageExt.getKeys();if (reconsumeTimes > 2){OrderLog orderLog = new OrderLog();orderLog.setType(2);orderLog.setOrderid(key);orderLog.setUsername("重试超过2次,死信消息");orderMapper.insert(orderLog);System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));//发送短信通知人工处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}else {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

这篇关于手搭手RocketMQ重试机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/818942

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

【Tools】大模型中的自注意力机制

摇来摇去摇碎点点的金黄 伸手牵来一片梦的霞光 南方的小巷推开多情的门窗 年轻和我们歌唱 摇来摇去摇着温柔的阳光 轻轻托起一件梦的衣裳 古老的都市每天都改变模样                      🎵 方芳《摇太阳》 自注意力机制(Self-Attention)是一种在Transformer等大模型中经常使用的注意力机制。该机制通过对输入序列中的每个元素计算与其他元素之间的相似性,

如何通俗理解注意力机制?

1、注意力机制(Attention Mechanism)是机器学习和深度学习中一种模拟人类注意力的方法,用于提高模型在处理大量信息时的效率和效果。通俗地理解,它就像是在一堆信息中找到最重要的部分,把注意力集中在这些关键点上,从而更好地完成任务。以下是几个简单的比喻来帮助理解注意力机制: 2、寻找重点:想象一下,你在阅读一篇文章的时候,有些段落特别重要,你会特别注意这些段落,反复阅读,而对其他部分

【Tools】大模型中的注意力机制

摇来摇去摇碎点点的金黄 伸手牵来一片梦的霞光 南方的小巷推开多情的门窗 年轻和我们歌唱 摇来摇去摇着温柔的阳光 轻轻托起一件梦的衣裳 古老的都市每天都改变模样                      🎵 方芳《摇太阳》 在大模型中,注意力机制是一种重要的技术,它被广泛应用于自然语言处理领域,特别是在机器翻译和语言模型中。 注意力机制的基本思想是通过计算输入序列中各个位置的权重,以确

FreeRTOS内部机制学习03(事件组内部机制)

文章目录 事件组使用的场景事件组的核心以及Set事件API做的事情事件组的特殊之处事件组为什么不关闭中断xEventGroupSetBitsFromISR内部是怎么做的? 事件组使用的场景 学校组织秋游,组长在等待: 张三:我到了 李四:我到了 王五:我到了 组长说:好,大家都到齐了,出发! 秋游回来第二天就要提交一篇心得报告,组长在焦急等待:张三、李四、王五谁先写好就交谁的

UVM:callback机制的意义和用法

1. 作用         Callback机制在UVM验证平台,最大用处就是为了提高验证平台的可重用性。在不创建复杂的OOP层次结构前提下,针对组件中的某些行为,在其之前后之后,内置一些函数,增加或者修改UVM组件的操作,增加新的功能,从而实现一个环境多个用例。此外还可以通过Callback机制构建异常的测试用例。 2. 使用步骤         (1)在UVM组件中内嵌callback函

Smarty模板引擎工作机制(一)

深入浅出Smarty模板引擎工作机制,我们将对比使用smarty模板引擎和没使用smarty模板引擎的两种开发方式的区别,并动手开发一个自己的模板引擎,以便加深对smarty模板引擎工作机制的理解。 在没有使用Smarty模板引擎的情况下,我们都是将PHP程序和网页模板合在一起编辑的,好比下面的源代码: <?php$title="深处浅出之Smarty模板引擎工作机制";$content=