RabbitMQ消息队列的高可靠使用

2024-06-23 12:48

本文主要是介绍RabbitMQ消息队列的高可靠使用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

整体解决方案

  TopicExchange型交换机,会根据RouteKey将消息路由至匹配队列(推模式下,消息将被推送至消费者的监听函数处理),消息流转和路由的示意图如下。
在这里插入图片描述
在这里插入图片描述

高可靠机制

  RabbitMQ的高可靠保障主要在投递、持久化、消费三个方面,分别对应于生产者、消息队列、消费者,具体部流程如下图所示。
在这里插入图片描述

生产端可靠性保障

  为防止消息在未发送到Broker前就由于网络原因或其他情况导致丢失,生产者端需要保障消息成功送达中间件,手段一般有两种,一是使用事务机制,但这种做法是非异步的,吞吐量性能较差;另一种是确认机制,本例使用确认机制。

  1. 将Channel设为confirm模式,RabbitMQ会在消息到达Exchange后回调生产者ConfirmCallback接口;(如果消息和队列是可持久化的,那么包含msgId的ack会在消息被写入磁盘之后发出)
  2. 将Channel设为return模式,若消息没有被路由到任何一个队列,RabbitMQ会回调生产者ReturnCallback接口。

生产端高可靠只能让生产者知道自己投递的消息是否成果到达交换机或某个队列,但是无法知道消费者有没有成功消费。

// 设置消息入队失败回调
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {log.debug(s); }
});// 设置消息发送至交换机回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message m,int i,String s,String q,String q) {log.debug(s); }
});MessageProperties messageProperties = new MessageProperties();
// 开启消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setReceivedDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息ID
messageProperties.setMessageId(UUID.randomUUID().toString());
// 设置消息格式
messageProperties.setContentType("application/json");
Message message = new Message("msgBody".getContent().getBytes(), messageProperties);
// 发送消息
rabbitTemplate.send("exchangeName", "routingKey", message);

消息和队列可靠性保障

  为防止消息中间件在重启或宕机等场景丢失消息,开启消息和队列的持久化设置。主要有两个步骤:

  1. 申明队列时,开启durable模式,队列的持久化能保证其本身的元数据不会因异常情况而丢失。
  2. 发送消息时选择MessageDeliveryMode.PERSISTENT投递模式,消息会被持久化到磁盘,确保消息不会丢失。
/*************** 交换机配置**************/
@Bean
public TopicExchange exchange() {return new TopicExchange(EXCHANGE_NAME);
}/*************** 队列配置**************/
@Bean
public Queue queue() {// 参数2true代表持久化队列return new Queue(QUEUE_NAME, true, false, false);
}@Bean
public Binding binding(Queue queue, TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}

消费端可靠性保障

  为防止消息不能成功送达消费者,需要使用消费确认机制,消费者在订阅队列时将acknowledge-mode设为manual,在消息被成功消费后主动响应。

此处为推模式:即队列把消息推送给监听者

@Component
@RabbitListener(queues = QUEUE_NAME, containerFactory = "rabbitListenerContainerFactory")
public class MsgListener {@Autowiredprivate MsgService msgService;@RabbitHandlerpublic void processStatisticMessage(Message message, Channel channel, Map content) throws Exception {MessageProperties msgProperties = message.getMessageProperties();String messageId = msgProperties.getMessageId();String routingKey = msgProperties.getReceivedRoutingKey();long deliveryTag = msgProperties.getDeliveryTag();// 幂等判断,拒绝重复消息if (msgService.hasDuplicateMsgId(messageId)) {channel.basicReject(deliveryTag, false);return;}try {// 消费消息msgService.process(content, routingKey);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}

复用连接、并发消费

