Spring Boot Messaging Chapter 6 Messaging with Redis

2024-03-02 23:08

本文主要是介绍Spring Boot Messaging Chapter 6 Messaging with Redis,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

摘要:本章向您展示了如何使用Redis集成Spring Boot作为消息中间件。Redis是一个内存数据库,它被用作数据库、缓存和消息中间件。它不仅可以存储键值对,还可以用来存储复杂的数据类型,如散列、列表、集合、排序集、位图、超日志和地理空间索引。

Spring Boot使用Spring Data模块,特别是Redis one。换句话说,为了在您的项目中使用Redis,您必须将spring-boot-starer-redis依赖于您的pom.xml文件或Gradle。然后,您将拥有连接到Redis服务器的所有必要依赖项。

一:Redis as a Message Broker(Redis作为一个消息中间件)

Redis不仅提供了存储数据结构的方法,而且还实现了发布/订阅消息传递范型。前面的章节用JMS解释了这种范例。

        这里的重要部分向您展示了如何与Redis交互并启用消息中间件。Redis拥有一个通道的标识,其中一个消息将由生成者发送。它将被那些对一个或多个频道感兴趣的订阅者使用。正如您所看到的,通道关键字在Redis中使用。Redis中的通道是JMS世界中的主题。

Redis有几个命令允许你与发布/订阅功能进行交互:

• SUBSCRIBE: 告诉Redis订阅一个特定的频道或频道。例如:

127.0.0.1:6379> SUBSCRIBE spring-boot-chat

您可以一次订阅多个通道,将它们按空格分隔开。

• UNSUBSCRIBE: 取消订阅的频道。这个命令不需要参数。

• PUBLISH: 通过指定通道(作为第一个参数)和实际消息来发布一条消息。例如:

127.0.0.1:6379> PUBLISH spring-boot-chat "Hi there"

• PSUBSCRIBE: 这个命令与SUBSCRIBE相同,但接受多个通道的模式。例如:

127.0.0.1:6379> PSUBSCRIBE currency.*

这个例子将订阅任何以货币开头的频道,比如currency.us,currency.asia.jp,currency.eu.gb,等等。

•PUNSUBSCRIBE: 取消订阅使用模式匹配。例如:

127.0.0.1:6379> PUNSUBSCRIBE currency.asia.*

•PING: 如果没有提供任何参数,则返回一个PONG。通常情况下,您可以使用这个来测试连接是否仍然存在。


