SpringBoot ApplicationListener实现发布订阅模式

本文主要是介绍SpringBoot ApplicationListener实现发布订阅模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、Spring对JDK的扩展
  • 二、快速实现发布订阅模式


前言

发布订阅模式(Publish-Subscribe Pattern)通常又称观察者模式,它被广泛应用于事件驱动架构中。即一个事件的发布,该行为会通过同步或者异步的方式告知给订阅该事件的订阅者。JDK中提供了EventListener作为所有订阅者的接口规范(即所有的订阅者都应该实现该接口),而EventObject则作为所有事件发布者的实现规范(即所有事件发布者都应该继承该类)。对于观察者的原理不是本章讨论的重点,本章只是演示如何在SpringBoot中实现发布订阅模式。


一、Spring对JDK的扩展

Spring中,提供了接口ApplicationListener作为Spring观察者(也叫监听者)的实现规范,ApplicationListener其实是对JDKEventListener中的扩展,增加了onApplicationEvent方法作为触发监听的方法。而事件发布对象ApplicationEvent也是继承了JDK中的EventObject类,仅仅增加了参数timestamp用于记录事件创建的时间。也就是说如果要使用Spring提供的发布订阅模式,您的监听器应该实现ApplicationListener接口,通过onApplicationEvent方法获取监听的内容。事件则必须继承ApplicationEvent

ApplicationListener源码:

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {/*** Handle an application event.* @param event the event to respond to*/void onApplicationEvent(E event);}

ApplicationEvent源码:

public abstract class ApplicationEvent extends EventObject {/** use serialVersionUID from Spring 1.2 for interoperability. */private static final long serialVersionUID = 7099057708183571937L;/** System time when the event happened. */private final long timestamp;/*** Create a new {@code ApplicationEvent}.* @param source the object on which the event initially occurred or with* which the event is associated (never {@code null})*/public ApplicationEvent(Object source) {super(source);this.timestamp = System.currentTimeMillis();}/*** Return the system time in milliseconds when the event occurred.*/public final long getTimestamp() {return this.timestamp;}}

二、快速实现发布订阅模式

配置线程池是必要的,因为发布订阅模式的一个好处就是可以实现解耦,而解耦最好的方式就是采用异步线程处理。如果我们不配置线程池,则在spring中默认会采用同步的方式进行消息发布和订阅消费。这样一来就没有任何意义了。首先在yaml或者properties中配置线程池信息:

thread:executor:corePoolSize: 8 #核心线程keepAliveSeconds: 30000 # 活跃时间maxPoolSize: 16 #最大线程数queueCapacity: 100000 #最大队列长度

然后通过配置文件读取配置信息,创建线程池并注入IOC容器

package com.hl.by.common.thread;import com.hl.by.common.utils.JsonUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ErrorHandler;import java.util.concurrent.ThreadPoolExecutor;/*** @Author: DI.YIN* @Date: 2024/3/6 9:39* @Version: 1.0.0* @Description: 线程池配置**/
@Configuration
public class ThreadPoolTaskExecutorConfig {//核心线程@Value("${thread.executor.corePoolSize}")private Integer corePoolSize;//存活时间@Value("${thread.executor.keepAliveSeconds}")private Integer keepAliveSeconds;//最大线程数@Value("${thread.executor.maxPoolSize}")private Integer maxPoolSize;//最大队列长度@Value("${thread.executor.queueCapacity}")private Integer queueCapacity;@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);threadPoolTaskExecutor.setCorePoolSize(corePoolSize);threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);threadPoolTaskExecutor.setQueueCapacity(queueCapacity);//设置拒绝策略,直接运行,不采用异步threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());threadPoolTaskExecutor.setThreadNamePrefix("Thread-Pool-Task-");return threadPoolTaskExecutor;}@DependsOn(value = "taskExecutor")@Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)public SimpleApplicationEventMulticaster eventMulticaster() {SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());//设置错误处理器simpleApplicationEventMulticaster.setErrorHandler(new ErrorHandler() {@Overridepublic void handleError(Throwable throwable) {System.out.println("抛出异常:" + JsonUtils.writeObjectAsBeautifulJson(throwable));}});return simpleApplicationEventMulticaster;}
}

