【消息中间件】Rabbitmq消息可靠性、持久化机制、各种消费

本文主要是介绍【消息中间件】Rabbitmq消息可靠性、持久化机制、各种消费,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。


文章目录

  • 前言
  • 常见用法
    • 1.消息可靠性
    • 2.持久化机制
    • 3.消息积压
      • 批量消费:增加 prefetch 的数量,提高单次连接的消息数
      • 并发消费:多部署几台消费者实例
    • 4.重复消费
  • 二、其他
    • 1.队列存在大量unacked数据


前言


常见用法

1.消息可靠性

RabbitMQ 提供了多种机制来确保消息的可靠性,以防止消息丢失或被意外删除。以下是几种提高消息可靠性的方法:

  1. 持久化消息(Durable Message):在发布消息时,将消息的 deliveryMode 设置为 2,即可将消息设置为持久化消息。持久化消息会将消息写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。

  2. 持久化队列(Durable Queue):创建队列时,将队列的 durable 参数设置为 true,即可创建一个持久化队列。持久化队列会将队列的元数据和消息都存储在磁盘上,即使消息队列服务器重启,队列的元数据和消息仍然可以恢复。

  3. 确认模式(Publisher Confirms):使用确认模式可以确保消息被成功发送到 RabbitMQ 服务器,并得到确认。通过在信道上使用 channel.confirmSelect() 启用确认模式,然后通过 channel.waitForConfirms() 方法来等待服务器的确认。

  4. 事务模式(Transactions):使用事务模式可以保证消息的原子性,要么全部发送成功,要么全部失败。通过在信道上使用 channel.txSelect() 开启事务模式,在发送消息后使用 channel.txCommit() 提交事务,或使用 channel.txRollback() 进行回滚。

  5. 消费者应答(Consumer Acknowledgement):在消费者接收和处理消息后,必须发送确认应答给 RabbitMQ 服务器。通过使用 channel.basicAck() 方法发送确认应答,以告知服务器消息已经成功处理。

通过使用上述机制,可以在 RabbitMQ 中实现消息的可靠性传输和处理,以防止消息的丢失和重复传递。

2.持久化机制

在RabbitMQ中,消息持久化是一种机制,可以确保消息在服务器宕机或重启之后不丢失。默认情况下,RabbitMQ的消息是存储在内存中的,如果服务器宕机,则会导致消息的丢失。要实现消息的持久化,可以采取以下步骤:

  1. 创建一个持久化的交换机(Exchange):
    在定义交换机时,将其durable参数设置为true,例如:

    channel.exchangeDeclare("exchange_name", "direct", true);
    
  2. 创建一个持久化的队列(Queue):
    在定义队列时,将其durable参数设置为true,例如:

    channel.queueDeclare("queue_name", true, false, false, null);
    
  3. 将持久化的队列与交换机进行绑定:
    使用队列和交换机的bind方法进行绑定,例如:

    channel.queueBind("queue_name", "exchange_name", "routing_key");
    
  4. 发布持久化的消息:
    在发布消息时,将消息的deliveryMode属性设置为2,表示消息是持久化的,例如:

    String message = "Hello RabbitMQ!";
    channel.basicPublish("exchange_name", "routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    

通过以上步骤,就可以实现消息的持久化。当RabbitMQ服务器宕机或重启后,消息会被保存在磁盘中,并在服务器恢复后重新投递给消费者。需要注意的是,虽然消息被持久化了,但是在发送到队列之前,仍然有可能发生丢失,所以在实际的应用中,还需要考虑一些因素,比如网络故障、消费者的可靠性等。

3.消息积压

批量消费:增加 prefetch 的数量,提高单次连接的消息数

为了提高消费性能,可以将多个消息批量进行消费,减少消费者和消息队列的交互次数。通过设置合适的批量消费大小,可以在一次网络往返中消费多个消息,从而提高消费性能。
要实现RabbitMQ的批量消费,可以使用RabbitMQ的channel.basicQos方法来设置每次消费的消息数量。以下是一个示例代码,演示如何实现批量消费:

import pikadef callback(ch, method, properties, body):print("Received message: %s" % body)# 处理消息的逻辑# 发送确认给RabbitMQch.basic_ack(delivery_tag=method.delivery_tag)def consume_messages():connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 设置每个消费者一次性获取的消息数量channel.basic_qos(prefetch_count=10)# 注册消费者并开始消费消息channel.basic_consume(queue='my_queue', on_message_callback=callback)# 进入一个循环,一直等待消息的到来channel.start_consuming()consume_messages()

在上面的代码中,我们通过channel.basic_qos(prefetch_count=10)设置每次处理的消息数量为10。这样,在消费者处理完10条消息之前,RabbitMQ将不会再向其发送更多消息。

这样,就实现了RabbitMQ的批量消费。你可以根据需求,在basic_qos方法中设置适合你的消息数量。

并发消费:多部署几台消费者实例

可以采用多线程或多进程的方式进行消息的并发消费,将多个消费者并行处理消息。通过增加并发消费者的数量,可以提高消息的处理速度,提高消费的性能。
使用进程池来消费RabbitMQ的消息可以更好地管理并发性能。通过使用进程池,可以在一个固定的池子中创建多个进程,并且复用它们来消费消息,从而减少进程创建和销毁的开销。

以下是一个使用进程池消费RabbitMQ消息的示例:

import multiprocessing
import os
import time
import pikadef consumer(queue_name):connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue=queue_name)def callback(ch, method, properties, body):print(f'Process {os.getpid()} received message: {body}')time.sleep(1)channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()def main():# 创建进程池pool = multiprocessing.Pool(processes=5)# 在进程池中提交任务for _ in range(5):pool.apply_async(consumer, ('my_queue',))pool.close()pool.join()if __name__ == '__main__':main()

