Web服务端通过SSE推送消息给浏览器客户端的实现方案(附详细代码和仓库地址)

本文主要是介绍Web服务端通过SSE推送消息给浏览器客户端的实现方案(附详细代码和仓库地址),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

    • 1、SSE(Server-Sent Events)简介
    • 2、SSE 的工作原理
    • 3、SSE 与客户端轮询的区别和优势比较
      • 区别
      • 优势
    • 4、SSE简单实现(单机应用Demo)
      • 演示效果
      • SSE-Demo仓库地址
      • 下面直接贴代码:
      • 前端实现:
      • 后端实现:
    • 5、SSE简单实现(分布式应用Demo)
      • SSE-Demo仓库地址
      • 关键代码
      • 方案说明

1、SSE(Server-Sent Events)简介

Server-Sent Events (SSE) 是一种基于 HTTP 协议的服务器推送技术,允许服务器通过单个持久连接向客户端发送实时更新。客户端使用标准的 EventSource API 来接收服务器推送的事件,这种通信方式非常适合实时应用,如消息通知、股票行情更新、社交媒体更新等。

2、SSE 的工作原理

  • 单向连接:SSE 建立的是单向通道,即服务器向客户端推送数据,客户端只能接收,不能发送。
  • 持久连接:SSE 使用的是长连接(Long Polling),即连接一旦建立,将会持续存在,直到客户端或服务器关闭连接。
  • 文本数据:SSE 通过 text/event-stream MIME 类型传输数据,数据是纯文本格式。
  • 自动重连:如果连接中断,EventSource 会自动尝试重新连接,确保客户端能够接收后续的推送。

3、SSE 与客户端轮询的区别和优势比较

客户端轮询(Client Polling) 是一种传统的客户端从服务器请求数据的方式。客户端会定期向服务器发送请求,检查是否有新数据可用。

区别

连接方式:
SSE:建立后服务器主动推送数据,连接是持久的,数据在有更新时实时传递。
轮询:客户端定期发送请求获取数据,连接是间歇性的。

实时性:
SSE:数据几乎是实时推送的,延迟极低。
轮询:数据获取延迟取决于轮询的频率,频率高则延迟低,但频率低可能导致数据延迟。

网络和服务器负载:
SSE:由于是单个持久连接,减少了频繁的请求与响应开销,降低了服务器负载。
轮询:频繁的请求会增加服务器和网络的负担,尤其是在轮询频率较高时。

连接控制:
SSE:自动处理连接中断和重连,客户端实现简单。
轮询:需要客户端定期发起请求,且如果请求频率不当,可能导致资源浪费。

数据传输效率:
SSE:只在有数据更新时推送,传输效率高。
轮询:即使没有数据更新,客户端也会定期请求,效率低下。

优势

SSE 的优势:
更高效的网络和服务器资源利用率。
实时性更高,延迟更低。
实现简单,特别是在浏览器环境中,支持自动重连和事件处理。
适合需要频繁更新但客户端无需响应的场景。

客户端轮询的优势:
在不支持 SSE 的环境下仍然可以使用。
实现和理解相对简单,兼容性更好。

4、SSE简单实现(单机应用Demo)

演示效果

在这里插入图片描述

SSE-Demo仓库地址

https://github.com/deepjava-gm/SSE-Demo.git

下面直接贴代码:

