持久化SSE对象

2024-08-24 09:36
文章标签 对象 持久 sse

本文主要是介绍持久化SSE对象,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

SpringBoot整合SSE,实现后端主动推送DEMO

前些日子写了整合SSE得demo。但是SSE对象是存储在ConcurrentHashMap<String, SseEmitter>中。在正式环境明显就不行了,服务重启一下的话都没有了。

那么要持久化,第一选择放redis

1、写了一个redis操作组件

SseEmitterStore

/*** 不考虑redis 连接异常问题* @author cmy* @date 2024/8/21 10:55*/
@Component
public class SseEmitterStore {private ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();@Resourceprivate RedisTemplate<String, Object> redisTemplate;public void addEmitter(String key, SseEmitter emitter) {emitters.put(key, emitter);redisTemplate.opsForHash().put("sse-emitters", key, emitter);}public void removeEmitter(String key) {emitters.remove(key);redisTemplate.opsForHash().delete("sse-emitters", key);}@PostConstructprivate void init() {Map<Object, Object> temp = redisTemplate.opsForHash().entries("sse-emitters");temp.forEach((key, value) -> {if (value instanceof SseEmitter) {emitters.put(key.toString(), (SseEmitter) value);}});}public ConcurrentHashMap<String, SseEmitter> getEmitters() {return emitters;}
}

Controller修改

public class SseController {@ResourceSseEmitterStore sseEmitterStore;@GetMapping("/subscribe/{id}")@CrossOrigin(origins = "*")public SseEmitter subscribe(@PathVariable String id) {SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);sseEmitterStore.addEmitter(id,emitter);emitter.onCompletion(() -> sseEmitterStore.removeEmitter(id));emitter.onError(e -> sseEmitterStore.removeEmitter(id));return emitter;}@GetMapping("/unbind/{id}")@CrossOrigin(origins = "*")public ServerResponse deleteItem(@PathVariable String id) {this.sseEmitterStore.removeEmitter(id);return ServerResponse.success(true);}
}

异步发送消息service

    @Asyncpublic void broadcastMessage(String message) {List<String> keysToDelete = new ArrayList<>();this.sseEmitterStore.getEmitters().forEach((k, v) -> {try {v.send(message);} catch (Throwable t) {keysToDelete.add(k);}});keysToDelete.forEach(this.sseEmitterStore::removeEmitter);}

2、无法序列化的问题

跑起来之后,结果报错

DefaultSerializer requires a Serializable payload but received an object of type [org.springframework.web.servlet.mvc.method.annotation.SseEmitter]

错误信息已经很明显了

因为 SseEmitter 并不是一个实现了 Serializable 接口的类,因此不能被默认的序列化器正确处理。

问了AI

        

3、解决无法序列化问题

3.1自定义redis自定义序列化器

public class CustomJackson2JsonRedisSerializer<T> implements RedisSerializer<T> {private static final long serialVersionUID = -7649863253433761554L;private final ObjectMapper objectMapper;public CustomJackson2JsonRedisSerializer() {this.objectMapper = new ObjectMapper();this.objectMapper.registerModule(new JavaTimeModule());this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);}@Overridepublic byte[] serialize(T t) throws SerializationException {if (t == null) {return new byte[0];}try {return objectMapper.writeValueAsBytes(t);} catch (JsonProcessingException e) {throw new SerializationException("Could not write JSON: " + e.getMessage(), e);}}@Overridepublic T deserialize(byte[] bytes) throws SerializationException {if (bytes == null || bytes.length == 0) {return null;}try {return (T) objectMapper.readValue(bytes, SseEmitter.class);} catch (IOException e) {throw new SerializationException("Could not read JSON: " + e.getMessage(), e);}}
}

3.2redis配置,使序列化器生效

@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);StringRedisSerializer stringSerializer = new StringRedisSerializer();CustomJackson2JsonRedisSerializer<Object> jacksonSerializer = new CustomJackson2JsonRedisSerializer<>();// 根据实际情况,自行修改template.setKeySerializer(stringSerializer);template.setValueSerializer(jacksonSerializer);template.setHashKeySerializer(stringSerializer);template.setHashValueSerializer(jacksonSerializer);template.afterPropertiesSet();return template;}
}