为了试一试,确保你已经安装了Redis,它已经启动并运行了。(您可以从https://redis.io/download下载它。)打开一个新的终端窗口,并使用redis-cli命令与Redis进行交互。如图6 - 1所示。

Figure 6-1. Redis interaction with the publish/subscribe commands

图6-1向您展示了一种与Redis交互的简单方法,并确定订阅和发布一条消息是多么容易。

本章不讨论Redis的集群,也不讨论哨兵,分片,等等。只讨论消息传递。许多客户使用Redis作为消息传递代理,并作为实时数据分析的web会话管理工具。本章的开始是使用Spring Boot和Redis来进行消息传递。

二:Publish/Subscribe Messaging with Redis(使用Redis发布/订阅消息)

正如前面提到的,Spring Boot将使用Spring Data Redis的力量,它与您所熟悉的Spring JMS非常相似。Spring Boot将配置必要的组件,例如连接、使用的数据库(默认情况下是0索引DB)、集群节点、池、哨兵、超时等等。记住简单地添加 spring-boot-starter-redis可以启用Redis自动配置。

Spring Data Redis模块有两个主要的领域,用于消息的生产或发布,以及消息的消费或订阅。对于消息发布,它使用RedisTemplate<K,V>类(它使用模板设计模式),对于消息订阅,它有一个专用的异步消息侦听器容器(一个MDP消息驱动pojo)。对于同步消息,它使用一个RedisConnection接口约定。下面的部分只讨论异步订阅。

在这一章中,我们将使用两个项目:redis-demo和rest-api-redis。redis-demo项目拥有完成和补充货币项目的所有必要代码。

2.1.Subscriber(订阅者)

作为Redis订阅者,您可以通过使用固定名称或使用模式匹配来订阅一个或多个通道(或主题)。Spring Data Redis模块提供了一种进行低级操作的方法通过RedisConnection订阅。这包括subscribe和pSubscribe方法。

低级订阅需要一种方法来处理简单侦听器的连接和线程管理。现在想象有多个侦听器。您可能认为实现此功能将是一件麻烦的事情,但是Spring Data Redis包括RedisMessageListenerContainer类,它负责所有的繁重工作,并支持消息驱动pojo(MDPs)。换句话说,你可以创建自己的类和方法来接收消息并处理它。这意味着您需要使用MessageListenerAdapter类来使用这个特性。不要太担心,这是我接下来要给你们看的。

打开redis-demo项目,并检查com.micai.spring.messaging.config.RedisConfig类,如清单6-1所示。

Listing 6-1. com.micai.spring.messaging.config.RedisConfig.java

@Configuration
@EnableConfigurationProperties(SimpleRedisProperties.class)
public class RedisConfig {// Simple Message Listener@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter,@Value("${micai.redis.topic}") String topic) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listenerAdapter, new PatternTopic(topic));return container;}@BeanMessageListenerAdapter listenerAdapter(Subscriber subscriber) {return new MessageListenerAdapter(subscriber);}
}

清单6-1向您展示了我们将要使用的配置,以便订阅一个频道。让我们更详细地了解代码:

•RedisMessageListenerContainer: 这是一个完成所有繁重工作的类,并充当一个消息侦听器容器,它将接收来自Redis通道(topic)的消息。您需要设置连接工厂和将要处理接收到的消息的消息侦听器。请记住,这个侦听器容器负责所有线程和消息分派。

•RedisConnectionFactory: 这个接口对于RedisMessageListenerContainer是必需的,它包含关于Redis连接的所有信息。因为它是这个方法的一部分,Spring会自动将它连接起来,所以您不需要手工创建它。在幕后,Spring Boot会为您处理这种配置。

•MessageListenerAdapter: 该类是一个适配器,它将收到的消息委托给一个已声明的类,该类是通过MessageListener签名进行投诉的。您可以看到,这是通过调用容器来设置的。addMessageListener方法并作为参数传递订阅者(listenerAdapter-MessageListenerAdapter)和它将订阅的主题(PatternTopic)。

•PatternTopic: 这是一个类,也是消息侦听器需要的参数之一。它通常保存主题的名称或用于订阅正确频道的名称模式。

接下来,让我们看一下订阅者类。打开com.micai.spring.messaging.redis.Subscriber类,如清单6-2所示。

 

Listing 6-2. com.micai.spring.messaging.redis.Subscriber.java

@Component
public class Subscriber {private static final Logger LOGGER = LoggerFactory.getLogger(Subscriber.class);// If only one method defined, it must be named: handleMessagepublic void handleMessage(String message) {// Process message here ...LOGGER.info("消费Redis的消息内容为:{}", message);}
}

 清单6-2显示了只有一个必需方法(handleMessage)的订阅者类。该订阅者类是消息委托的适配器。换句话说,MessageListenerAdapter符合以下签名:

void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);//You can get the channel or the pattern used
void handleMessage(Serializable message, String channel);
void handleMessage(byte[] bytes, String pattern);//You can have your own object
void handleMessage(MyOwnDomainObject obj);

有时,您的适配器类会有更多的方法进行额外的处理,或者从外部调用。在这些情况下,您可以让MessageListenerAdapter类通过在其构造函数中添加额外的参数来知道您想要使用什么方法。例如:

@Component
public class Subscriber {
public void shipping(Order order){
// Process order here ...
}
//This method is used as listener for Redis topics
public void processTicket(String message){
// Process message here ...
}
// ... more methods
}
// RedisConfig.java
@Bean
MessageListenerAdapter listenerAdapter(Subscriber subscriber) {
return new MessageListenerAdapter(subscriber,"processTicket");
}

 添加您将用来处理从Redis主题到构造器的传入消息的方法的名称;在这个例子中,这是processTicket方法。

正如您所看到的,使用MessageListenerAdapter的好处是您在POJO类中没有依赖关系,使您的应用程序更具可扩展性。

2.2.Publisher(发布者)

本节将介绍向通道(主题)发布消息。要在Redis中发布消息,您有两个选择。您可以使用 low-level RedisConnection类或高级RedisTemplate类(记住,它非常类似于JmsTemplate和RabbitTemplate)。两个接口都提供发布方法连接。发布(msg,通道)。您还必须确定通道(主题)。

使用RedisTemplate的好处是,您有一种方法来定义序列化/反序列化策略。这种方法隐藏了调用原始方法的复杂性,并且是线程安全的。

让我们打开代码com.micai.spring.messaging.RedisDemoApplication类如清单6-3所示。

 

