rabbitmq之可靠性投递与生产实践(二)

2023-11-07 01:38

本文主要是介绍rabbitmq之可靠性投递与生产实践(二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 可靠性投递与生产实践
      • 一 可靠性投递
        • 1、确保消息发送到RabbitMQ服务器
        • 2、确保消息路由到正确的队列
        • 3、确保消息在队列正确地存储
        • 4、确保消息从队列正确地投递到消费者
        • 5、消费者回调
        • 6、补偿机制
        • 7、消息幂等性
        • 8、消息的顺序性
      • 二 高可用架构
        • RabbitMQ集群
        • RabbitMQ镜像队列
        • HAproxy负载+Keepalived高可用
        • 网络分区
        • 广域网的同步方案
      • 三 实践经验总结
        • 1、配置文件与命名规范
        • 2、调用封装
        • 3、信息落库+定时任务
        • 4、运维监控
        • 5、插件
        • 6、如何减少连接数
        • 思考
        • 面试题

可靠性投递与生产实践

一 可靠性投递

首先需要明确,效率与可靠性是无法兼得的,如果要保证每一个环节都成功,势必会对消息的收发效率造成影响。
如果是一些业务实时一致性要求不是特别高的场合,可以牺牲一些可靠性来换取效率。

image-20200727080057893

① 代表消息从生产者发送到Exchange;
② 代表消息从Exchange路由到Queue;
③ 代表消息在Queue中存储;
④ 代表消费者订阅Queue并消费消息。

1、确保消息发送到RabbitMQ服务器

可能因为网络或者Broker的问题导致①失败,而生产者是无法知道消息是否正确发送到Broker的。
有两种解决方案,第一种是Transaction(事务)模式,第二种Confirm(确认)模式。

在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。使用事务机制的话会“吸干”RabbitMQ的性能,一般不建议使用。

生产者通过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式。一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。

2、确保消息路由到正确的队列

可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。
使用mandatory参数和ReturnListener,可以实现消息无法路由的时候返回给生产者。
另一种方式就是使用备份交换机(alternate-exchange),无法路由的消息会发送到这个交换机上

Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); // 指定交换机的备份交换机
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
3、确保消息在队列正确地存储

可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题。

解决方案:
1、队列持久化

// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,
Object> arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

2、交换机持久化

// String exchange, boolean durable
channel.exchangeDeclare("MY_EXCHANGE","true");

3、消息持久化

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2)  // 2代表持久化,其他代表瞬态.build();channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
4、确保消息从队列正确地投递到消费者

如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息。
如果消息消费失败,也可以调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志)。

5、消费者回调

消费者处理消息以后,可以再发送一条消息给生产者,或者调用生产者的API,告知消息处理完毕。
参考:二代支付中异步通信的回执,多次交互。某提单APP,发送碎屏保消息后,消费者必须回调API。

6、补偿机制

对于一定时间没有得到响应的消息,可以设置一个定时重发的机制,但要控制次数,比如最多重发3次,否则会造成消息堆积。
参考:ATM存款未得到应答时发送5次确认;ATM取款未得到应答时,发送5次冲正。根据业务表状态做一个重发。

7、消息幂等性

服务端是没有这种控制的,只能在消费端控制。
如何避免消息的重复消费?
消息重复可能会有两个原因:
1、生产者的问题,环节①重复发送消息,比如在开启了Confirm模式但未收到确认。
2、环节④出了问题,由于消费者未发送ACK或者其他原因,消息重复投递。
对于重复发送的消息,每个消息有一个消息id 也可以对每一条消息生成一个唯一的业务ID,通过日志或者建表来做重复控制。
参考:银行的重账控制环节。

8、消息的顺序性

消息的顺序性指的是消费者消费的顺序跟生产者产生消息的顺序是一致的。
在RabbitMQ中,一个队列有多个消费者时,由于不同的消费者消费消息的速度是不一样的,顺序无法保证。
参考:消息:1、新增门店 2、绑定产品 3、激活门店,这种情况下消息消费顺序不能颠倒。

二 高可用架构

image-20200727080825034

RabbitMQ集群

集群主要用于实现高可用与负载均衡。
RabbitMQ通过/var/lib/rabbitmq/.erlang.cookie来验证身份,需要在所有节点上保持一致。
集群有两种节点类型,一种是磁盘节点,一种是内存节点。集群中至少需要一个磁盘节点以实现元数据的持久化,
未指定类型的情况下,默认为磁盘节点。
集群通过25672端口两两通信,需要开放防火墙的端口。
需要注意的是,RabbitMQ集群无法搭建在广域网上,除非使用federation或者shovel等插件。
集群的配置步骤:
1、配置hosts
2、同步erlang.cookie
3、加入集群

RabbitMQ镜像队列

集群方式下,队列和消息是无法在节点之间同步的,因此需要使用RabbitMQ的镜像队列机制进行同步。

