十七、SpringAMQP

2023-11-21 22:36
文章标签 十七 springamqp

本文主要是介绍十七、SpringAMQP,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

一、SpringAMQP的介绍:

二、利用SpringAMQP实现HelloWorld中的基础消息队列功能

1、因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中

2、编写yml文件

3、编写测试类,并进行测试

三、在consumer中编写消费逻辑,监听simple.queue

1、导入依赖,刚才在父工程中已经导入了,所以省略

2、编写yml文件

3、新建类,实现消费逻辑

4、运行并测试

四、模拟WorkQueue,实现一个队列绑定多个消费者

1、编写生产者(生产50个消息)

2、编写消费者(一个消费者更快,一个消费者更慢)

3、测试

4、消费预取的修改

5、重新测试

五、发布和订阅

(一)利用SpringAMQP演示FanoutExchange的使用

1、新建config类,声明交换机和队列

2、启动项目,查看配置

3、编写消费者代码

4、编写生产者代码

5、运行代码,观察输出

(二)交换机的作用

(三)声明队列、交换机、绑定关系的Bean是什么?

(四)DirectExchange

1、编写消费者代码

2、编写生产者代码

(五)Direct交换机与Fanout交换机的差异

(六)TopicExchange

1、编写消费者代码

2、编写生产者代码

3、运行测试

4、描述下Direct交换机与Topic交换机的差异

(七)测试发送Object类型信息

1、新增队列

2、发送对象

3、查看

4、优化(使用jackson进行序列化)

5、接收消息


一、SpringAMQP的介绍:

  1. AMQP是一种高级消息队列协议。

  2. SpringAMQP是基于Spring Framework的AMQP扩展,提供了一个抽象层,使得使用AMQP进行消息传递变得更加简单。

  3. SpringAMQP支持多种消息传递模式,包括点对点、发布/订阅和请求/响应等。

  4. SpringAMQP提供了许多高级功能,例如队列管理、消息确认、事务和消息过滤等。

  5. SpringAMQP提供了集成测试工具和基于Spring Boot的自动配置,使得集成AMQP变得更加容易。

  6. 总之,SpringAMQP是一个灵活、可扩展的AMQP实现,它使得使用消息队列时变得更加容易和高效。

二、利用SpringAMQP实现HelloWorld中的基础消息队列功能

1、因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中

<!--        AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、编写yml文件

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.248.152port: 5672virtual-host: /username: itcastpassword: 123456

3、编写测试类,并进行测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMassage(){String queue = "simple.queue";String massage="aaaaaaa";rabbitTemplate.convertAndSend(queue,massage);}
}

三、在consumer中编写消费逻辑,监听simple.queue

1、导入依赖,刚才在父工程中已经导入了,所以省略

2、编写yml文件

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.248.152port: 5672virtual-host: /username: itcastpassword: 123456

3、新建类,实现消费逻辑

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者接收到消息:"+msg);}
}

4、运行并测试

注意:

消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

四、模拟WorkQueue,实现一个队列绑定多个消费者

1、编写生产者(生产50个消息)

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMassage(){String queue = "simple.queue";String massage="HelloWorld";for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queue,massage);}}
}

2、编写消费者(一个消费者更快,一个消费者更慢)

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者0接收到消息:"+msg+ LocalTime.now());try {Thread.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}@RabbitListener(queues = "simple.queue")public void listenSimpleQueue1(String msg){System.err.println("消费者1接收到消息__________-:"+msg+ LocalTime.now());try {Thread.sleep(200);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

3、测试

我们发现,虽然消费者0更快,但是它并没有承担更多的工作量;

这是因为消费预取机制会让消费者事先分配好要处理的消息,而不是按能力分配;

4、消费预取的修改

可以在yml文件中修改

    listener:simple:prefetch: 1 #表示预取上限为1

5、重新测试

五、发布和订阅

(一)利用SpringAMQP演示FanoutExchange的使用

1、新建config类,声明交换机和队列
@Configuration
public class FanoutConfig {///1@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}@Beanpublic Binding bindingQueue1(FanoutExchange exchange,Queue fanoutQueue1){return BindingBuilder.bind(fanoutQueue1).to(exchange);}///2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}@Beanpublic Binding bindingQueue2(FanoutExchange exchange,Queue fanoutQueue2){return BindingBuilder.bind(fanoutQueue2).to(exchange);}
}
2、启动项目,查看配置

绑定成功

3、编写消费者代码
    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.err.println("消费者1接收到消息__________-:"+msg+ LocalTime.now());}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){System.err.println("消费者2接收到消息__________-:"+msg+ LocalTime.now());}
4、编写生产者代码
    @Testpublic void sendFanoutMassage(){String exchangeName = "itcast.fanout";String message = "Hello Every One";rabbitTemplate.convertAndSend(exchangeName,"",message);}
5、运行代码,观察输出

发现两个消费者都接收到了消息

(二)交换机的作用

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

(三)声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding
     

(四)DirectExchange

实现:

1、编写消费者代码
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue1(String msg){System.err.println("消费者1接收到消息__________-:"+msg+ LocalTime.now());}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者2接收到消息__________-:"+msg+ LocalTime.now());}
2、编写生产者代码
    @Testpublic void sendDirectMassage(){String exchangeName = "itcast.direct";String message = "Hello Every One1111";rabbitTemplate.convertAndSend(exchangeName,"blue",message);}
    @Testpublic void sendDirectMassage(){String exchangeName = "itcast.direct";String message = "Hello Every One1111";rabbitTemplate.convertAndSend(exchangeName,"red",message);}

(五)Direct交换机与Fanout交换机的差异

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
     

基于@RabbitListener注解声明队列和交换机有哪些常见注解

  • @Queue
  • @Exchange
     