在上述示例中,我们使用multiprocessing.Pool来创建一个包含5个进程的进程池。然后,我们使用apply_async方法向进程池中提交任务,每个任务都是调用consumer函数来消费"my_queue"队列中的消息。进程池会自动分配任务给闲置的进程来处理。通过closejoin方法,我们可以确保所有任务都被完成。

4.重复消费

  1. 消息确认:在消费者处理完一条消息后,通过调用basic_ack方法手动确认消息已经成功消费。这样,RabbitMQ就会将该消息标记为已经处理,不会再次发送给其他消费者。同时,还可以设置auto_ack参数为False,禁用自动消息确认机制,以确保消息被正确确认。

  2. 消息持久化:可以通过设置消息的delivery_mode属性为2来将消息标记为持久化消息。这样,即使消费者在处理消息时发生故障,消息也会被保存在磁盘上,待消费者恢复正常后会重新投递。

  3. 唯一消费者:可以通过设置队列的exclusive参数为True,创建一个排他队列。这样,只有一个消费者可以连接到该队列,并独占地消费其中的消息,避免重复消费。

  4. 消息去重:在消费者端可以维护一个已消费消息的记录,例如在数据库或缓存中记录已消费的消息的ID或唯一标识。每次消费消息时,先检查记录中是否已经存在该消息,如果存在则跳过,避免重复处理。

  5. 幂等操作:在消费者的处理逻辑中,要确保操作是幂等的,即多次执行同一个操作的效果和执行一次的效果是一样的。这样,即使消息被重复消费,也不会产生副作用。

二、其他

1.队列存在大量unacked数据

通过rabbitmq的后台管理,进入相应的队列,滑到最下边,找到purge。purge将清空这个队列的消息。

这篇关于【消息中间件】Rabbitmq消息可靠性、持久化机制、各种消费的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/539935

相关文章

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Android 友盟消息推送集成遇到的问题

友盟消息推送遇到的问题 集成友盟消息推送,步骤根据提供的技术文档接入便可。可是当你集成到项目中去的时候,可能并不是一帆风顺就搞定,因为你项目里面是可能集成了其他的sdk(比如支付宝,微信,七鱼等等三方的sdk)。那么这个时候,再加上友盟的消息推送sdk集成可能就会出现问题。 问题清单 友盟消息推送sdk和支付宝sdk冲突问题 后台配置了消息推送,也显示发送成功,但是手机没有收到消息通知

Unity数据持久化 之 一个通过2进制读取Excel并存储的轮子(4)

本文仅作笔记学习和分享,不用做任何商业用途 本文包括但不限于unity官方手册,unity唐老狮等教程知识,如有不足还请斧正​​ Unity数据持久化 之 一个通过2进制读取Excel并存储的轮子(3)-CSDN博客  这节就是真正的存储数据了   理清一下思路: 1.存储路径并检查 //2进制文件类存储private static string Data_Binary_Pa

iptables持久化命令:netfilter-persistent save

在Linux上,使用netfilter-persistent命令可以保存iptables防火墙规则,确保它们在系统重启后仍然有效。以下是如何使用netfilter-persistent来保存iptables规则的步骤: 打开终端:首先,你需要打开Linux系统的终端。保存规则:使用netfilter-persistent save命令可以保存当前的iptables规则。这个命令会调用所有插件,将

Unity数据持久化 之 一个通过2进制读取Excel并存储的轮子(3)

本文仅作笔记学习和分享,不用做任何商业用途 本文包括但不限于unity官方手册,unity唐老狮等教程知识,如有不足还请斧正​​ Unity数据持久化 之 一个通过2进制读取Excel并存储的轮子(2) (*****生成数据结构类的方式特别有趣****)-CSDN博客 做完了数据结构类,该做一个存储类了,也就是生成一个字典类(只是声明)  实现和上一节的数据结构类的方式大同小异,所

消息队列的理解和应用场景

知乎上的一个通俗理解的优秀答案 by 祁达方 小红是小明的姐姐。 小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走。久而久之,两人都觉得麻烦。 后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看。 书架就是一个消息队列,小红是生产者,小明是