rabbitmq集群docker快速搭建 https://blog.csdn.net/u011058700/article/details/78708767
rabbitmq原理博客 https://www.jianshu.com/p/6376936845ff
基础概念
- Queue
- 队列,用于储存消息,先入先出,prefetchCount限制平分给消费者的消息个数
- Exchange
- 交换机,生产者生产的消息先经过交换机,再路由到一个或多个Queue,这个过程通过binding key完成
- Exchange交换类别
- fanout:会把所有发到Exchange的消息路由到所有和它绑定的Queue
- direct:会把消息路由到routing key和binding key完全相同的Queue,不相同的丢弃
- topic:direct是严格匹配,那么topic就算模糊匹配,routing key和binding key都用.来区分单词串,比如A.B.C,匹配任意单词,#匹配任意多个或0个单词,比如。B.*可以匹配到A.B.C
- headers:不依赖routing key和binding key,通过对比消息属性中的headers属性,对比Exchange和Queue绑定时指定的键值对,相同就路由过来
集群
- rabbitmq基于Erlang语言编写,天生支持分布式特性
- rabbitmq集群会进行元数据同步(相当于索引),同步的内容大致为队列,交换机,路由,安全属性等相关信息,这样做使得客户端访问任意集群几点,查询到的队列交换机等信息都是相同的。
- RabbitMQ集群中的节点只有两种类型:内存节点/磁盘节点,单节点系统只运行磁盘类型的节点。而在集群中,可以选择配置部分节点为内存节点。内存节点将所有的队列,交换器,绑定关系,用户,权限,和vhost的元数据信息保存在内存中。
rabbitmq实现回调
- 背景
- 找的例子大多只是生产和消费,要实现消息的可靠性还是需要回调确认,下面记录下最简单的回调实现案例,使用的springboot搭建
- 生产者
package com.neo.rabbit.topic;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class TopicSender implements RabbitTemplate.ConfirmCallback {private RabbitTemplate rabbitTemplate;@Autowiredpublic TopicSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this);}public void send2() {for (int i = 0; i < 1000; i++) {String context = "hi, i am messages " + i;System.out.println("Sender : " + context);CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());System.out.println("callbackSender UUID: " + correlationData.getId());this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context, correlationData);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm: " + correlationData.getId() + ",s=" + s + ",b:" + b);}
}
- 消费者
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {@RabbitHandlerpublic void process(String message) {System.out.println("Topic Receiver2 : " + message);}}
- 配置文件
spring.application.name=spirng-boot-rabbitmq-example
spring.rabbitmq.addresses=ip:5672,ip:5673,ip:5674
spring.rabbitmq.username=dev
spring.rabbitmq.password=xxx
spring.rabbitmq.publisher-confirms=true
- test
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicTest {@Autowiredprivate TopicSender sender;@Testpublic void topic() throws Exception {sender.send2();}}
- 完成上述步骤,调用测试方法,可以看到日志如下
Sender : hi, i am messages 19
callbackSender UUID: 23e5768f-ce01-400b-81ad-3259a6d9a312
Topic Receiver2 : hi, i am messages 19
confirm: 23e5768f-ce01-400b-81ad-3259a6d9a312,s=null,b:true
Sender : hi, i am messages 20
callbackSender UUID: d7c4757a-0311-4de9-bb6d-661de36ef03e
confirm: d7c4757a-0311-4de9-bb6d-661de36ef03e,s=null,b:true
表明回调测试成功