再次启动服务,即生效。

这篇关于持久化SSE对象的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1102107

相关文章

Spring Boot 整合 SSE的高级实践(Server-Sent Events)

《SpringBoot整合SSE的高级实践(Server-SentEvents)》SSE(Server-SentEvents)是一种基于HTTP协议的单向通信机制,允许服务器向浏览器持续发送实... 目录1、简述2、Spring Boot 中的SSE实现2.1 添加依赖2.2 实现后端接口2.3 配置超时时

springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法

《springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法》:本文主要介绍springboot整合阿里云百炼DeepSeek实现sse流式打印,本文给大家介绍的非常详细,对大... 目录1.开通阿里云百炼,获取到key2.新建SpringBoot项目3.工具类4.启动类5.测试类6.测

Python中判断对象是否为空的方法

《Python中判断对象是否为空的方法》在Python开发中,判断对象是否为“空”是高频操作,但看似简单的需求却暗藏玄机,从None到空容器,从零值到自定义对象的“假值”状态,不同场景下的“空”需要精... 目录一、python中的“空”值体系二、精准判定方法对比三、常见误区解析四、进阶处理技巧五、性能优化

Spring Boot 3.4.3 基于 Spring WebFlux 实现 SSE 功能(代码示例)

《SpringBoot3.4.3基于SpringWebFlux实现SSE功能(代码示例)》SpringBoot3.4.3结合SpringWebFlux实现SSE功能,为实时数据推送提供... 目录1. SSE 简介1.1 什么是 SSE?1.2 SSE 的优点1.3 适用场景2. Spring WebFlu

在java中如何将inputStream对象转换为File对象(不生成本地文件)

《在java中如何将inputStream对象转换为File对象(不生成本地文件)》:本文主要介绍在java中如何将inputStream对象转换为File对象(不生成本地文件),具有很好的参考价... 目录需求说明问题解决总结需求说明在后端中通过POI生成Excel文件流,将输出流(outputStre

SpringCloud之consul服务注册与发现、配置管理、配置持久化方式

《SpringCloud之consul服务注册与发现、配置管理、配置持久化方式》:本文主要介绍SpringCloud之consul服务注册与发现、配置管理、配置持久化方式,具有很好的参考价值,希望... 目录前言一、consul是什么?二、安装运行consul三、使用1、服务发现2、配置管理四、数据持久化总

C#原型模式之如何通过克隆对象来优化创建过程

《C#原型模式之如何通过克隆对象来优化创建过程》原型模式是一种创建型设计模式,通过克隆现有对象来创建新对象,避免重复的创建成本和复杂的初始化过程,它适用于对象创建过程复杂、需要大量相似对象或避免重复初... 目录什么是原型模式?原型模式的工作原理C#中如何实现原型模式?1. 定义原型接口2. 实现原型接口3

Java实现将byte[]转换为File对象

《Java实现将byte[]转换为File对象》这篇文章将通过一个简单的例子为大家演示Java如何实现byte[]转换为File对象,并将其上传到外部服务器,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言1. 问题背景2. 环境准备3. 实现步骤3.1 从 URL 获取图片字节数据3.2 将字节数组

Javascript访问Promise对象返回值的操作方法

《Javascript访问Promise对象返回值的操作方法》这篇文章介绍了如何在JavaScript中使用Promise对象来处理异步操作,通过使用fetch()方法和Promise对象,我们可以从... 目录在Javascript中,什么是Promise1- then() 链式操作2- 在之后的代码中使

MyBatis的配置对象Configuration作用及说明

《MyBatis的配置对象Configuration作用及说明》MyBatis的Configuration对象是MyBatis的核心配置对象,它包含了MyBatis运行时所需的几乎所有配置信息,这个对... 目录MyBATis配置对象Configuration作用Configuration 对象的主要作用C