操作方式命令或步骤
rabbitmqctl(Windows)rabbitmqctl set_policy ha-all “^ha.” “{”“ha-mode”":"“all”"}"
HTTP APIPUT /api/policies/%2f/ha-all {“pattern”:"^ha.", “definition”:{“ha-mode”:“all”}}
Web UINavigate to Admin > Policies > Add / update a policy Name输入:mirror_image Pattern输入:^(代表匹配所有) Definition点击 HA mode,右边输入:al

image-20200727081052420

参考资料:

https://blog.csdn.net/u013256816/article/details/71097186

HAproxy负载+Keepalived高可用

在两个内存节点上安装HAProxy

yum install haproxy

编辑配置文件

vim /etc/haproxy/haproxy.cfg

内容修改为:

globallog     127.0.0.1 local2chroot   /var/lib/haproxypidfile   /var/run/haproxy.pidmaxconn   4000user    haproxygroup    haproxydaemonstats socket /var/lib/haproxy/stats
defaultslog           globaloption         dontlognulloption         redispatchretries         3timeout connect     10stimeout client     1mtimeout server     1mmaxconn         3000
listen http_frontmode httpbind 0.0.0.0:1080      #监听端口stats refresh 30s      #统计页面自动刷新时间stats uri /haproxy?stats   #统计页面urlstats realm Haproxy Manager #统计页面密码框上提示文本stats auth admin:123456   #统计页面用户名和密码设置
listen rabbitmq_adminbind 0.0.0.0:15673server node1 192.168.8.40:15672server node2 192.168.8.45:15672
listen rabbitmq_cluster 0.0.0.0:5673mode tcpbalance roundrobintimeout client 3htimeout server 3htimeout connect 3hserver  node1 192.168.8.40:5672 check inter 5s rise 2 fall 3server  node2 192.168.8.45:5672 check inter 5s rise 2 fall 3

启动HAProxy

haproxy -f /etc/haproxy/haproxy.cfg

安装Keepalived

yum -y install keepalived

修改配置文件

vim /etc/keepalived/keepalived.conf

内容改成(物理网卡和当前主机IP要修改):

global_defs {notification_email {acassen@firewall.locfailover@firewall.locsysadmin@firewall.loc}notification_email_from Alexandre.Cassen@firewall.locsmtp_server 192.168.200.1smtp_connect_timeout 30router_id LVS_DEVELvrrp_skip_check_adv_addr# vrrp_strict  # 注释掉,不然访问不到VIPvrrp_garp_interval 0vrrp_gna_interval 0
}
global_defs {notification_email {acassen@firewall.locfailover@firewall.locsysadmin@firewall.loc}notification_email_from Alexandre.Cassen@firewall.locsmtp_server 192.168.200.1smtp_connect_timeout 30router_id LVS_DEVELvrrp_skip_check_adv_addr# vrrp_strict  # 注释掉,不然访问不到VIPvrrp_garp_interval 0vrrp_gna_interval 0
}
# 检测任务
vrrp_script check_haproxy {# 检测HAProxy监本script "/etc/keepalived/script/check_haproxy.sh"# 每隔两秒检测interval 2# 权重weight 2
}
# 虚拟组
vrrp_instance haproxy {state MASTER # 此处为`主`,备机是 `BACKUP`interface ens33 # 物理网卡,根据情况而定mcast_src_ip 192.168.8.40 # 当前主机ipvirtual_router_id 51 # 虚拟路由id,同一个组内需要相同priority 100 # 主机的优先权要比备机高advert_int 1 # 心跳检查频率,单位:秒authentication { # 认证,组内的要相同auth_type PASSauth_pass 1111}# 调用脚本track_script {check_haproxy}# 虚拟ip,多个换行virtual_ipaddress {192.168.8.201}
}

启动keepalived

keepalived -D
网络分区

为什么会出现分区?因为RabbitMQ对网络延迟非常敏感,为了保证数据一致性和性能,在出现网络故障时,集群节点会出现分区。

https://blog.csdn.net/u013256816/article/details/53588206

https://blog.csdn.net/u013256816/article/details/73757884

https://blog.csdn.net/u013256816/article/details/74998896

广域网的同步方案

federation插件
shovel插件

三 实践经验总结

1、配置文件与命名规范

集中放在properties文件中
体现元数据类型(_VHOST _EXCHANGE _QUEUE);
体现数据来源和去向(XXX_TO_XXX);

2、调用封装

可以对Template做进一步封装,简化消息的发送。

3、信息落库+定时任务

将需要发送的消息保存在数据库中,可以实现消息的可追溯和重复控制,需要配合定时任务来实现。

4、运维监控

https://blog.51cto.com/yanconggod/2069376

5、插件

tracing

https://www.rabbitmq.com/plugins.html

6、如何减少连接数

合并消息的发送,建议单条消息不要超过4M(4096KB)

思考

消费者的集群或者微服务的多个实例,会不会重复接收消息?
生产者先发送消息还是先登记业务表?(打款错误的例子)先登记业务表
谁来创建对象(交换机、队列、绑定关系)?消费者

重复创建会有什么问题?
持久化的队列和非持久化的交换机可以绑定吗?可以
如何设计一个MQ服务? http://www.xuxueli.com/xxl-mq/#/

面试题

1、消息队列的作用与使用场景?

异步:批量数据异步处理 例如:批量上传文件 比如代发代扣文件

削峰:高负载任务负载均衡 例如:电商秒杀抢购

解耦:串行任务并行化 例如:退货流程解耦

广播:基于发布、订阅模型实现一对多通信。

2、创建队列和交换机的方法?
3、多个消费者监听一个生产者时,消息如何分发?

轮询和公平分发

4、无法被路由的消息,去了哪里?

没有任何设置的话就直接被丢弃了

解决方案:

  1. 使用mandatory=true 配合ReturnListener 实现消息回发。
  2. 声明交换机时,指定备份交换机。

5、消息在什么时候会变成Dead Letter(死信)?

有三种情况消息会进入DLX(Dead Letter Exchange)死信交换机。
1、消息被消费者拒绝并且不能重新入队 (NACK || Reject ) && requeue == false
2、消息过期(消息过期时间 队列过期时间)
3、队列达到最大长度(先入队的消息会被发送到DLX)

6、RabbitMQ如何实现延迟队列?

  1. 使用TTL结合DLX的方式来实现消息的延迟投递
  2. 使用rabbitmq-delayed-message-exchange插件

7、如何保证消息的可靠性投递?

  1. 确保消息发送到RabbitMQ服务器

    可能因为网络或者Broker的问题导致①失败,而生产者是无法知道消息是否正确发送到Broker的。
    有两种解决方案,第一种是Transaction(事务)模式(不建议),第二种Confirm(确认)模式。一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。

  2. 确保消息路由到正确的队列

    可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。

    使用mandatory参数和ReturnListener,可以实现消息无法路由的时候返回给生产者。
    另一种方式就是使用备份交换机(alternate-exchange),无法路由的消息会发送到这个交换机上

  3. 确保消息在队列正确地存储

    可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题。

    解决方案:队列持久化 交换机持久化 消息持久化

  4. 确保消息从队列正确地投递到消费者

    RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志)。

  5. 消费者回调

    消费者处理消息以后,可以再发送一条消息给生产者,或者调用生产者的API,告知消息处理完毕。
    参考:二代支付中异步通信的回执,多次交互。某提单APP,发送碎屏保消息后,消费者必须回调API。

  6. 补偿机制

    对于一定时间没有得到响应的消息,可以设置一个定时重发的机制,但要控制次数,比如最多重发3次,否则会造成消息堆积。
    参考:ATM存款未得到应答时发送5次确认;ATM取款未得到应答时,发送5次冲正。根据业务表状态做一个重发。