(六)TopicExchange

利用SpringAMQP演示TopicExchange的使用

1、编写消费者代码
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者1接收到消息aaaaaa__-:"+msg+ LocalTime.now());}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.err.println("消费者2接收到消息a__-:"+msg+ LocalTime.now());}
2、编写生产者代码
    @Testpublic void sendTopicMassage(){String exchangeName = "itcast.topic";String message = "Hello Every One12222";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
3、运行测试

4、描述下Direct交换机与Topic交换机的差异

(七)测试发送Object类型信息

1、新增队列
    @Beanpublic Queue objectQueue(){return new Queue("object.queue");}
2、发送对象
    @Testpublic void sendObjectMassage(){Map<String ,Object> message = new HashMap<>();message.put("name","11");message.put("age","22");rabbitTemplate.convertAndSend("object.queue",message);}
3、查看

对象被序列化了,这种方式性能差,不安全(容易被注入)

4、优化(使用jackson进行序列化)

引入依赖 

        <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

添加配置Bean

    @Beanpublic Jackson2JsonMessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}

5、接收消息

编写配置Bean

    @Beanpublic Jackson2JsonMessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}

编写消费者代码

@RabbitListener(queues = "object.queue")
public void listenObjectQueue1(Map<String,Object> msg){System.err.println("消费者接收到消息___da_______-:"+msg+ LocalTime.now());
}

验证

注意:

这篇关于十七、SpringAMQP的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/405571

相关文章

ASP.NET Core 入门教学十七 GraphQL入门指南

GraphQL 是一种用于 API 的查询语言,允许客户端请求所需的数据,并能够合并多个资源到一个请求中。在 ASP.NET Core 中使用 GraphQL 可以提供更灵活、高效和实用的数据查询方式。以下是 ASP.NET Core 中 GraphQL 的入门指南: 1. 安装必要的 NuGet 包 首先,你需要安装以下 NuGet 包: GraphQLGraphQL.Server.Tra

Flink实战案例(十七):Flink 异步IO (二)原理

1 原理实现 AsyncDataStream.(un)orderedWait方法的主要工作就是创建了一个 AsyncWaitOperator。AsyncWaitOperator 是支持异步 IO 访问的算子实现,该算子会运行 AsyncFunction 并处理异步返回的结果,其内部原理如下图所示:   如图所示,AsyncWaitOperator 主要由两部分组成:StreamElem

【硬刚ES】ES基础(十七)结构化搜索

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的ES部分补充。

【MySQL】Explain执行计划(十七)

🚗MySQL学习·十七站~ 🚩本文已收录至专栏:MySQL通关路 ❤️每章节附章节思维导图,文末附全文思维导图,感谢各位点赞收藏支持~ ⭐学习汇总贴,超详细思维导图:【MySQL】学习汇总(完整思维导图) 一.引入 当我们需要对SQL语句进行优化时必须先分析其性能时,EXPLAIN是一个不可或缺的神器,它可以帮助我们获取 MySQL 如何执行 SELECT 语句的信息,例如 表如何

【Unity 3D】学习笔记十七:自定义游戏中字体

在已开发出来的游戏里,总有些好看的字体,来增加的游戏的趣味性。此时,系统自带的默认字体就显得有些砢碜了。Unity支持所有的.tff的字集,默认的是Arial。 在改变字体设置前,需要得到一个.tff的字符集。可以选择去网上下,也可以直接复制电脑中的字符集。将字符集拖放只Unity的工程文件夹下就行。然后再project视图中选择create——GUI Skin 创建一个GUI皮肤来设置我

非标独立设计选型--十七--滚珠丝杆选型计算

螺纹种类:锁紧螺纹、传动螺纹                      自锁性        高效率 传动螺纹:滑动丝杆、滚珠丝杆 滑动丝杆(梯形丝杆):纯滑动摩擦--黄铜(自润滑性好)效率很低60%,结构简单成本低--没有精度可言---线接触--大负载---启动阻力大导致超低速运行时出现爬行蠕动现象 没有精度要求,需要较大的轴向负载,预算较低需要降低成本,低速运行,不重要场合 1、手摇调节机

十七、模拟 实现栈和队列类

Ⅰ . 模拟实现 stack 01 实现思路 插入数据删除数据这些逻辑其实没有必要自己实现,而是采用转换的方式 之前我们讲解了适配器的知识,这里采用的就是一些其他的适配的容器去实现 至于转换什么,我们可以进一步想到,好像有很多容器适合去转换 所以 STL 中增加了一个模板参数 Container,利用 Container 来进行转换 上一章末尾,我们利用了 deque 去实现栈和队列

【git之窗】(十七)线上问题如何拉取紧急分支

一、前提       通常使用git,都会在上线前把代码合并到master分支,在master上打好tag,由上线tag、回退tag确保上线正常。       例如:       上线tag: VINCENT_tag_V1.3.1       回滚tag: VINCENT_tag_V1.3.0   二、问题      如上所述,如果master上线的tag(VINCENT_tag_V

ExtJs 入门教程十七[项目 :items]

一、语法 items:[{layout:'column',//布局值为:column、formautoHeight:true,xtype:'fieldset',//见说明[1]style:'padding-top:10px;',items:[{columnWidth:.5,//列宽layout:'form',items:[{xtype:'combo',inputType: 'password',

(十七)Flink 容错机制

目录 分布式快照 Checkpoint Checkpoint 模式 Checkpoint 配置 非对齐 Checkpointing 状态存储 Savepoint 分配算子 ID Savepoint 操作 Checkpoint 与 Savepoint 区别 作业重启与故障恢复策略 重启策略 恢复策略 对于不间断 24 小时运行的程序来说,容错至关重要。Flink 定期