Spring Cloud Stream如何屏蔽不同MQ带来的差异性?

2023-11-30 08:30

本文主要是介绍Spring Cloud Stream如何屏蔽不同MQ带来的差异性?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引言

在当前的微服务架构下,使用消息队列(MQ)技术是实现服务解耦和削峰填谷的重要策略。为了保证系统的灵活性和可替换性,我们需要避免对单一开源技术的依赖

市面上有多种消息队列技术,如 Kafka、RocketMQ、RabbitMQ 等。关键在于如何在微服务体系中实现这些MQ组件的无缝切换,以减少代码修改需求。

Spring Cloud Stream 通过其与主流消息中间件的灵活集成,实现了通过仅修改配置文件的方式来切换不同的MQ实现,从而提高了系统的适应性和可维护性。

什么是 Spring Cloud Stream

Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。

基于 Spring Boot 构建,用于创建独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接。它提供了来自多个供应商的中间件的固定配置,引入了持久发布-订阅语义、消费者组和分区的概念。

简单来说 Spring Cloud Stream 是对 Spring Integration 和 Spring Boot 的合并。

图一

图一

主要概念:

1. application model(应用模型)

图二.Spring Cloud Stream 应用程序

图二.Spring Cloud Stream 应用程序

由中间件提供的 Binder 来处理绑定。 应用程序通过绑定这个 Binder 与其建立联系,发送消息时应用程序通过 outputs 通道将消息传递给 BinderBinder 再把消息给消息中间件。接收消息时消息中间件将消息传递给 BinderBinder 再把消息通过 inputs 通道传递给应用程序。

比如 Kafka Binder 依赖如下图:

图三 spring cloud stream kafka依赖

图三 spring cloud stream kafka依赖

2. The Binder Abstraction(Binder抽象)

Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接中间件。

Spring Cloud Stream 为 Kafka 和 RabbitMQ 提供了 Binder 实现。 RocketMQ Binder 已由 Spring Cloud Alibaba 实现。

Binder 抽象也是该框架的扩展点之一,我们可以在 Spring Cloud Stream 之上实现自定义 Binder。

3. Programming Model(编程模型)

核心概念

  • Destination Binders(目标绑定器):负责提供与外部消息传递系统集成的组件。

  • Bindings(绑定):外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。

  • Message(消息):生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

图四

图四

环境搭建

本文环境:

  • Java:17

  • Spring Boot:3.0.2

  • Spring Cloud:2022.0.2

  • Spring Cloud Alibaba:2022.0.0.0

maven依赖配置

pom.xml依赖如下:

消息驱动jar,用哪个mq引入哪个即可。<dependencies><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>
</dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

配置文件

application.yml RocketMq 配置信息:

spring:cloud:stream:stream:rocketmq:binder:name-server: 127.0.0.1:9876;127.0.0.1:9877function:# 组装和绑定definition: myTopicCbinders:default:type: rocketmqbindings:## 生产者 新版本固定格式  函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式  函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topic

application.yml Kafka 配置信息:

spring:cloud:stream:stream:kafka:binder:brokers: 127.0.0.1:9092function:# 组装和绑定definition: myTopicCbinders:default:type: kafkabindings:## 生产者 新版本固定格式  函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式  函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topic

消息生产者

创建一个简单的消息生产者:

@RestController
@Slf4j
public class ProducerStream {@Autowiredprivate StreamBridge streamBridge;@GetMapping("/test-stream")public String testStream() {streamBridge.send("demoChannel-out-0",MessageBuilder.withPayload("消息体").build());return "success";}
}

消息消费者

创建一个消息消费者来接收消息:

@Slf4j
@Configuration
public class TestStreamConsumer {@Beanpublic Consumer<String> demoChannel() {return message -> {log.info("demoChannel接到消息:{}", message);};}
}

假如需要从 Kafka 替换成 RocketMq ,只需要修改pom文件和配置文件即可。

