本文主要是介绍持久化SSE对象,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
SpringBoot整合SSE,实现后端主动推送DEMO
前些日子写了整合SSE得demo。但是SSE对象是存储在ConcurrentHashMap<String, SseEmitter>中。在正式环境明显就不行了,服务重启一下的话都没有了。
那么要持久化,第一选择放redis
1、写了一个redis操作组件
SseEmitterStore
/*** 不考虑redis 连接异常问题* @author cmy* @date 2024/8/21 10:55*/
@Component
public class SseEmitterStore {private ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();@Resourceprivate RedisTemplate<String, Object> redisTemplate;public void addEmitter(String key, SseEmitter emitter) {emitters.put(key, emitter);redisTemplate.opsForHash().put("sse-emitters", key, emitter);}public void removeEmitter(String key) {emitters.remove(key);redisTemplate.opsForHash().delete("sse-emitters", key);}@PostConstructprivate void init() {Map<Object, Object> temp = redisTemplate.opsForHash().entries("sse-emitters");temp.forEach((key, value) -> {if (value instanceof SseEmitter) {emitters.put(key.toString(), (SseEmitter) value);}});}public ConcurrentHashMap<String, SseEmitter> getEmitters() {return emitters;}
}
Controller修改
public class SseController {@ResourceSseEmitterStore sseEmitterStore;@GetMapping("/subscribe/{id}")@CrossOrigin(origins = "*")public SseEmitter subscribe(@PathVariable String id) {SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);sseEmitterStore.addEmitter(id,emitter);emitter.onCompletion(() -> sseEmitterStore.removeEmitter(id));emitter.onError(e -> sseEmitterStore.removeEmitter(id));return emitter;}@GetMapping("/unbind/{id}")@CrossOrigin(origins = "*")public ServerResponse deleteItem(@PathVariable String id) {this.sseEmitterStore.removeEmitter(id);return ServerResponse.success(true);}
}
异步发送消息service
@Asyncpublic void broadcastMessage(String message) {List<String> keysToDelete = new ArrayList<>();this.sseEmitterStore.getEmitters().forEach((k, v) -> {try {v.send(message);} catch (Throwable t) {keysToDelete.add(k);}});keysToDelete.forEach(this.sseEmitterStore::removeEmitter);}
2、无法序列化的问题
跑起来之后,结果报错
DefaultSerializer requires a Serializable payload but received an object of type [org.springframework.web.servlet.mvc.method.annotation.SseEmitter]
错误信息已经很明显了
因为 SseEmitter
并不是一个实现了 Serializable
接口的类,因此不能被默认的序列化器正确处理。
问了AI
3、解决无法序列化问题
3.1自定义redis自定义序列化器
public class CustomJackson2JsonRedisSerializer<T> implements RedisSerializer<T> {private static final long serialVersionUID = -7649863253433761554L;private final ObjectMapper objectMapper;public CustomJackson2JsonRedisSerializer() {this.objectMapper = new ObjectMapper();this.objectMapper.registerModule(new JavaTimeModule());this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);}@Overridepublic byte[] serialize(T t) throws SerializationException {if (t == null) {return new byte[0];}try {return objectMapper.writeValueAsBytes(t);} catch (JsonProcessingException e) {throw new SerializationException("Could not write JSON: " + e.getMessage(), e);}}@Overridepublic T deserialize(byte[] bytes) throws SerializationException {if (bytes == null || bytes.length == 0) {return null;}try {return (T) objectMapper.readValue(bytes, SseEmitter.class);} catch (IOException e) {throw new SerializationException("Could not read JSON: " + e.getMessage(), e);}}
}
3.2redis配置,使序列化器生效
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);StringRedisSerializer stringSerializer = new StringRedisSerializer();CustomJackson2JsonRedisSerializer<Object> jacksonSerializer = new CustomJackson2JsonRedisSerializer<>();// 根据实际情况,自行修改template.setKeySerializer(stringSerializer);template.setValueSerializer(jacksonSerializer);template.setHashKeySerializer(stringSerializer);template.setHashValueSerializer(jacksonSerializer);template.afterPropertiesSet();return template;}
}
再次启动服务,即生效。
这篇关于持久化SSE对象的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!