本文主要是介绍尚硅谷阳哥SpringCloud第二季学习笔记(三),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
导航目录
- 一、Gateway新一代网关
- 1.1 概述简介
- 1.1.1 官网
- 1.1.2 是什么
- 1.1.3 能干嘛
- 1.1.4 微服务架构中网关在哪里
- 1.1.5 有Zuul了怎么又出来了gateway
- 1.2 三大核心概念
- 1.2.1 Route(路由)
- 1.2.2 Predicate(断言)
- 1.2.3 Filter(过滤)
- 1.2.4 总体
- 1.3 Gateway工作流程
- 1.4 入门配置
- 1.4.1 导入依赖
- 1.4.2 yml
- 1.4.3 主启动
- 1.4.4 使用测试
- 1.5 通过微服务名实现动态路由
- 1.6 Predicate的使用
- 1.6.1 其它的Predicate
- 1.7 Filter的使用
- 1.7.1 是什么
- 1.7.2 生命周期
- 1.7.3 种类
- 1.7.4自定义全局GlobalFilter
- 二、SpringCloud Config配置中心
- 2.1 概述
- 2.1.1 是什么
- 2.1.2 怎么玩
- 2.1.3 能干嘛
- 2.1.4 服务端与GitHub整合配置
- 2.2 服务端Config配置总控中心搭建
- 2.2.1 在GitHub上新建Repository
- 2.2.2 本地磁盘目录新建git仓库并clone
- 2.2.3 新建module
- 2.3 客户端使用
- 2.3.1 新建模块cloud-config-client-3355
- 2.3.2 POM文件
- 2.3.3 新建`bootstrap.yml`文件
- 2.3.4 主启动
- 2.3.5 仿业务类
- 访问结果
- 2.4 随之而来的问题
- 2.4.1 Config动态刷新手动版
- 三、SpringCloud Bus消息总线
- 3.1 概述
- 3.1.1 是什么
- 3.1.2 能干什么
- 3.1.3 为什么被称为总线
- 3.2 RabbitMQ环境配置
- 3.3 SpringCloud Bus动态刷新全局广播
- 3.3.1 以客户端3355为模板新建module
- 3.3.2 导入pom依赖
- 3.3.3 bootstrap.yml配置文件
- 3.3.4 主启动
- 3.3.5 仿业务类
- 3.3.6 设计思想
- 3.3.7 配置中心服务端添加消息总线支持
- 3.3.7 两个客户端分别添加消息总线支持
- 3.4 SpringCloud Bus动态刷新定点通知
- 四、SpringCloud Stream消息驱动
- 4.1 Stream为什么被引入
- 4.2 Stream是什么及Binder介绍
- 4.3 Stream的设计思想
- 4.4 Stream编码常用注解简介
- 4.5 Stream消息驱动之生产者
- 4.6 Stream消息驱动之消费者
- 4.7 Stream之消息重复消费
- 4.7.1 解决重复消费
- 4.8 Stream之消息持久化
一、Gateway新一代网关
1.1 概述简介
1.1.1 官网
2.2.1版本官方文档
1.1.2 是什么
Cloud全家桶中有个很重要的组件就是网关,在1.X版本中都是采用的Zuul网关;但在2.X版本中,Zuul的升级一直跳票,SpringCloud最后自己研发了一个网关替代Zuul,那就是SpringCloud Gateway。
Gateway是在Spring生态系统之上构建的API网关服务,基于Spring 5,Spring Boot 2和Project Reactor等技术。
Gateway旨在提供一种简单而有效的方式来对API进行路由,以及提供一些强大的过滤器功能,例如:熔断、限流、重试等。
SpringCloud Gateway的目标提供统一的路由方式基于Filter链的方式提供了网关基本的功能,例如:安全,监控/指标,和限流。
1.1.3 能干嘛
- 反向代理
- 鉴权
- 流量控制
- 熔断
- 日志监控
1.1.4 微服务架构中网关在哪里
1.1.5 有Zuul了怎么又出来了gateway
为什么选择Gateway
一方面是因为Zuul1.0已经进入了维护阶段,而且很多功能Zuul都没有用起来。Gateway是基于异步非阻塞模型上进行开发的,性能方面不需要担心。虽然Netflix早就发布了最新的Zuul2.X,但Spring Cloud貌似没有整合计划。而且Netflix相关组件都宣布进入维护期。
SpringCloud Gateway具有如下特性
SpringCloud Gateway与Zuul的区别
Zuul1.x模型
SpringCloud中所集成的Zuul版本,采用的是Tomcat容器,使用的是传统的Servlet IO处理模型。
Servlet生命周期
Servlet由servlet container进行生命周期管理。
container启动时构造servlet对象并调用servlet init()进行初始化;
container运行时接受请求,并为每个请求分配一个线程(一般从线程池中获取空闲线程),然后调用service()。
container关闭时调用servlet destory()销毁servlet。
Gateway模型
1.2 三大核心概念
1.2.1 Route(路由)
路由是构建网关的基本模块,它由ID,目标URI,一系列的断言和过滤器组成,如果断言为true则匹配该路由。
1.2.2 Predicate(断言)
参考的是Java8的java.util.function.Predicate
开发人员可以匹配HTTP请求中所有内容(例如请求头或者请求参数),如果请求与断言相匹配则进行路由。
1.2.3 Filter(过滤)
指的是Spring框架中GatewayFilter的实例,使用过滤器,可以在请求被路由前或者之后对请求进行修改。
1.2.4 总体
1.3 Gateway工作流程
客户端向Spring Cloud Gateway发出请求。然后在Gateway Handler Mapping中找到与请求相匹配的路由,将其发送到Gateway Web Handler。
Handler再通过指定的过滤器链来将请求发送到我们实际的服务之星逻辑,然后返回。
过滤器之间用虚线分开是因为过滤器可能会在发送代理请求之前(“pre”)或之后(“post”)之星业务逻辑。
Filter在“pre”类型的过滤器可以做参数校验、权限校验、流量监控、日志输出、协议转换等。
在“post”类型的过滤器中可以做响应内容、响应头的修改,日志的输出,流量监控等有着非常重要的作用。
Gateway的核心逻辑
路由转发+执行过滤器链。
1.4 入门配置
1.4.1 导入依赖
<dependencies><!--gateway--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency><!--eureka-client--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity --><dependency><groupId>com.jg.springcloud</groupId><artifactId>cloud-api-commons</artifactId><version>${project.version}</version></dependency><!--一般基础配置类--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
需要注意的一些细节,在Gateway服务模块不要加以下依赖,否则启动会报错
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
1.4.2 yml
server:port: 9527spring:application:name: cloud-gatewaycloud:gateway:routes:- id: payment_routh #payment_route #路由的ID,没有固定规则但要求唯一,建议配合服务名uri: http://localhost:8001 #匹配后提供服务的路由地址predicates:- Path=/payment/get/** # 断言,路径相匹配的进行路由- id: payment_routh2 #payment_route #路由的ID,没有固定规则但要求唯一,建议配合服务名uri: http://localhost:8001 #匹配后提供服务的路由地址predicates:- Path=/payment/lb/** # 断言,路径相匹配的进行路由
eureka:instance:hostname: cloud-gateway-serviceclient: #服务提供者provider注册进eureka服务列表内service-url:register-with-eureka: truefetch-registry: truedefaultZone: http://eureka7001.com:7001/eureka
1.4.3 主启动
Gateway也需要注册到Eureka注册中心
@SpringBootApplication
@EnableEurekaClient
public class GatewayMain9527 {public static void main(String[] args) {SpringApplication.run(GatewayMain9527.class,args);}
}
1.4.4 使用测试
在没配置Gateway之前我们对cloud-provider-payment8001
模块的访问是依靠它自己的端口号的,现在有了网关我们可以使用网关的端口号,路由到具体的服务模块然后在根据断言找到对应的方法名,调用。
1.5 通过微服务名实现动态路由
默认情况下Gateway会根据注册中心注册的服务列表,以注册中心上微服务名为路径创建动态路由进行转发,从而实现动态路由的功能。
spring:cloud:gateway:discovery:locator:enabled: true # 开启从注册中心动态创建路由的功能,利用微服务名进行路由。
1.6 Predicate的使用
断言除了- Path
,还可以通过- Cookie
,采用正则匹配
- id: payment_routh2 #payment_route #路由的ID,没有固定规则但要求唯一,建议配合服务名#uri: http://localhost:8001 #匹配后提供服务的路由地址uri: lb://cloud-payment-service #匹配后提供服务的路由地址predicates:- Path=/payment/lb/** # 断言,路径相匹配的进行路由- Cookie=username,zzyy
访问效果
1.6.1 其它的Predicate
1.7 Filter的使用
1.7.1 是什么
指的是Spring框架中GatewayFilter的实例,使用过滤器,可以在请求被路由前或者路由之后对请求进行修改。
路由过滤器可用于修改进入的HTTP请求和返回的HTTP响应,路由过滤器只能指定路由进行使用。
Spring Cloud Gateway内置了多种路由过滤器,他们都由GatewayFilter的工厂类来产生。
1.7.2 生命周期
- pre:业务逻辑之前
- post:业务逻辑之后
1.7.3 种类
- GatewayFilter:单一功能的过滤器
- GlobalFilter:全局的过滤器
1.7.4自定义全局GlobalFilter
@Component
@Slf4j
public class MyLogGatewayFilter implements GlobalFilter, Ordered {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {log.info("进入全局过滤器方法" + new Date());String uname = exchange.getRequest().getQueryParams().getFirst("uname");if (uname == null) {log.info("****用户名为null,非法用户");exchange.getResponse().setStatusCode(HttpStatus.NOT_ACCEPTABLE);return exchange.getResponse().setComplete();}return chain.filter(exchange);}@Overridepublic int getOrder() {//数越小,优先级越高return 0;}
}
二、SpringCloud Config配置中心
2.1 概述
2.1.1 是什么
SpringCloud Config为微服务架构中的微服务提供集中化的外部配置支持,配置服务器为各个不同微服务应用的所有环境提供了一个中心化的外部配置。
2.1.2 怎么玩
SpringCloud Config分为服务端和客户端两部分。
服务端也称为分布式配置中心,它是一个独立的微服务应用,用来连接配置服务器并为客户端提供获取配置信息,加密/解密信息等访问接口。
客户端则是通过指定的配置中心来管理应用资源,以及与业务相关的配置内容,并在启动的时候从配置中心获取和加载配置信息配置服务器默认采用git来存储配置信息,这样就有助于对环境配置进行版本管理,并且可以通过git客户端工具来方便的管理和访问配置内容。
2.1.3 能干嘛
- 集中管理配置文件
- 不同环境不同配置,动态化的配置更新,分环境部署,比如dev/test/prod/beta/release
- 运行期间动态调整配置,不再需要在每个服务部署的机器上编写配置文件,服务会向配置中心统一拉取配置自己的信息。
- 当配置发生变动时,服务不需要重启即可感知到配置的变化并应用新的配置
- 将配置信息以REST接口的形式暴露。
官网
springcloud config官网
2.1.4 服务端与GitHub整合配置
由于SpringCloud Config默认使用Git来存储配置文件(也有其它方式,比如支持SVN和本地文件),但最推荐的还是Git,而且使用的是http/https访问的形式。
2.2 服务端Config配置总控中心搭建
2.2.1 在GitHub上新建Repository
用自己的账号在GitHub上新建一个名为springcloud-config的新Repository
2.2.2 本地磁盘目录新建git仓库并clone
2.2.3 新建module
新建模块spring-config-center-3344
pom文件
<dependencies><!--添加消息总线RabbitMQ支持--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-config-server</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
yml
server:port: 3344spring:application:name: cloud-config-center #注册进Eureka服务器的微服务名cloud:config:server:git:uri: https://github.com/lkp19960619/springcloud-config.git #GitHub上面的git仓库名字#搜索目录search-paths: /**##git的用户名username: xxxxxxx#git的密码password: xxxxxxx# 指定分支default-label: master#服务注册到eureka地址
eureka:client:service-url:defaultZone: http://localhost:7001/eureka
主启动类
@SpringBootApplication
//开启配置中心注解
@EnableConfigServer
public class ConfigCenterMain3344 {public static void main(String[] args) {SpringApplication.run(ConfigCenterMain3344.class,args);}
}
windows下修改hosts文件,增加映射
127.0.0.1 config-3344.com
测试通过Config微服务是否可以从GitHub上获取配置内容
- 启动ConfigCenterMain3344
- 浏览器访问:http://config-3344.com:3344/master/config-dev.yml
- 页面返回结果:
2.3 客户端使用
2.3.1 新建模块cloud-config-client-3355
2.3.2 POM文件
<dependencies><!--添加消息总线RabbitMQ支持--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-config</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
2.3.3 新建bootstrap.yml
文件
application.yml配置文件是用户级的资源配置项
bootstrap.yml是系统级的资源配置项,它的优先级更高
SpringCloud会创建一个Bootstrap Context,作为Spring应用的Application Context的父上下文。
初始化的时候,BootstrapContext负责从外部资源加载配置属性并解析配置。这两个上下文共享一个从外部获取的Environment
。Bootstrap属性有高优先级,默认情况下,它们不会被本地配置覆盖。Bootstrap Context和Application Context有着不同的约定,所以新增了一个bootstrap.yml
文件,保证Bootstrap Context和Application Context配置的分离。
要将Client模块下的application.yml
文件改为bootstrap.yml
,这是很关键的,因为bootstrap.yml
是比application.yml
先加载的。bootstrap.yml
优先级高于application.yml
。
2.3.4 主启动
@EnableEurekaClient
@SpringBootApplication
public class ConfigClientMain3355 {public static void main(String[] args) {SpringApplication.run(ConfigClientMain3355.class,args);}
}
2.3.5 仿业务类
@RestController
public class ConfigClientController {@Value("${config.info}")private String configInfo;@GetMapping("/configInfo")public String getConfigInfo(){return configInfo;}
}
访问结果
可以看到,通过客户端的Rest接口可以拿到服务端配置的dev环境下的配置信息。
2.4 随之而来的问题
如果说我们想要对某个配置文件的内容进行修改,那么修改之后刷新Config的服务端,配置中心会立即响应,但是它的客户端没有任何响应,除非客户端自己重启或者重新加载。这就是Config配置中心的分布式配置的动态刷新问题。
2.4.1 Config动态刷新手动版
step1
修改客户端模块的pom文件,加入actuator监控
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
step2
修改bootstrap.yml
配置文件,添加暴露监控端口配置
# 暴露监控端点
management:endpoints:web:exposure:include: "*"
step3
业务类添加@RefreshScope
注解
@RestController
//支持动态刷新
@RefreshScope
public class ConfigClientController {@Value("${config.info}")private String configInfo;@GetMapping("/configInfo")public String getConfigInfo(){return configInfo;}
}
测试
此时修改github中配置文件内容->访问3344->访问3355
发现3344的配置可以改为最新的配置,但是3355的配置并没有更新为最新的, 我们还要多做一个工作。即POST
请求通知客户端配置已经修改。
curl -X POST "http://localhost:3355/actuator/refresh"
此时,再次访问客户端,就会发现配置已经被改成最新的。避免了服务重启。
想想还有什么问题?
- 假如有多个微服务客户端3355/3366/3377
- 每个微服务都要执行—次post请求,手动刷新?
- 可否广播,一次通知,处处生效?
- 我们想大范围的自动刷新,求方法
三、SpringCloud Bus消息总线
3.1 概述
想要实现分布式自动刷新配置功能,就要使用SpringCloud Bus配合SpringCloud Config使用可以实现配置的动态刷新。
3.1.1 是什么
Bus支持两种消息代理:RabbitMQ和Kafka。
3.1.2 能干什么
SpringCloud Bus能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、事件推送等,也可以当作微服务间的通信通道。
3.1.3 为什么被称为总线
什么是总线
在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息。
基本原理
ConfigClient实例都监听MQ中同一个topic(默认是springCloudBus)。当一个服务刷新数据的时候,它会把这个消息放入到Topic中,这样其它监听同一个Topic的服务就能得到通知,然后去更新自身的配置。
3.2 RabbitMQ环境配置
- 安装Erlang,下载地址:http://erlang.org/download/otp_win64_21.3.exe
- 安装RabbitMQ,下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3.exe
- 打开cmd进入RabbitMQ安装目录下的sbin目录,如:D:\devSoft\RabbitMQ Scrverk\rabbitmq_server-3.7.14\sbin
- 输入以下命令启动管理功能
rabbitmq-plugins enable rabbitmq _management
这样就可以添加可视化插件。
- 访问地址查看是否安装成功:http://localhost:15672/
- 输入账号密码并登录:guest guest
3.3 SpringCloud Bus动态刷新全局广播
3.3.1 以客户端3355为模板新建module
3.3.2 导入pom依赖
<dependencies><!--添加消息总线RabbitMQ支持--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-config</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
3.3.3 bootstrap.yml配置文件
server:port: 3366spring:application:name: config-clientcloud:#Config客户端配置config:label: master #分支名称name: config #配置文件名称profile: dev #读取后缀名称 上述3个综合:master分支上config-dev.yml的配置文件被读取http://config-3344.com:3344/master/config-dev.ymluri: http://localhost:3344 #配置中心地址#rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口rabbitmq:host: 192.168.9.132port: 5672username: jgpassword: 123456#服务注册到eureka地址
eureka:client:service-url:defaultZone: http://localhost:7001/eureka# 暴露监控端点
management:endpoints:web:exposure:include: "*"
3.3.4 主启动
@SpringBootApplication
@EnableEurekaClient
public class ConfigClientMain3366 {public static void main(String[] args) {SpringApplication.run(ConfigClientMain3366.class,args);}
}
3.3.5 仿业务类
@RestController
@RefreshScope
public class ConfigClientController {@Value("${server.port}")private String serverPort;@Value("${config.info}")private String configInfo;@GetMapping("/configInfo")public String configInfo(){return "serverPort: "+serverPort+"\t\n\n configInfo: "+configInfo;}}
3.3.6 设计思想
1)利用消息总线触发一个客户端/bus/refresh,而刷新所有客户端的配置
2)利用消息总线触发一个服务端ConfigServer的/bus/refresh端点,而刷新所有客户端的配置。
图二的架构显然更加适合,图—不适合的原因如下:
-
打破了微服务的职责单一性,因为微服务本身是业务模块,它本不应该承担配置刷新的职责。
-
坏了微服务各节点的对等性。
-
有一定的局限性。例如,微服务在迁移时,它的网络地址常常会发生变化,此时如果想要做到自动刷新,那就会增加更多的修改。
3.3.7 配置中心服务端添加消息总线支持
pom
<!--添加消息总线RabbitNQ支持-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amap</artifactId>
</dependency>
<dependency><groupId>org-springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
yml
server:port: 3344spring:application:name: cloud-config-center #注册进Eureka服务器的微服务名cloud:config:server:git:uri: git@github.com:zzyybs/springcloud-config.git #GitHub上面的git仓库名字####搜索目录search-paths:- springcloud-config####读取分支label: master
#rabbitmq相关配置<--------------------------
rabbitmq:host: localhostport: 5672username: guestpassword: guest#服务注册到eureka地址
eureka:client:service-url:defaultZone: http://localhost:7001/eureka##rabbitmq相关配置,暴露bus刷新配置的端点<--------------------------
management:endpoints: #暴露bus刷新配置的端点web:exposure:include: 'bus-refresh'
3.3.7 两个客户端分别添加消息总线支持
pom
<!--添加消息总线RabbitNQ支持-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amap</artifactId>
</dependency>
<dependency><groupId>org-springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
yml
server:port: 3355spring:application:name: config-clientcloud:#Config客户端配置config:label: master #分支名称name: config #配置文件名称profile: dev #读取后缀名称 上述3个综合:master分支上config-dev.yml的配置文件被读取http://config-3344.com:3344/master/config-dev.ymluri: http://localhost:3344 #配置中心地址k#rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口<----------------------rabbitmq:host: localhostport: 5672username: guestpassword: guest#服务注册到eureka地址
eureka:client:service-url:defaultZone: http://localhost:7001/eureka# 暴露监控端点
management:endpoints:web:exposure:include: "*"
此处是基于设计思想二来实现的,即通知服务端更新配置从而带动客户端更新配置。启动服务之后,发送POST请求给服务端,通知刷新配置。
curl -X POST "http://localhost:3344/actuator/bus-refresh"
做完了这些工作,此时再在github上对配置文件修改,客户端就可以动态刷新。
3.4 SpringCloud Bus动态刷新定点通知
指定通知某一个实例生效而不是全部
- 公式:http://localhost:3344/actuator/bus-refresh/{destination}
- curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355
四、SpringCloud Stream消息驱动
4.1 Stream为什么被引入
常见MQ(消息中间件)
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。
Cloud Stream是什么?屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
4.2 Stream是什么及Binder介绍
官方文档
官方文档
Cloud Stream中文指导手册
什么是Spring Cloud Stream?
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
应用程序通过inputs
或者outputs
来与Spring cloud Stream中binder对象交互
通过我们配置的binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka。
4.3 Stream的设计思想
标准MQ
- 生产者/消费者之间靠消息媒介传递信息内容
- 消息必须走特定的通道 - 消息通道 Message Channel
- 消息通道里的消息如何被消费呢,谁负责收发处理 - 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅。
为什么用Cloud Stream
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候Spring Cloud Stream给我们提供了—种解耦合的方式。
Stream凭什么可以统一底层的差异?
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder:
- INPUT对应于消费者
- OUTPUT对应于生产者
Stream中的消息通信方式遵循了发布-订阅模式
Topic主题进行广播
- 在RabbitMQ就是Exchange
- 在Kafka就是Topic
4.4 Stream编码常用注解简介
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是SpringCloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。
编码API和常用注解
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输乎通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
案例说明
工程中新建三个子模块
- cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
- cloud-stream-rabbitmq-consumer8802,作为消息接收模块
- cloud-stream-rabbitmq-consumer8803,作为消息接收模块
4.5 Stream消息驱动之生产者
新建Module:cloud-stream-rabbitmq-provider8801
pom
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><!--基础配置--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
yml
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
主启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8801.class,args);}
}
业务类接口
public interface IMessageProvider {public String send();
}
业务类接口实现类
import com.lun.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{@Resourceprivate MessageChannel output; // 消息发送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("*****serial: "+serial);return null;}
}
控制器
import com.lun.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
public class SendMessageController
{@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage() {return messageProvider.send();}}
需要注意的是,如果使用的远程RabbitMQ,可能启动服务之后会报错
Rabbit health check failed
,此时,我们需要关闭RabbitMQ的心跳检测,在application.yml
配置文件中添加如下配置
#关闭Rabbit的心跳检测
management:health:rabbit:enabled: false
4.6 Stream消息驱动之消费者
新建模块cloud-stream-rabbitmq-consumer8802
POM
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--基础配置--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
yml
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: 192.168.9.132port: 5672username: jgpassword: 123456virtual-host: /blogbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://127.0.0.1:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
#关闭Rabbit的心跳检测
management:health:rabbit:enabled: false
主启动
@SpringBootApplication
public class StreamMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMain8802.class,args);}
}
消费者控制器
@Component
@EnableBinding(Sink.class)
public class MessageConsumerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1号,接收到的消息:"+message.getPayload());}
}
测试:
- 启动注册中心模块、生产者模块和消费者模块,生产者调用生产消息控制器,在消费者模块后台控制条打印出生产的具体的消息。
4.7 Stream之消息重复消费
上面已经实现了消息的生产和消费,但是如果我们拷贝多个消费者,并且在没有对消费者进行特殊配置的前提下,会发现运行后有两个问题:
- 有重复消费的问题
- 消息持久化问题
4.7.1 解决重复消费
为了解决重复消费的问题,需要用到一个分组和持久化属性group
生产实际案例
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。
在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费)。
修改消费者模块的yml
spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置group: A_Group #<----------------------------------------关键
同样位置,为8803修改分组为group: B_Group
这样做的结果还是重复消费,因为8802和8803是不同的组,对于不同的组消费策略是组外轮询。
此时我们再分别修改8802和8803的分组为同一个组,那么同一个组的多个微服务实例,每次只会有一个拿到。
4.8 Stream之消息持久化
通过上述,解决了重复消费问题,再看看持久化。
停止8802/8803并去除掉8802的分组group: A_Group,8803的分组group: A_Group没有去掉。
8801先发送4条消息到RabbitMq。
先启动8802,无分组属性配置,后台没有打出来消息。
再启动8803,有分组属性配置,后台打出来了MQ上的消息。(消息持久化体现)
这篇关于尚硅谷阳哥SpringCloud第二季学习笔记(三)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!