Listing 6-3. com.micai.spring.messaging.RedisDemoApplication.java

@SpringBootApplication
public class RedisDemoApplication {private static final Logger LOGGER = LoggerFactory.getLogger(RedisDemoApplication.class);public static void main(String [] args) {SpringApplication.run(RedisDemoApplication.class, args);}@BeanCommandLineRunner sendMessage(StringRedisTemplate template, @Value("${micai.redis.topic}")String topic){return args -> {String message = "Hello Redis with Spring Boot!";template.convertAndSend(topic, message);LOGGER.info("发布Redis的消息内容为: {}", message);};}
}

清单6-3显示了主应用程序类。正如您所知道的,一旦Spring Boot最终确定了自动配置,它将执行sendMessage方法。该方法将自动在应用程序中自动连接StringRedisTemplate和主题。在application.properties文件中micai.redis.topic=spring-boot-chat的key。然后,它将使用模板通过模板来发送一条消息。接受主题和消息作为其参数的convertAndSend方法。

您通常需要使用RedisTemplate,但是在本例中,我们使用的是StringRedisTemplate。我们这样做是因为RedisTemplate被定义为RedisTemplate<K,V>,当key是Redis的key时(通常是一个字符串),而V是Redis值类型(它将是消息)。然后,StringRedisTemplate是一个使用字符串的子类。换句话说,它就像创建一个RedisTemplate<String,String>对象。

这里的有趣之处在于,StringRedisTemplate为不同的操作定义了多个字符串序列化器,它们适用于不同的数据结构,比如Set和Hash键/值。

现在,如果您运行这个项目(请记住让redis-server启动并运行),您将看到如图6-2、6-3和6-4所示的订阅者日志。

Figure 6-2. Project logs

图6-2显示了日志。具有Around AOP建议的RedisAudit类会生成这些日志。正如您所看到的,它使用订阅者类(侦听器适配器)和接收字符串消息的handleMessage方法。

图6-3显示了带有Redis客户端的终端。监视器在执行代码之前显示,只是为了确定Redis是否接受这些消息。正如您所看到的,Redis展示了在redis-server中执行的命令,PSUBSCRIBE和PUBLISH,当然还有一个PING命令来检查客户端和服务器之间的连接。

Figure 6-3. The redis-cli monitor command

图6-4显示了另一个终端窗口,在那里我们订阅了spring-boot-chat channel/topic。运行这个项目之后,它将打印消息。这是另一种确保您的redis-server正在运行的方法,并且您可以有多个订阅者到一个通道/主题。

Figure 6-4. redis-cli subscribe

2.3.JSON Serialization(JSON序列化)

 现在,回到序列化/反序列化,回想一下我们正在使用JSON格式。您需要做些什么才能使发布/订阅使用JSON和序列化/反序列化为自定义对象?

如果您遵循前面的模块(JMS和RabbitMQ)的相同想法,那么回答这个问题就更容易了,因为您可以在这里应用相同的概念。

让我们先修改RedisConfig类,如清单6-4所示。

 

Listing 6-4. com.micai.spring.messaging.config.RedisConfig.java

@Configuration
@EnableConfigurationProperties(SimpleRedisProperties.class)
public class RedisConfig {// Simple Message Listener@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter,@Value("${micai.redis.topic}") String topic) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listenerAdapter, new PatternTopic(topic));return container;}@BeanMessageListenerAdapter rateListenerAdapter(RateSubscriber subscriber) {MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(subscriber);messageListenerAdapter.setSerializer(new Jackson2JsonRedisSerializer<>(Rate.class));return messageListenerAdapter;}@BeanRedisTemplate<String, Rate> redisTemplate(RedisConnectionFactory connectionFactory){RedisTemplate<String,Rate> redisTemplate = new RedisTemplate<String,Rate>();redisTemplate.setConnectionFactory(connectionFactory);redisTemplate.setDefaultSerializer(new Jackson2JsonRedisSerializer<>(Rate.class));redisTemplate.afterPropertiesSet();return redisTemplate;}
}

清单6-4展示了修改后的RedisConfig类。与之前的版本相比,有什么区别呢?RedisMessageListenerContainer bean是一样的,只不过现在我们也在使用rateListenerAdapter bean。让我们来回顾一下这个清单:

