Spring Integration 是什么?

2024-03-25 09:12
文章标签 java spring integration

本文主要是介绍Spring Integration 是什么?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spring Integration 是什么?

Spring Integration 在 Spring 家族不太有名气,如果不是有需求,一般也不会仔细去看。那么 Spring Integration 是什么呢?用官方的一句话来解释就是:它是一种轻量级消息传递模块,并支持通过声明式适配器与外部系统集成。简单来说,Spring Integration 抽象了用于消息传递的一套规范,并且基于这套规范提供了很多企业级的中间件的集成。比如他支持基于 AMQP 的消息队列、MQTT、RMI 等等中间件。

用过 Spring 家族组件的同学应该会比较容易理解了。例如,Spring Data 抽象了数据访问的一系列接口,后端可支持多种 ORM;Spring Cache 抽象了缓存使用的接口,后端支持 Caffeine、Redis、Memcached 等缓存中间件。其实这都是一样的。好处是,我们只需要熟悉这一种规范,就可以任意的去对接各种企业级框架,起到快速开发的作用;劣势是,这些企业级的框架只能再 Spring 抽象的这套规范下工作,对于一些细节的开发,可能仍然需要使用原生的框架来实现。

本文主要介绍的是 Spring Integration,以及它是如何集成 MQTT 协议的。

Spring Integration 消息抽象

刚刚我们讲了,Spring Integration 实际上就是抽象出了消息传递的规范,然后再适配各种消息中间件。那么下面我们先简单了解下 Spring Integration 消息通信的模式。

image.png

image.png

image.png

image.png

image.png

image.png

以上几张官方提供的图可以大致厘清 Spring Integration 的各类组件和工作模式:

  1. Message 包含 Header 和 Payload 两部分。
  2. MessageChannel 用于解耦生产者和消费者,实现消息发送。
  3. MessageRouter 用来控制消息转发的 Channel。
  4. Service Activitor 用来绑定 MessageHandler 和用于消费消息的 MessageChannel。
  5. ChannelAdapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。

在开发上就需要去了解这些抽象组件的具体实现了,在下面讲到 MQTT 的集成上可以再体会一下 SI 的设计思路。

MQTT 协议

MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.

这是 MQTT 协议的官方描述,它是一种应用于物联网的轻量级的发布订阅协议,类似于 AMQP。详细了解可以参考:

  • MQTT Specifications
  • [emqx mqtt 协议介绍](docs.emqx.cn/broker/v4.3… 协议)
  • MQTT 协议中文版
  • 消息推送标准协议:MQTT

下面提一些重要的或者开发中需要配置的点。

通信方式

默认是发布 / 订阅模式的。

  1. 通信系统中有发布者和订阅者。发布者发布消息而订阅者接收消息。我们把发布者和订阅者统称为客户端。客户端可以同时是发布者和订阅者。
  2. 在系统中有另外一个角色,它接收发布者的消息并且将消息派发给订阅者。我们一般称这个角色为消息 Broker。
  3. 在 MQTT 中默认是广播的,也就是说订阅了相同 topic 的订阅者都能收到发布者发送的消息。

基于主题 (Topic) 消息路由

MQTT 协议基于主题 (Topic) 进行消息路由,主题 (Topic) 类似 URL 路径,例如:

 

bash

复制代码

chat/room/1 sensor/10/temperature sensor/+/temperature

主题 (Topic) 通过'/'分割层级,支持'+', '#'通配符:

  • '+': 表示通配一个层级,例如 a/+,匹配 a/x, a/y
  • '#': 表示通配多个层级,例如 a/#,匹配 a/x, a/b/c/d
  • 订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。

QoS

为了满足不同的场景,MQTT 支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:

  • 0:At most once。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
  • 1:At least once。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
  • 2:Exactly onces。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别 2 是最合适的。

订阅者收到 MQTT 消息的 QoS 级别,最终取决于发布消息的 QoS 和主题订阅的 QoS

Broker 选型

本文使用的 MQTT Broker 是 EMQ X 的开源版。

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。 Erlang/OTP 是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed) 的语言平台。

客户端代码集成

Java 客户端一般使用 Eclipse Paho Java Client,此客户端为 Java SE 版本的,为了在 SpringBoot 上有更好的集成,这里我们使用 Spring Integration,Spring Integration MQTT Support 默认集成的就是 Eclipse Paho Java Client V3 版本。

依赖和参数配置

 

xml

复制代码

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>

 

yml

复制代码