8、如何在服务端和消费端做限流?

  1. 服务端流控
  2. 消费端限流 通过preFetchCount
  3. 网关或接入层

9、如何保证消息的顺序性?

一个队列对应一个消费者 设置全局id messageId parentMessageId 前一个没消费完毕就不消费下一个 或者前一个没处理 完毕就不发布下一个

集群模式:

  1. 普通模式 无法实现节点之间的同步

  2. 镜像模式 设置镜像策略实现节点之间的同步

节点类型:ram(内存) disc(磁盘)

11、消息的幂等性

对于重复发送的消息,每个消息有一个消息id 也可以对每一条消息生成一个唯一的业务ID,通过日志或者建表来做重复控制。
参考:银行的重账控制环节。

这篇关于rabbitmq之可靠性投递与生产实践(二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/360367

相关文章

微服务架构之使用RabbitMQ进行异步处理方式

《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发... 目录一.什么是RabbitMQ?二.异步调用处理逻辑:三.RabbitMQ的基本使用1.安装2.架构

Springboot使用RabbitMQ实现关闭超时订单(示例详解)

《Springboot使用RabbitMQ实现关闭超时订单(示例详解)》介绍了如何在SpringBoot项目中使用RabbitMQ实现订单的延时处理和超时关闭,通过配置RabbitMQ的交换机、队列和... 目录1.maven中引入rabbitmq的依赖:2.application.yml中进行rabbit

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

springboot(集成篇):RabbitMQ集成详解

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将RocketMQ捐献给了apache,当然了今天的主角还是讲RabbitMQ。消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在

spring boot实战(番外篇)整合RabbitMQ

前言 最近几篇文章将围绕消息中间件RabbitMQ展开,对于RabbitMQ基本概念这里不阐述,主要讲解RabbitMQ的基本用法、Java客户端API介绍、spring Boot与RabbitMQ整合、 Spring Boot与RabbitMQ整合源码分析。   RabbitMQ安装   在使用消息中间件RabbitMQ之前就是安装RabbitMQ。   安装erlang:yum i

深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念

文章目录 文章导图RabbitMQ架构及相关概念四大核心概念名词解读 七大工作模式及四大交换机类型0、前置了解-默认交换机DirectExchange1、简单模式(Simple Queue)-默认DirectExchange2、 工作队列模式(Work Queues)-默认DirectExchange3、发布/订阅模式(Publish/Subscribe)-FanoutExchange4、路