• MessageListenerAdapter: 这和以前是一样的,但是这里我们设置了一个新的类适配器,在这个例子中是RateSubscriber。需要注意的是,我们通过调用setSerializer方法来设置序列化器,并且我们正在使用Rate类作为对象映射器来实例化一个Jackson2JsonRedisSerializer对象。

• RedisTemplate<String,Rate>:正如您所看到的,我们定义这个bean来返回一个RedisTemplate,其中键是一个字符串,值是利率类。还要注意的是,我们通过调用setDefaultSerializer方法来设置一个序列化器,我们使用的是与以前相同的类,Jackson2JsonRedisSerializer。

查看RateSubscriber类,如清单6-5所示。

 

Listing 6-5. com.micai.spring.messaging.redis.RateSubscriber.java

/*** @Auther: zhaoxinguo* @Date: 2018/8/9 19:27* @Description:*/
@Component
public class RateSubscriber {private static final Logger LOGGER = LoggerFactory.getLogger(RateSubscriber.class);// If only one method defined, it must be named: handleMessagepublic void handleMessage(Rate rate){// Process message here ...LOGGER.info("消费Redis的消息内容为Rate:{}", rate);}
}

清单6-5显示了RateSubscriber类。与前面的例子没有任何改变。handleMessage方法将接收一条利率消息。现在,让我们看一下发布者,如清单6-6所示。

 

Listing 6-6. com.micai.spring.messaging.RedisDemoApplication.java

@SpringBootApplication
public class RedisDemoApplication {private static final Logger LOGGER = LoggerFactory.getLogger(RedisDemoApplication.class);public static void main(String [] args) {SpringApplication.run(RedisDemoApplication.class, args);}@BeanCommandLineRunner sendMessage(RedisTemplate<String, Rate> template,@Value("${micai.redis.topic}")String topic){return args -> {Rate rate = new Rate("MX", 21.17F, new Date());template.convertAndSend(topic, rate);LOGGER.info("发布Redis的消息内容为Rate: {}", rate);};}
}

 清单6-6显示了主应用程序。将这个类与前一个版本进行比较;改变了什么?我们使用RedisTemplate类并使用利率作为值。我们还通过创建一个新利率来使用Rate类作为消息。

现在您可以运行您的项目并查看日志了。参见图6-5、6-6和6-7。

图6-5显示了RateSubscriber正在处理消息的日志。请记住,在幕后,为了得到对象,序列化/反序列化正在发生。

Figure 6-5. RateSubscriber logs

图6-6显示了带有Redis客户端监视器的终端。我们的想法是看到发布的方法是JSON格式的字符串,这意味着Jackson2JsonRedisSerializer执行了速率类的序列化。

Figure 6-6. redis-cli monitor

图6-7向您展示了一个终端,它的订阅者利率通道/主题。请注意,此订阅者以JSON字符串格式接收利率消息。

Figure 6-7. redis-cli subscriber

如果您将这些结果与之前的Spring模块(JMS和AMQP)进行比较,您将看到我们正在做同样的事情。尽管Spring Data Redis模块没有注解来简化发布/订阅模式,但是很容易就能很快地启动并运行它。

三:The Currency Project(货币项目)

您现在拥有了完成货币项目所需的所有信息。看一下RateRedisSubscriber、RateRedisConfig和RateRedisProperties类开始。您可以重用演示项目来将消息发布到channel/topic。

四:总结

本章讨论了发布/订阅消息模式,并指出Redis提供了开箱即用的功能。它很容易使用。本章向您展示了Spring Boot如何帮助您轻松地配置您的发布者和订阅者,只需添加spring-boot-starer-redis。

您看到了如何发布和监听传入的消息,您看到Spring Data Redis模块使用RedisTemplate(与Spring JMS和Spring AMQP模块相同的行为)使用与生产者和订阅者类似的方式。

尽管这一章很短,但它为您提供了一个使用Redis作为内存消息中间件的起点。

下一章将介绍WebSockets,这是另一种使用Spring Boot进行消息传递的方式。

五:源代码

https://gitee.com/micai/micai-spring-message/tree/master/redis-demo

这篇关于Spring Boot Messaging Chapter 6 Messaging with Redis的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/767713

相关文章

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Spring AI集成DeepSeek的详细步骤

《SpringAI集成DeepSeek的详细步骤》DeepSeek作为一款卓越的国产AI模型,越来越多的公司考虑在自己的应用中集成,对于Java应用来说,我们可以借助SpringAI集成DeepSe... 目录DeepSeek 介绍Spring AI 是什么?1、环境准备2、构建项目2.1、pom依赖2.2