本文主要是介绍rabbitmq 学习三-spring-boot中配置交换器和队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
rabbitmq主要的交换器类型有fanout、direct、topic、headers
下面分别介绍三种常用的交换器使用方法
1.fanout交换器
将所有发送到该交换器的消息会路由到所有与该交换器绑定的队列中
2.direct交换器
将消息路由到RoutingKey完全匹配的队列中
3.topic 交换器
将消息路由到RoutingKey匹配的队列中,匹配的规则支持特殊字符 ”*“和“#”
一、fanout交换器使用和配置
1.声明队列、交换器并将队列绑定到交换器
@Configuration
public class RabbitFanoutExchangeConfiguration {/*** 声明队列* @return*/@Bean(name = "q.test1")public Queue queue() {return new Queue("q.test1");}@Bean("q.test2")public Queue queue2() {return new Queue("q.test2");}/*** 声明交换器* @return*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("x.test1");}/*** 绑定队列到交换器* @return*/@Beanpublic Binding bindingQueue2Exchange() {return BindingBuilder.bind(queue()).to(fanoutExchange());}@Beanpublic Binding bindingQueue2FanoutExchange() {return BindingBuilder.bind(queue2()).to(fanoutExchange());}
}
2.创建消息生产者
@Component
public class RabbitProducer {@AutowiredAmqpTemplate amqpTemplate;public void sendMessage(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.test1amqpTemplate.send("x.test1","",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}
3.创建消息消费者
@Component
@RabbitListener(queues = "q.test1")
public class FanoutExchangeConsumer1 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.test1------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}@Component
@RabbitListener(queues = "q.test2")
public class FanoutExchangeConsumer2 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.test2------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}
二、direct交换器使用和配置
1.声明队列、交换器并将队列绑定到交换器
@Configuration
public class RabbitDirectExchangeConfiguration {/*** 动态声明队列* @return*/@Bean(name = "q.direct1")public Queue queue1() {return new Queue("q.direct1");}/*** 动态声明队列* @return*/@Bean(name = "q.direct2")public Queue queue2() {return new Queue("q.direct2");}/*** 动态声明交换器* @return*/@Beanpublic DirectExchange directExchange() {return new DirectExchange("x.direct");}/*** 使用路由键r.direct.routingKey1将交换器绑定到队列 q.direct1* @return*/@Beanpublic Binding bindingQueue1Exchange() {return BindingBuilder.bind(queue1()).to(directExchange()).with("r.direct.routingKey1");}/*** 使用路由键r.direct.routingKey2 将交换器绑定到队列 q.direct1* @return*/@Beanpublic Binding bindingExchange2Queue2() {return BindingBuilder.bind(queue1()).to(directExchange()).with("r.direct.routingKey2");}/*** 使用路由键 r.direct.routingKey1将交换器绑定到队列 q.direct2* @return*/@Beanpublic Binding bindingExchange2Queue() {return BindingBuilder.bind(queue2()).to(directExchange()).with("r.direct.routingKey1");}
}
2.创建消息生产者
public void sendMessage2DirectExchange(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.direct, 路由键为 r.direct.routingKey1amqpTemplate.send("x.direct","r.direct.routingKey1",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}public void sendMessage2DirectDirectExchange(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.direct, 路由键为 r.direct.routingKey2amqpTemplate.send("x.direct","r.direct.routingKey2",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
3.创建消息消费者
@Component
@RabbitListener(queues = "q.direct1")
public class DirectExchangeConsumer1 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.direct1------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}@Component
@RabbitListener(queues = "q.direct2")
public class DirectExchangeConsumer2 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.direct2------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}
三、topic交换器配置和使用
1.声明队列、交换器并将队列绑定到交换器
@Configuration
public class TopicExchangeConfiguration {/*** 动态声明队列* @return*/@Beanpublic Queue topicQueue1() {return new Queue("q.topic1");}/*** 动态声明队列* @return*/@Beanpublic Queue topicQueue2() {return new Queue("q.topic2");}/*** 动态声明交换器* @return*/@Beanpublic TopicExchange topicExchange() {return new TopicExchange("x.topic");}@Beanpublic Binding bindingTopicExchange2Queue1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.*.test");}@Beanpublic Binding bindingTopicExchange2Queue2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.topic.*");}@Beanpublic Binding bindingTopicExchange2Queue3() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("com.#");}
}
2.创建消息生产者
public void sendMessage2TopicMessage(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.direct, 路由键为 r.direct.routingKey2amqpTemplate.send("x.topic","com.test",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}public void sendMessage2TopicMessage2(Object object) {try {Message message= MessageBuilder.withBody(JSON.toJSONString(object).getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID().toString()).build();// 指定exchange 为x.direct, 路由键为 r.direct.routingKey2amqpTemplate.send("x.topic","client.topic.test",message);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
3.创建消息消费者
@Component
@RabbitListener(queues = "q.topic1")
public class TopicExchangeConsumer1 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.topic1------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}@Component
@RabbitListener(queues = "q.topic2")
public class TopicExchangeConsumer2 {@RabbitHandlerpublic void receiveMessage(Object object) {Message message=(Message)object;byte bytes[]=null;if (message != null) {bytes=message.getBody();}try {String msg=new String(bytes,"UTF-8");System.out.println("q.topic2------------------------"+msg);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}
四、创建消息发送测试controller
@RestController
@RequestMapping(value = "/messages")
public class MessageController {@AutowiredRabbitProducer rabbitProducer;@PostMapping(value = "/fanout")public Map<String,Object> sendMessage() {Map<String,Object> map=new HashMap<>();map.put("username","test1");map.put("password","123456");map.put("name","我是fanout 交换器测试人员");rabbitProducer.sendMessage(map);Map<String,Object> resultMap=new HashMap<>();resultMap.put("code","200");resultMap.put("message","success");return resultMap;}@PostMapping(value = "/direct")public Map<String,Object> sendMessage2DirectExchange() {Map<String,Object> map=new HashMap<>();map.put("username","test2");map.put("password","123456");map.put("name","我是direct 交换器测试人员");rabbitProducer.sendMessage2DirectExchange(map);Map<String,Object> resultMap=new HashMap<>();resultMap.put("code","200");resultMap.put("message","success");return resultMap;}@PostMapping(value = "/topic")public Map<String,Object> sendMessage2TopicExchange() {Map<String,Object> map=new HashMap<>();map.put("username","test3");map.put("password","123456");map.put("name","我是topic 交换器测试人员");rabbitProducer.sendMessage2TopicMessage(map);Map<String,Object> resultMap=new HashMap<>();resultMap.put("code","200");resultMap.put("message","success");return resultMap;}@PostMapping(value = "/topic1")public Map<String,Object> sendMessage2TopicExchange1() {Map<String,Object> map=new HashMap<>();map.put("username","test3");map.put("password","123456");map.put("name","我是topic 交换器测试人员");rabbitProducer.sendMessage2TopicMessage2(map);Map<String,Object> resultMap=new HashMap<>();resultMap.put("code","200");resultMap.put("message","success");return resultMap;}
}
源码下载地址
https://github.com/tangyajun/spring-boot-rabbit-consumer
https://github.com/tangyajun/rabbitmq-spring-demo
这篇关于rabbitmq 学习三-spring-boot中配置交换器和队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!