Springboot集成SSE消息推送

2024-06-11 20:52

本文主要是介绍Springboot集成SSE消息推送,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

SSE介绍
        SSE(Server-Sent Events)的全称是服务器推送事件,它是一种基于 HTTP 协议的实时通信技术,用于在客户端和服务器之间建立持久、单向的链接,允许服务器向客户端发送异步消息。

        了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别:SSE 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道,客户端和服务端可以相互发消息

SSE
        1. 单向通信:SSE是一种服务器推送技术,服务器可以向客户端发送消息,但客户端无法主动发送消息到服务器。
        2. 持久连接:SSE在单个HTTP连接上建立持久连接,服务器可以多次发送事件到客户端,客户端只需保持连接不关闭。

        3 . 实时性:适用于需要从服务器获取实时更新的场景,如即时通知、实时数据更新

Websocket
        1. 双向通信:WebSocket提供了全双工通信,客户端和服务器可以双向发送消息,不需要等待请求-响应。
        2. 持久连接:WebSocket在单个TCP连接上实现持久连接,适用于双向通信的场景。
        3. 实时性:非常适合实时性要求高的应用,如在线游戏、实时聊天等。

使用方法:

Maven依赖

springboot中封装了sse代码,不需要额外的依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.21</version>
</dependency>

SseEmitterUtil工具类

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;/*** SSE长链接工具类*/
@Slf4j
public class SseEmitterUtil {/*** 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面*/private final static Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();public static SseEmitter connect(Long userId) {// 设置超时时间,0表示不过期。默认30S,超时时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 注册回调sseEmitter.onCompletion(completionCallBack(userId));sseEmitter.onError(errorCallBack(userId));sseEmitter.onTimeout(timeoutCallBack(userId));sseEmitterMap.put(userId, sseEmitter);log.info("创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size());return sseEmitter;}/*** 给制定用户发送消息** @param userId 指定用户名* @param sseMessage 消息体*/public static void sendMessage(Long userId, String sseMessage) {if (sseEmitterMap.containsKey(userId)) {try {sseEmitterMap.get(userId).send(sseMessage);log.info("用户 {} 推送消息 {}", userId, sseMessage);} catch (IOException e) {log.error("用户 {} 推送消息异常", userId, e);removeUser(userId);}} else {log.error("消息推送 用户 {} 不存在,链接总数 {}", userId, sseEmitterMap.size());}}/*** 群发消息*/public static void batchSendMessage(String message, List<Long> ids) {ids.forEach(userId -> sendMessage(userId, message));}/*** 群发所有人*/public static void batchSendMessage(String message) {sseEmitterMap.forEach((k, v) -> {try {v.send(message, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("用户 {} 推送异常", k, e);removeUser(k);}});}/*** 移除用户连接** @param userId 用户 ID*/public static void removeUser(Long userId) {if (sseEmitterMap.containsKey(userId)) {sseEmitterMap.get(userId).complete();sseEmitterMap.remove(userId);log.info("移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size());} else {log.error("消息推送 用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size());}}/*** 获取当前连接信息** @return 所有的连接用户*/public static List<Long> getIds() {return new ArrayList<>(sseEmitterMap.keySet());}/*** 获取当前的连接数量** @return 当前的连接数量*/public static int getUserCount() {return sseEmitterMap.size();}private static Runnable completionCallBack(Long userId) {return () -> {log.info("用户 {} 结束连接", userId);};}private static Runnable timeoutCallBack(Long userId) {return () -> {log.error("用户 {} 连接超时", userId);removeUser(userId);};}private static Consumer<Throwable> errorCallBack(Long userId) {return throwable -> {log.error("用户 {} 连接异常", userId);removeUser(userId);};}
}
Controller层
import cn.hutool.json.JSONUtil;
import com.geb.common.utils.SseEmitterUtil;
import com.geb.domain.SseMessage;
import com.geb.domain.WdAgent;
import io.grpc.internal.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {/*** 用于创建连接*/@GetMapping("/connect/{userId}")public SseEmitter connect(@PathVariable Long userId) {return SseEmitterUtil.connect(userId);}/*** 关闭连接*/@GetMapping("/close/{userid}")public void close(@PathVariable("userid") Long userid) {SseEmitterUtil.removeUser(userid);}@GetMapping("/sse")public void sse(){// 构建推送消息体SseMessage sseMessage = new SseMessage();sseMessage.setId(1L);sseMessage.setMsg("SSE测试发送消息");sseMessage.setAgentName("测试智能体推送消息");SseEmitterUtil.sendMessage(1L, JSONUtil.toJsonStr(sseMessage));}
}
测试

这里使用postman测试,输入链接点击运行后,会自动处于链接状态

我们发送一条消息测试,我这里对消息实体进行了封装,在postman新建窗口输入测试请求

消息发送完毕,在postman就可以看到刚刚的消息

可以多发送几条消息看看效果,关闭链接调用关闭的接口即可。

这是个本地的测试,如果使用了Nginx反向代理和Gateway网关,具体的使用情况需要根据测试或者生产环境进行相应的配置。

文章参考:Springboot集成SSE消息推送_springboot 集成sse推送-CSDN博客

这篇关于Springboot集成SSE消息推送的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1052252

相关文章

SpringBoot请求参数接收控制指南分享

《SpringBoot请求参数接收控制指南分享》:本文主要介绍SpringBoot请求参数接收控制指南,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring Boot 请求参数接收控制指南1. 概述2. 有注解时参数接收方式对比3. 无注解时接收参数默认位置

SpringBoot基于配置实现短信服务策略的动态切换

《SpringBoot基于配置实现短信服务策略的动态切换》这篇文章主要为大家详细介绍了SpringBoot在接入多个短信服务商(如阿里云、腾讯云、华为云)后,如何根据配置或环境切换使用不同的服务商,需... 目录目标功能示例配置(application.yml)配置类绑定短信发送策略接口示例:阿里云 & 腾

SpringBoot项目中报错The field screenShot exceeds its maximum permitted size of 1048576 bytes.的问题及解决

《SpringBoot项目中报错ThefieldscreenShotexceedsitsmaximumpermittedsizeof1048576bytes.的问题及解决》这篇文章... 目录项目场景问题描述原因分析解决方案总结项目场景javascript提示:项目相关背景:项目场景:基于Spring

Spring Boot 整合 SSE的高级实践(Server-Sent Events)

《SpringBoot整合SSE的高级实践(Server-SentEvents)》SSE(Server-SentEvents)是一种基于HTTP协议的单向通信机制,允许服务器向浏览器持续发送实... 目录1、简述2、Spring Boot 中的SSE实现2.1 添加依赖2.2 实现后端接口2.3 配置超时时

Spring Boot读取配置文件的五种方式小结

《SpringBoot读取配置文件的五种方式小结》SpringBoot提供了灵活多样的方式来读取配置文件,这篇文章为大家介绍了5种常见的读取方式,文中的示例代码简洁易懂,大家可以根据自己的需要进... 目录1. 配置文件位置与加载顺序2. 读取配置文件的方式汇总方式一:使用 @Value 注解读取配置方式二

一文详解Java异常处理你都了解哪些知识

《一文详解Java异常处理你都了解哪些知识》:本文主要介绍Java异常处理的相关资料,包括异常的分类、捕获和处理异常的语法、常见的异常类型以及自定义异常的实现,文中通过代码介绍的非常详细,需要的朋... 目录前言一、什么是异常二、异常的分类2.1 受检异常2.2 非受检异常三、异常处理的语法3.1 try-

Java中的@SneakyThrows注解用法详解

《Java中的@SneakyThrows注解用法详解》:本文主要介绍Java中的@SneakyThrows注解用法的相关资料,Lombok的@SneakyThrows注解简化了Java方法中的异常... 目录前言一、@SneakyThrows 简介1.1 什么是 Lombok?二、@SneakyThrows

Java中字符串转时间与时间转字符串的操作详解

《Java中字符串转时间与时间转字符串的操作详解》Java的java.time包提供了强大的日期和时间处理功能,通过DateTimeFormatter可以轻松地在日期时间对象和字符串之间进行转换,下面... 目录一、字符串转时间(一)使用预定义格式(二)自定义格式二、时间转字符串(一)使用预定义格式(二)自

Spring 请求之传递 JSON 数据的操作方法

《Spring请求之传递JSON数据的操作方法》JSON就是一种数据格式,有自己的格式和语法,使用文本表示一个对象或数组的信息,因此JSON本质是字符串,主要负责在不同的语言中数据传递和交换,这... 目录jsON 概念JSON 语法JSON 的语法JSON 的两种结构JSON 字符串和 Java 对象互转

JAVA保证HashMap线程安全的几种方式

《JAVA保证HashMap线程安全的几种方式》HashMap是线程不安全的,这意味着如果多个线程并发地访问和修改同一个HashMap实例,可能会导致数据不一致和其他线程安全问题,本文主要介绍了JAV... 目录1. 使用 Collections.synchronizedMap2. 使用 Concurren