可以看到除了注入线程池之外,还注入了自定义的SimpleApplicationEventMulticaster 对象并将创建的线程池设置到SimpleApplicationEventMulticaster中。因为SimpleApplicationEventMulticaster是处理发布订阅的核心类,通过multicastEvent方法进行事件发布。可以看到multicastEvent中,循环遍历订阅该事件的所有监听器,并判断是否配置了线程池Executor,如果配置了则将发布操作扔入线程池中异步处理,否则将同步处理发布事件操作。很多情况发现我们的事件发布与监听处理是在一个线程中执行,就是因为我们未设置线程池,导致发布订阅无法异步实现。

	@Overridepublic void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));//获取线程池Executor executor = getTaskExecutor();for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {if (executor != null) {//如果配置了线程池,则放入线程池中异步处理executor.execute(() -> invokeListener(listener, event));}else {//未配置线程池,则同步处理invokeListener(listener, event);}}}

完成以上配置后,就可以定义发布者、订阅者和发布事件了。现在我们定义一个类MessageSource作为发布者发布的事件,结构如下:

import lombok.Data;/*** @Author: DI.YIN* @Date: 2024/3/6 13:41* @Version:* @Description: 消息实体**/
@Data
public class MessageSource {private String id;private String msg;private String title;
}

定义好发布事件后,我们定义一个事件发布者MessageEvent,并指定其发布的事件类型是MessageSourceMessageSource 的子类,结构如下:

import org.springframework.context.ApplicationEvent;/*** @Author: DI.YIN* @Date: 2024/3/6 13:39* @Version: 1.0.0* @Description: 消息事件**/
public class MessageEvent<T extends MessageSource> extends ApplicationEvent {/*** Create a new {@code ApplicationEvent}.** @param source the object on which the event initially occurred or with*               which the event is associated (never {@code null})*/public MessageEvent(MessageSource source) {super(source);}
}

现在已经定义好了发布事件MessageSource,事件发布者MessageEvent,此时我们可以定义一个事件订阅者MessageListener,用于监听事件发布者MessageEvent发布的事件。代码如下:

import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;/*** @Author: DI.YIN* @Date: 2024/3/6 10:19* @Version:* @Description:**/
@Component
public class MessageListener implements ApplicationListener<MessageEvent> {@Overridepublic void onApplicationEvent(MessageEvent event) {MessageSource source = (MessageSource)event.getSource();System.out.println("消息监听器监听到消息:===>"+ JSONObject.toJSONString(source));}
}

现在我们就实现了一个订阅发布模式,事件对象MessageSource,事件发布者MessageEvent专门用于发布MessageSource类型的事件,事件监听者MessageListener 则专门监听MessageEvent发布的事件。可以创建一个接口用于测试发布订阅是否成功。


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;/*** @Author: Greyfus* @Create: 2024-03-02 14:08* @Version:* @Description:*/
@RestController
@RequestMapping("/mock")
public class TestController {@Autowiredprivate ApplicationContext applicationContext;@RequestMapping(method = RequestMethod.POST, value = "/publishMessage", consumes = MediaType.APPLICATION_JSON_VALUE)public void publishMessage() throws Exception {//构建信息实体MessageSource messageSource = new MessageSource();messageSource.setId(String.valueOf(1));messageSource.setTitle("日志消息");messageSource.setMsg("调用了接口publishMessage");//构建消息事件MessageEvent<MessageSource> messageEvent = new MessageEvent(messageSource);//发布事件applicationContext.publishEvent(messageEvent);}
}

通过用postman调用接口/mock/feign可以看到MessageListener 成功接受到了MessageEvent发布的MessageSource事件。
在这里插入图片描述

这篇关于SpringBoot ApplicationListener实现发布订阅模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/824496

相关文章

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

Python脚本实现自动删除C盘临时文件夹

《Python脚本实现自动删除C盘临时文件夹》在日常使用电脑的过程中,临时文件夹往往会积累大量的无用数据,占用宝贵的磁盘空间,下面我们就来看看Python如何通过脚本实现自动删除C盘临时文件夹吧... 目录一、准备工作二、python脚本编写三、脚本解析四、运行脚本五、案例演示六、注意事项七、总结在日常使用

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Java访问修饰符public、private、protected及默认访问权限详解

《Java访问修饰符public、private、protected及默认访问权限详解》:本文主要介绍Java访问修饰符public、private、protected及默认访问权限的相关资料,每... 目录前言1. public 访问修饰符特点:示例:适用场景:2. private 访问修饰符特点:示例:

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

使用Python实现在Word中添加或删除超链接

《使用Python实现在Word中添加或删除超链接》在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能,本文将为大家介绍一下Python如何实现在Word中添加或... 在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能。通过添加超

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服