在之前的 Spring Cloud Stream 版本中是采用注解的方式来实现绑定,在新版本中是通过函数式编程模型来绑定名称。采用约定大于配置的思想,简化了应用程序配置。

具体可见官方文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names

Spring Cloud Stream 发送消息流程

图五 spring cloud stream消息流程图

图五 spring cloud stream消息流程图

消息模型

通过图三可以看到 Sping Cloud Stream 的依赖关系。

Sping Cloud Stream -> Spring Integration -> Spring Messaging

可以看出来 Sping Cloud Stream 是基于 Spring Integration 做了一层封装,是依赖于 Spring Integration 这个组件的,而 Spring Integration 则依赖于 Spring Messaging 组件来实现消息处理机制的基础设施。

Spring Integration 是对 Spring Messaging 的扩展,设计目标是系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。

各个异构系统相互集成时,Spring Integration 通过通道之间的消息传递,让我们可以在消息的入口和出口使用通道适配器和消息网关这两种典型的端点对消息进行同构化处理。

Spring MessagingSpring 框架中的一个底层模块,用于提供统一的消息编程模型。

消息 Message 接口定义:

public interface Message<T> {//消息体T getPayload();//消息头MessageHeaders getHeaders();
}

消息通道 MessageChannel 接口定义:

@FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT = -1;//发送消息,无限期阻塞default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);}//发送消息,阻塞直到到达指定超时时间boolean send(Message<?> message, long timeout);
}

消息通道 MessageChannel 接收消息,调用send()方法将消息发送至该消息通道。

消息通道可简单理解为对队列的一种抽象。通道的名称对应队列的名称。

Spring message 把通道抽象成两种基本表现形式

  • 支持轮询的 PollableChannel

  • 实现发布-订阅模式的 SubscribableChannel

这两个通道都继承自具有消息发送功能的 MessageChannel

public interface SubscribableChannel extends MessageChannel {//通过注册回调函数MessageHandler来实现事件响应//注册消息处理器boolean subscribe(MessageHandler handler);//取消注册消息处理器boolean unsubscribe(MessageHandler handler);
}
public interface PollableChannel extends MessageChannel {//通过轮询操作主动获取消息//从通道中接收消息@NullableMessage<?> receive();//指定超时时间,从通道中接收消息@NullableMessage<?> receive(long timeout);
}

MessageHandler接口定义:

@FunctionalInterface
public interface MessageHandler {//处理消息方法void handleMessage(Message<?> message) throws MessagingException;
}

再回到图五流程图中,我们最终可以看到 KafkaRocketMQ 通过继承 AbstractMessageHandler 抽象类( AbstractMessageHandler 抽象类是实现了 MessageHandler 接口)来实现不同中间件的消息发送操作。而这些都是封装在各自中间件对应的 Binder 代码中来实现。

结论

回到我们的主题,Spring Cloud Stream 如何屏蔽不同 MQ 带来的差异性?

  • 统一的编程模型:发送和接收代码一致,开发者专注于业务逻辑即可。不用管底层消息中间件的实现。

  • Binder 抽象:封装与消息队列的交互逻辑,每种队列有自己的 Binder 实现。

  • 自动配置和约定优于配置:采用约定大于配置的思想,极少的改动配置文件实现消息队列的切换,而代码不用变动。

  • 高级特性的抽象:如分区、消息分组、持久性订阅等高级特性,Spring Cloud Stream 提供了抽象层,由不同的消息队列去实现。

参考资料

  • 官方文档:Spring Cloud Stream Reference Guide

  • 《Spring核心技术和案例实战》

这篇关于Spring Cloud Stream如何屏蔽不同MQ带来的差异性?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/436336

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

2. c#从不同cs的文件调用函数

1.文件目录如下: 2. Program.cs文件的主函数如下 using System;using System.Collections.Generic;using System.Linq;using System.Threading.Tasks;using System.Windows.Forms;namespace datasAnalysis{internal static

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听