前端实现:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>SSE 用户消息推送 Demo</title><style>body {font-family: Arial, sans-serif;margin: 20px;}h1 {text-align: center;}.container {max-width: 600px;margin: 0 auto;padding: 20px;border: 1px solid #ccc;border-radius: 8px;}.form-group {margin-bottom: 15px;}label {display: block;margin-bottom: 5px;font-weight: bold;}input[type="text"] {width: 100%;padding: 8px;box-sizing: border-box;border: 1px solid #ccc;border-radius: 4px;}button {padding: 10px 20px;color: white;background-color: #007bff;border: none;border-radius: 4px;cursor: pointer;}button:hover {background-color: #0056b3;}#messages {margin-top: 20px;}.message {background-color: #f1f1f1;padding: 10px;margin-bottom: 10px;border-radius: 4px;}.status {margin-top: 10px;font-weight: bold;}.success {color: green;}.error {color: red;}</style>
</head>
<body><h1>SSE 用户消息推送 Demo</h1><div class="container"><!-- 连接部分 --><div class="form-group"><label for="userId">用户 ID:</label><input type="text" id="userId" placeholder="请输入您的用户 ID"><br><br><button onclick="connect()">连接</button></div><div id="connectionStatus" class="status"></div><!-- 消息发送部分 --><div class="form-group"><label for="targetUserId">目标用户 ID:</label><input type="text" id="targetUserId" placeholder="请输入目标用户 ID"></div><div class="form-group"><label for="message">消息内容:</label><input type="text" id="message" placeholder="请输入要发送的消息"></div><button onclick="sendMessage()">推送消息</button><!-- 消息显示部分 --><div id="messages"></div></div><script>let eventSource;let currentUserId = '';function connect() {const userId = document.getElementById('userId').value;const connectionStatus = document.getElementById('connectionStatus');if (!userId) {alert('请输入用户 ID');return;}// 显示连接状态为“连接中”connectionStatus.textContent = '已连接...';connectionStatus.className = 'status';eventSource = new EventSource(`http://localhost:9999/sse/connect/${userId}`);currentUserId = userId; // 保存当前用户 IDeventSource.onopen = function() {connectionStatus.textContent = '接收成功';connectionStatus.className = 'status success';};eventSource.onmessage = function(event) {try {// 解析 JSON 消息const data = JSON.parse(event.data);const newElement = document.createElement('div');newElement.className = 'message';newElement.innerText = `用户 ${data.senderId} 接收的消息: ${data.message}`;document.getElementById('messages').appendChild(newElement);} catch (e) {console.error('消息解析错误:', e);}};eventSource.onerror = function(event) {connectionStatus.textContent = '连接失败,请检查网络或服务器';connectionStatus.className = 'status error';console.error("连接错误: ", event);eventSource.close();};}function sendMessage() {const targetUserId = document.getElementById('targetUserId').value;const message = document.getElementById('message').value;if (!targetUserId || !message) {alert('请填写目标用户 ID 和消息内容');return;}// 发送 GET 请求推送消息fetch(`http://localhost:9999/sse/push/${targetUserId}?message=${encodeURIComponent(message)}`, {method: 'GET'}).then(response => {if (response.ok) {console.log('消息发送成功');} else {console.log('消息发送失败');}}).catch(error => {console.error('发送错误:', error);alert('消息发送失败');});}</script>
</body>
</html>

后端实现:

启动类:

package io.github.deepjava;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;@SpringBootApplication
public class SseDemoApplication {public static void main(String[] args) {SpringApplication.run(SseDemoApplication.class);}// 注入一个全局缓存 用来保存不同用户的SSE连接信息@Bean("userSSEMap")public ConcurrentHashMap<String, SseEmitter> getUserSSEMap(){return new ConcurrentHashMap<>();}}

Controller:

package io.github.deepjava.controller;import io.github.deepjava.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;@RestController
@RequestMapping("/sse")
@CrossOrigin(origins = "*")
@Slf4j
public class SseController {@Resource(name = "userSSEMap")private ConcurrentHashMap<String, SseEmitter> userSSEMap;@Resourceprivate SseService sseService;// 连接方法:为用户 ID 注册 SSE 链接@GetMapping("/connect/{userId}")public SseEmitter connect(@PathVariable String userId) {SseEmitter emitter = new SseEmitter(0L); // 设置超时时间为无限大userSSEMap.put(userId, emitter);// 连接正常关闭回调 移除连接emitter.onCompletion(() -> {userSSEMap.remove(userId);log.info("连接正常关闭回调 移除连接");});// 连接超时回调 移除连接emitter.onTimeout(() -> {userSSEMap.remove(userId);log.info("连接超时回调 移除连接");});// 连接出错回调 移除连接emitter.onError((e) -> {userSSEMap.remove(userId);log.info("连接出错回调 移除连接");});log.info("连接成功!");return emitter;}// 推送方法:根据用户 ID 发送消息@GetMapping("/push/{userId}")public void push(@PathVariable String userId, @RequestParam String message) {sseService.extracted(userId, message);}}

Service:

package io.github.deepjava.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;@Service
@Slf4j
public class SseService {@Resource(name = "userSSEMap")private ConcurrentHashMap<String, SseEmitter> clients;public void extracted(String userId, String message) {SseEmitter emitter = clients.get(userId);if (emitter != null) {try {// 创建包含用户 ID 和消息内容的 JSON 对象String jsonMessage = String.format("{\"senderId\":\"%s\", \"message\":\"%s\"}", userId, message);emitter.send(jsonMessage);log.info("消息推送成功!");} catch (IOException e) {clients.remove(userId);log.info("消息推送失败!");}}}}

配置文件:application.properties

spring.application.name=sse-demo
server.port=9999

Maven的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.1</version></parent><groupId>org.example</groupId><artifactId>SSE-Demo</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--lombok  --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties></project>

5、SSE简单实现(分布式应用Demo)

注意:
SSE 连接(如 SseEmitter)是持久化的、与具体服务器实例相关联的动态对象,无法直接存储在 Redis 等外部存储中。Redis 主要用于消息传递和共享数据,但无法直接管理活跃的连接。所以下面方案仅使用 Redis 进行消息广播。

解决方案概述
为了在分布式环境中实现 SSE,通常采用以下架构:

  • 每个服务器实例维护本地的 SSE 连接:每个实例只管理与自身连接的客户端。
  • 使用 Redis 进行消息广播:当需要向特定用户推送消息时,将消息发布到 Redis 频道。所有实例订阅该频道,并检查自己是否有需要向某个用户推送的连接。
  • 用户与实例的映射:使用 Redis 存储用户与服务器实例的映射信息,确保消息能够被正确路由到处理该用户连接的实例。
    虽然无法完全将连接信息存储在 Redis 中,但通过这种方式,可以有效地在分布式环境中管理 SSE 连接和消息推送。

这里只贴主要的后端代码:完整代码去下载仓库代码看。

SSE-Demo仓库地址

https://github.com/deepjava-gm/SSE-Demo.git

关键代码

redis配置:

 // 配置redis的序列化@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());return template;}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}

redis监听主题:

package io.github.deepjava.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;@RestController
@RequestMapping("/dis/sse")
@CrossOrigin(origins = "*")
@Slf4j
public class DistributedSseController {@Resource(name = "userSSEMap")private ConcurrentHashMap<String, SseEmitter> userSSEMap;@Resourceprivate RedisTemplate<String, String> redisTemplate;@Resourceprivate RedisMessageListenerContainer redisMessageListenerContainer;private final ChannelTopic topic = new ChannelTopic("sse-messages");@PostConstructpublic void init() {// 订阅 Redis 频道redisMessageListenerContainer.addMessageListener(new MessageListenerAdapter((MessageListener) (message, pattern) -> {String payload = new String(message.getBody(), StandardCharsets.UTF_8);// 假设消息的格式为 "userId:message"String[] parts = payload.split(":", 2);if (parts.length == 2) {String userId = parts[0];String userMessage = parts[1];// 发送消息给本地的 SSE 连接SseEmitter emitter = userSSEMap.get(userId);if (emitter != null) {try {String jsonMessage = String.format("{\"senderId\":\"%s\", \"message\":\"%s\"}", userId, userMessage);emitter.send(jsonMessage);} catch (IOException e) {emitter.completeWithError(e);userSSEMap.remove(userId);}}}}), topic);}// 连接方法:为用户 ID 注册 SSE 链接@GetMapping("/connect/{userId}")public SseEmitter connect(@PathVariable String userId) {SseEmitter emitter = new SseEmitter(0L); // 设置超时时间为无限大userSSEMap.put(userId, emitter);// 连接正常关闭回调 移除连接emitter.onCompletion(() -> {userSSEMap.remove(userId);log.info("连接正常关闭回调 移除连接");});// 连接超时回调 移除连接emitter.onTimeout(() -> {userSSEMap.remove(userId);log.info("连接超时回调 移除连接");});// 连接出错回调 移除连接emitter.onError((e) -> {userSSEMap.remove(userId);log.info("连接出错回调 移除连接");});log.info("连接成功!");return emitter;}@GetMapping("/push/{userId}")public void push(@PathVariable String userId, String message) {// 将消息发布到 Redis 频道redisTemplate.convertAndSend(topic.getTopic(), userId + ":" + message);}
}

方案说明

SSE 连接管理:
使用 ConcurrentHashMap<String, SseEmitter> 存储用户的连接信息,每个服务器实例只维护与自身连接的客户端。
connect 方法用于创建 SSE 连接并保存到本地缓存。

Redis 消息广播:
通过 Redis 的 发布订阅(Pub/Sub) 机制,所有实例订阅同一个频道(sse-messages)。
push 方法将消息发布到 Redis 频道,所有订阅了该频道的实例都会收到消息,并检查是否有对应的连接需要推送。

这篇关于Web服务端通过SSE推送消息给浏览器客户端的实现方案(附详细代码和仓库地址)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1128040

相关文章

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

常用的jdk下载地址

jdk下载地址 安装方式可以看之前的博客: mac安装jdk oracle 版本:https://www.oracle.com/java/technologies/downloads/ Eclipse Temurin版本:https://adoptium.net/zh-CN/temurin/releases/ 阿里版本: github:https://github.com/

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount