本文主要是介绍RabbitMQ详解(值得珍藏),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1. 基本概念
RabbitMQ是一款开源,使用Erlang编写的,基于AMQP协议的消息中间件;
提到RabbitMQ,就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。先了解一下AMQP协议中间的几个重要概念:
-
Server:接收客户端的连接,实现AMQP实体服务。
-
Connection:连接,应用程序与Server的网络连接,TCP连接。
-
Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
-
Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
-
Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
-
Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
-
Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
-
RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
-
Queue:消息队列,用来保存消息,供消费者消费。
2. 系统架构
AMQP协议模型由三部分组成:生产者、消费者和服务端。生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息。接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。
总结一下整体过程:生产者投递消息 -> 和Server建立连接,开启信道 -> 声明交换器和队列,并通过路由键将交换机和队列绑定 -> 投递消息到虚拟主机 -> 消息发送到消息队列 -> 消费者建立连接 -> 消费消息 -> 关闭信道和连接。
RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种:
Direct Exchange:Direct交换器需要消息的Routing Key与 Exchange和Queue 之间的Binding Key完全匹配,如果匹配成功,将消息分发到该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct是Exchange的默认模式。RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有的Queue(每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。所以,有时候我们感觉不需要交换器也可以发送和接收消息,但是实际上是使用了RabbitMQ默认提供的Exchange。简单点说就是一对一的,点对点的发送。
Fanout Exchange:Fanout交换器会把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中。订阅模式与Binding Key和Routing Key无关,交换器将接受到的消息分发给有绑定关系的所有消息队列队列(不论Binding Key和Routing Key是什么)。类似于子网广播,子网内的每台主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
Topic(通配符模式):Topic交换器按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(也可以是零个或一个)。
Headers Exchange:这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。如下图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。
3. 消费原理
我们先看几个基本概念:
- broker:每个节点运行的服务程序,功能为维护该节点的队列的增删以及转发队列操作请求。
- master queue:每个队列都分为一个主队列和若干个镜像队列。
- mirror queue:镜像队列,作为master queue的备份。在master queue所在节点挂掉之后,系统把mirror queue提升为master queue,负责处理客户端队列操作请求。注意,mirror queue只做镜像,设计目的不是为了承担客户端读写压力。
集群中有两个节点,每个节点上有一个broker,每个broker负责本机上队列的维护,并且borker之间可以互相通信。集群中有两个队列A和B,每个队列都分为master queue和mirror queue(备份)。那么队列上的生产消费怎么实现的呢?
对于消费队列,如下图有两个consumer消费队列A,这两个consumer连在了集群的不同机器上。RabbitMQ集群中的任何一个节点都拥有集群上所有队列的元信息,所以连接到集群中的任何一个节点都可以,主要区别在于有的consumer连在master queue所在节点,有的连在非master queue节点上。
因为mirror queue要和master queue保持一致,故需要同步机制,正因为一致性的限制,导致所有的读写操作都必须都操作在master queue上(想想,为啥读也要从master queue中读?和数据库读写分离是不一样的),然后由master节点同步操作到mirror queue所在的节点。即使consumer连接到了非master queue节点,该consumer的操作也会被路由到master queue所在的节点上,这样才能进行消费。
对于生产队列,原理和消费一样,如果连接到非 master queue 节点,则路由过去。
所以,到这里小伙伴们就可以看到 RabbitMQ的不足:由于master queue单节点,导致性能瓶颈,吞吐量受限。虽然为了提高性能,内部使用了Erlang这个语言实现,但是终究摆脱不了架构设计上的致命缺陷。
4. 高级特性
Time To Live,也就是生存时间,是一条消息在队列中的最大存活时间,单位是毫秒,下面看看RabbitMQ过期时间特性:
-
RabbitMQ可以对消息和队列设置TTL。
-
RabbitMQ支持设置消息的过期时间,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。
-
RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,直到超过了队列的超时时间配置,那么消息会变成死信,自动清除。
-
如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。
-
当然也可以不设置TTL,不设置表示消息不会过期;如果设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息将被立即丢弃。
RabbitMQ中实现RPC的机制是:
- 生产者发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14个属性,这些属性会随着消息一起发送)中设置两个属性值replyTo(一个Queue名称,用于告诉消费者处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,消费者处理完成后需要将此属性返还,生产者将根据这个id了解哪条请求被成功执行了或执行失败)。
- 消费者收到消息并处理。
- 消费者处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。
- 生产者之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
4.1 消息确认
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。消费者订阅队列的时候,可以指定autoAck参数,当autoAck为true的时候,RabbitMQ采用自动确认模式,RabbitMQ自动把发送出去的消息设置为确认,然后从内存或者硬盘中删除,而不管消费者是否真正消费到了这些消息。当autoAck为false的时候,RabbitMQ会等待消费者回复的确认信号,收到确认信号之后才从内存或者磁盘中删除消息。
消息确认机制是RabbitMQ消息可靠性投递的基础,只要设置autoAck参数为false,消费者就有足够的时间处理消息,不用担心处理消息的过程中消费者进程挂掉后消息丢失的问题。
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在Timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的问题,Queue中堆积的消息会越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑。如果我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ会立即把这个Message标记为完成,然后从queue中删除了。
4.2 持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。RabbitMQ的持久化分为三个部分:交换器持久化、队列持久化和消息的持久化。
交换器持久化可以通过在声明队列时将durable参数设置为true。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器了。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。队列的持久化可以通过在声明队列时将durable参数设置为true。
设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依然存在。如果只设置队列持久化或者消息持久化,重启之后消息都会消失。
当然,也可以将所有的消息都设置为持久化,但是这样做会影响RabbitMQ的性能,因为磁盘的写入速度比内存的写入要慢得多。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。鱼和熊掌不可兼得,关键在于选择和取舍。在实际中,需要根据实际情况在可靠性和吞吐量之间做一个权衡。
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。
4.3 事务
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器接收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
4.4 死信队列
当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器成为死信交换器,与该交换器绑定的队列称为死信队列。消息变成死信有下面几种情况:
- 消息被拒绝。
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换器,和一般的交换器没有区别,他能在任何的队列上面被指定,实际上就是设置某个队列的属性。当这个队列中有死信的时候,RabbitMQ会自动将这个消息重新发送到设置的交换器上,进而被路由到另一个队列,我们可以监听这个队列中消息做相应的处理。
死信队列有什么用?当发生异常的时候,消息不能够被消费者正常消费,被加入到了死信队列中。后续的程序可以根据死信队列中的内容分析当时发生的异常,进而改善和优化系统。
4.5 延迟队列
一般的队列,消息一旦进入队列就会被消费者立即消费。延迟队列就是进入该队列的消息会被消费者延迟消费,延迟队列中存储的对象是的延迟消息,“延迟消息”是指当消息被发送以后,等待特定的时间后,消费者才能拿到这个消息进行消费。
延迟队列用于需要延迟工作的场景。最常见的使用场景:淘宝或者天猫我们都使用过,用户在下单之后通常有30分钟的时间进行支付,如果这30分钟之内没有支付成功,那么订单就会自动取消。除了延迟消费,延迟队列的典型应用场景还有延迟重试。比如消费者从队列里面消费消息失败了,可以延迟一段时间以后进行重试。
4.6 消息分发机制:
我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据。因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里。这时,就得使用工作队列了。一个队列有多个消费者同时消费数据。工作队列有两种分发数据的方式:轮询分发(Round-robin)和 公平分发(Fair dispatch)。轮询分发:队列给每一个消费者发送数量一样的数据。公平分发:消费者设置每次从队列中取一条数据,并且消费完后手动应答,继续从队列取下一个数据。
-
轮询分发
如果工作队列中有两个消费者,两个消费者得到的数据量一样的,并不会因为两个消费者处理数据速度不一样使得两个消费者取得不一样数量的数据。但是这种分发方式存在着一些隐患,消费者虽然得到了消息,但是如果消费者没能成功处理业务逻辑,在RabbitMQ中也不存在这条消息。就会出现消息丢失并且业务逻辑没能成功处理的情况。
-
公平分发
消费者设置每次从队列里取一条数据,并且关闭自动回复机制,每次取完一条数据后,手动回复并继续取下一条数据。与轮询分发不同的是,当每个消费都设置了每次只会从队列取一条数据时,并且关闭自动应答,在每次处理完数据后手动给队列发送确认收到数据。这样队列就会公平给每个消息费者发送数据,消费一条再发第二条,而且可以在管理界面中看到数据是一条条随着消费者消费完从而减少的,并不是一下子全部分发完了。采用公平分发方式就不会出现消息丢失并且业务逻辑没能成功处理的情况。
5. 特性分析
这里才是内容的重点,不仅需要知道Rabbit的特性,还需要知道支持这些特性的原因:
- 消息路由(支持):RabbitMQ可以通过不同的交换器支持不同种类的消息路由;
- 消息有序(不支持):当消费消息时,如果消费失败,消息会被放回队列,然后重新消费,这样会导致消息无序;
- 消息时序(非常好):通过延时队列,可以指定消息的延时时间,过期时间TTL等;
- 容错处理(非常好):通过交付重试和死信交换器(DLX)来处理消息处理故障;
- 伸缩(一般):伸缩其实没有非常智能,因为即使伸缩了,master queue还是只有一个,负载还是只有这一个master queue去抗,所以我理解RabbitMQ的伸缩很弱(个人理解)。
- 持久化(不太好):没有消费的消息,可以支持持久化,这个是为了保证机器宕机时消息可以恢复,但是消费过的消息,就会被马上删除,因为RabbitMQ设计时,就不是为了去存储历史数据的。
- 消息回溯(不支持):因为消息不支持永久保存,所以自然就不支持回溯。
queue还是只有一个,负载还是只有这一个master queue去抗,所以我理解RabbitMQ的伸缩很弱(个人理解)。 - 持久化(不太好):没有消费的消息,可以支持持久化,这个是为了保证机器宕机时消息可以恢复,但是消费过的消息,就会被马上删除,因为RabbitMQ设计时,就不是为了去存储历史数据的。
- 消息回溯(不支持):因为消息不支持永久保存,所以自然就不支持回溯。
- 高吞吐(中等):因为所有的请求的执行,最后都是在master queue,它的这个设计,导致单机性能达不到十万级的标准。
这篇关于RabbitMQ详解(值得珍藏)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!