mqtt: url: tcp://172.17.218.94:1883 username: admin password: public clientId: mqtt-sender

 

java

复制代码

@Data @Component @ConfigurationProperties(prefix = "mqtt") public class MqttProperties { private String url; private String username; private String password; private String clientId; }

发布者配置

 

java

复制代码

@Configuration @IntegrationComponentScan public class MqttConfig { @Autowired private MqttProperties prop; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setServerURIs(new String[]{prop.getUrl()}); mqttConnectOptions.setUserName(prop.getUsername()); mqttConnectOptions.setPassword(prop.getPassword().toCharArray()); // 客户端断线时暂时不清除,直到超时注销 mqttConnectOptions.setCleanSession(false); mqttConnectOptions.setAutomaticReconnect(true); factory.setConnectionOptions(mqttConnectOptions); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( prop.getClientId() + "-pub-" + Instant.now().toEpochMilli(), mqttClientFactory); messageHandler.setAsync(true); messageHandler.setDefaultRetained(false); messageHandler.setAsyncEvents(false); // Exactly Once messageHandler.setDefaultQos(2); messageHandler.setDefaultTopic(ApiConst.MQTT_TOPIC_SUFFIX); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } } @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttTemplate { void send(String payload); void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic); void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos); }

  1. @IntegrationComponentScan,开启 Spring Integration 的注解扫描。
  2. 注入客户端工厂类 MqttPahoClientFactory,此处可以配置认证参数、超时时间等 broker 连接参数。
  3. 注入 MessageChannel 实例。
  4. 注入 MessageHandler 的实例,并通过 @ServiceActivator 绑定到对应的 MessageChannel。此处可配置消息处理的模式、QoS、默认的 Topic 等。
  5. 定义一个 @MessagingGateway 修饰的接口,用于消息的发送,@MessagingGatewaydefaultRequestChannel 参数用于绑定具体的 MessageChannel
  6. 在使用的地方自动注入 MqttTemplate 的实例,即可调用方法发送消息。

订阅者配置

 

java

复制代码

@Configuration @IntegrationComponentScan public class MqttConfig { private final MqttProperties prop; private final MqttInboundMessageHandler mqttInboundMessageHandler; public MqttConfig(MqttProperties prop, MqttInboundMessageHandler mqttInboundMessageHandler) { this.prop = prop; this.mqttInboundMessageHandler = mqttInboundMessageHandler; } @Bean public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(prop.getClientId() + "-sub-" + Instant.now().toEpochMilli(), mqttClientFactory, "facego/reply"); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(2); adapter.setOutputChannel(mqttInboundChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInboundChannel") public MessageHandler InboundMessageHandler() { return mqttInboundMessageHandler; } @Bean public MessageChannel mqttInboundChannel() { return new DirectChannel(); } } @Slf4j @Component public class MqttInboundMessageHandler implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("mqtt reply: {}", message.getPayload()); } }

  1. 注入消息处理的 MessageChannel
  2. 注入自己实现的 MqttInboundMessageHandler,并通过 @ServiceActivator 绑定到对应的 MessageChannel
  3. 注入 Channel Adapter 的实例,配置客户端订阅的 Topic 和相应的 MessageChannel

Spring Integration 大致交互逻辑

对于发布者:

  1. 消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
  2. DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

  1. 通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel
  2. 同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定,这里是 MQTT Eclipse Paho Java Client。

总结

本文主要介绍了 Java 使用 MQTT 通信的方式,由于使用了 SpringBoot,因此使用 Spring Integration 来集成会比直接只用 Eclipse Paho Java Client 更符合 Spring 的哲学,所有的 Bean 均单例注入统一管理。

Spring Integration 的好处在于,我们只需要了解其消息通信的基本机制,屏蔽了 Eclipse Paho Java Client 的具体细节,便于编码。从上面的代码可以看出,我们仅仅注入了相关的 Bean,给出响相应的配置信息即可。

参考文献

  • Spring Integration Reference Guide
  • Spring Integration 中文手册(完整版)
  • SpringBoot 集成 MQTT 配置
  • Spring Boot 集成 MQTT
  • 消息推送标准协议:MQTT

这篇关于Spring Integration 是什么?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/844535

相关文章

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Java访问修饰符public、private、protected及默认访问权限详解

《Java访问修饰符public、private、protected及默认访问权限详解》:本文主要介绍Java访问修饰符public、private、protected及默认访问权限的相关资料,每... 目录前言1. public 访问修饰符特点:示例:适用场景:2. private 访问修饰符特点:示例:

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.