  1. 使用连接池CachingConnectionFactory复用连接,避免频繁新建销毁等性能开销。缓存模式为CacheMode.CHANNEL。
@Bean
public CachingConnectionFactory cachingConnectionFactory() {CachingConnectionFactory factory = new CachingConnectionFactory(HOST, PORT);// 设置用户名密码factory.setUsername(rabbitHostConfig.getUsername());factory.setPassword(rabbitHostConfig.getPassword());// 设置虚拟主机factory.setVirtualHost(VIRTUAL_HOST);// 消息送达交换机确认factory.setPublisherConfirms(true);factory.setPublisherReturns(true);// 设置缓存模式factory.setCacheMode(CacheMode.CHANNEL);// 设置缓存数factory.setChannelCacheSize(8);return factory;
}@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(cachingConnectionFactory);rabbitTemplate.setMandatory(true);return rabbitTemplate;
}
  1. 如果异步化的业务不存在顺序性消费需求,可通过设置并发消费来提升消费者性能,且每个消费者可预取多条消息,增加吞吐量。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(cachingConnectionFactory);// 消息转换器factory.setMessageConverter(new Jackson2JsonMessageConverter());// ack模式factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 消费者并发数factory.setConcurrentConsumers(8);// 预取消息数factory.setPrefetchCount(8);return factory;
}

消费幂等性

  在消息生产时,使用UUID构建唯一msgId。
  在消息消费前,通过判断Redis中msgId来确认是否重复消费;在消息成功消费后,将msgId保存至Redis并设置过期时间。

指数回退策略重试机制

  在消息消费重试机制上,摒弃了requeue模式,因为requeue会和幂等性消费产生冲突,最致命的是requeue可能产生循环,导致队列被阻塞。
  采用Spring自带的Retry可以在消费端业务逻辑里重试,避免消息重新入队导致的各种问题。这里将回退策略设置为指数型,最多重试3次,初始间隔6秒,每次间隔10倍。

需在启动类加上注解@EnableRetry

@Override
@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(maxDelay = 600000L, delay = 6000L, multiplier = 10))
public void process(Map content, String type) throws Exception {log.info("process msg:{}", content);
}

死信队列

  消息消费时,若重试超限,将消息转发至死信队列,以备人工排查定位或后期做补偿机制。

@Bean(name = "deadQueue")
public Queue deadQueue() {Map<String, Object> args = new HashMap<String, Object>();// 设置队列长度// args.put("x-max-length", 1000);// 设置死信队列args.put("x-dead-letter-exchange", "dlx.exchange");// 持久化队列return new Queue("dlx.queue", true, false, false, args);
}@Bean(name = "deadExchange")
public TopicExchange deadExchange() {return new TopicExchange("dlx.exchange");
}@Bean(name = "deadBinding")
public Binding deadBinding(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dead.#");
}

这篇关于RabbitMQ消息队列的高可靠使用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1087200

相关文章

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

pdfmake生成pdf的使用

实际项目中有时会有根据填写的表单数据或者其他格式的数据,将数据自动填充到pdf文件中根据固定模板生成pdf文件的需求 文章目录 利用pdfmake生成pdf文件1.下载安装pdfmake第三方包2.封装生成pdf文件的共用配置3.生成pdf文件的文件模板内容4.调用方法生成pdf 利用pdfmake生成pdf文件 1.下载安装pdfmake第三方包 npm i pdfma

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

git使用的说明总结

Git使用说明 下载安装(下载地址) macOS: Git - Downloading macOS Windows: Git - Downloading Windows Linux/Unix: Git (git-scm.com) 创建新仓库 本地创建新仓库:创建新文件夹,进入文件夹目录,执行指令 git init ,用以创建新的git 克隆仓库 执行指令用以创建一个本地仓库的

【北交大信息所AI-Max2】使用方法

BJTU信息所集群AI_MAX2使用方法 使用的前提是预约到相应的算力卡,拥有登录权限的账号密码,一般为导师组共用一个。 有浏览器、ssh工具就可以。 1.新建集群Terminal 浏览器登陆10.126.62.75 (如果是1集群把75改成66) 交互式开发 执行器选Terminal 密码随便设一个(需记住) 工作空间:私有数据、全部文件 加速器选GeForce_RTX_2080_Ti