本文主要是介绍【RabbitMQ】使用SpringAMQP的消息队列(Hello Word)和工作队列(Work Queue),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
SpringAMQP
SpringAMQP中文文档
Hello Word
**案例1:**利用SpringAMQP实现HelloWord中的集成消息队列功能
项目结构,如图:
1.引入AMQP依赖(父工程中)
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.publisher包的application.yml中添加MQ连接信息
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.174.129 # rabbitMQ的ip地址(我是在虚拟机上用docker安装的RabbitMQ)port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码
3.在消息发送者(publisher包)中新建一个测试类SpringAmqpTest,编写测试方法
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate@Testpublic void testSimpleQueue(){String queueName ="simple.queue"; //队列名称String message ="hello, spring amgp!"; //消息rabbitTemplate.convertAndSend(queueName,message);}
}
4.运行,结果
5.在consumer中编写消费逻辑,监听simple.queue
5.1consumer包的application.yml中添加MQ连接信息
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.174.129 # rabbitMQ的ip地址(我是在虚拟机上用docker安装的RabbitMQ)port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码
5.2在消费者(consumer包)新建一个类SpringRabbitListener,编写消费逻辑
@Component
public class SpringRabbitListener {@RabbitListener(queues="simple.queue")public void listenSimple0ueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息 :【"+ msq +"】");}
}
5.3运行,结果
simple.queue队列中的消息也为空了
Work Queue
案例2:WorkQueue工作队列,实现一个队列绑定多个消费者
Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
前提:在案例1的基础上
1.在SpringAmqpTest添加新的发布
//消息发送50次
@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20); //休眠20秒}}
2.注释掉SpringRabbitListener上原有方法,添加新的消费
@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);}
3.运行,结果
结果为消费,平均分配
4.消费预取限制,能者多劳—在消费者(consumer包)的application.yml修改
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.174.129 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
结果,如图:
这篇关于【RabbitMQ】使用SpringAMQP的消息队列(Hello Word)和工作队列(Work Queue)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!