工作七年,对消息推送使用的一些经验和总结

2024-01-31 18:44

本文主要是介绍工作七年,对消息推送使用的一些经验和总结,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言:不管是APP还是WEB端都离不开消息推送,尤其是APP端,push消息,小信箱消息;WEB端的代办消息等。因在项目中多次使用消息推送且也是很多项目必不可少的组成部分,故此总结下供自己参考。

一、什么是消息推送

消息推送(Push)指运营人员通过自己的产品或第三方工具对用户当前网页或移动设备进行的主动消息推送。用户可以在网页上或移动设备锁定屏幕和通知栏看到push消息通知

二、消息推送的种类

从数据模型分:推和拉

从终端分:APP端和WEB端

从实现层面分:短论询、Comet(长轮询)、Flash XMLSocket、SSE、Web-Socket

类型概念优点缺点备注
短轮询客户端通过定期向服务器发送请求来获取最新的消息。服务器在接收到请求后立即响应,无论是否有新消息。如果服务器没有新消息可用,客户端将再次发送请求后端编写简单

高延迟:因客户端定期发起请求,导致消息延迟,尤其是定期时间设置过长时

高网络负载:无新消息时也会频繁发起请求,消耗服务器资源和网络

时效性差:服务器产生了新消息,客户端不能立马感知到,需等到轮询时间到

Comet(长轮询)客户端发起请求,服务器接到请求后hold住连接,直到有新消息(或超时)才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求

减少请求次数:相对于短轮训而言

减少网络负载:没有消息时会保持连接,减少了频繁请求

时效性稍提高:相对于短轮询而言

没有新消息时会保持请求挂起,直到有新消息到达或超时。相比于短轮询,长轮询可以更快地获取新消息,减少了不必要的请求。
Flash XMLSocket在 HTML 页面中内嵌入一个使用了 XMLSocket 类的 Flash 程序。JavaScript 通过调用此 Flash 程序提供的socket接口与服务器端的socket进行通信网络聊天室,网络互动游戏使用较多

SSE(Server-send Events)

服务器主动推送时效性好:SSE使用了持久连接,可以实现比短轮询和长轮询更好的实时性

单向通道:SSE是单向的,只允许服务器向客户端推送消息,客户端无法向服务器发送消息

不适用低版本浏览器:SSE是HTML5的一部分,不支持低版本的浏览器。在使用SSE时,需要确保客户端浏览器的兼容性

Web-SocketWebSocket是一种双向通信协议,允许在单个持久连接上进行全双工通信

时效性最佳:WebSocket 提供了真正的双向通信,可以实现实时的双向数据传输,具有最佳的实时性

低延迟:与轮询和长轮询相比,WebSocket 使用单个持久连接,减少了连接建立和断开的开销,从而降低了延迟

双向通信:WebSocket 允许服务器与客户端之间进行双向通信,服务器可以主动向客户端发送消息,同时客户端也可以向服务器发送消息

较高的网络负载:WebSocket 使用长连接,会占用一定的网络资源。在大规模并发场景下,需要注意服务器的负载情况

浏览器支持:大多数现代浏览器都支持 WebSocket,但需要注意在开发过程中考虑不同浏览器的兼容性

短轮询:客户端定时轮询发起请求

长轮询:客户端发起请求,等待后端响应并再次发起请求

Flash XMLSocket:

原理示意图:

利用Flash XML Socket实现”服务器推”技术前提:
(1)Flash提供了XMLSocket类,服务器利用Socket向Flash发送数据;
(2)JavaScript和Flash的紧密结合JavaScript和Flash可以相互调用。
优点是实现了socket通信,不再利用无状态的http进行伪推送。但是缺点更明显:
1.客户端必须安装 Flash 播放器;
2.因为 XMLSocket 没有 HTTP 隧道功能,XMLSocket 类不能自动穿过防火墙;
3.因为是使用套接口,需要设置一个通信端口,防火墙、代理服务器也可能对非 HTTP 通道端口进行限制。

SSE:当使用Server-Sent Events(SSE)时,客户端(通常是浏览器)与服务器之间建立一种持久的连接,使服务器能够主动向客户端发送数据。这种单向的、服务器主动推送数据的通信模式使得实时更新的数据能够被实时地传送到客户端,而无需客户端进行轮询请求

SSE的工作原理如下: 

  1. 建立连接:客户端通过使用EventSource对象在浏览器中创建一个与服务器的连接。客户端向服务器发送一个HTTP请求,请求的头部包含Accept: text/event-stream,以表明客户端希望接收SSE数据。服务器响应这个请求,并建立一个持久的HTTP连接。

  2. 保持连接:服务器保持与客户端的连接打开状态,不断发送数据。这个连接是单向的,只允许服务器向客户端发送数据,客户端不能向服务器发送数据。

  3. 服务器发送事件:服务器使用Content-Type: text/event-stream标头来指示响应是SSE数据流。服务器将数据封装在特定的SSE格式中,每个事件都以data:开头,后面是实际的数据内容,以及可选的其他字段,如event:id:。服务器发送的数据可以是任何文本格式,通常是JSON。

  4. 客户端接收事件:客户端通过EventSource对象监听服务器发送的事件。当服务器发送事件时,EventSource对象会触发相应的事件处理程序,开发人员可以在处理程序中获取到事件数据并进行相应的操作。常见的事件是message事件,表示接收到新的消息。

  5. 断开连接:当客户端不再需要接收服务器的事件时,可以关闭连接。客户端可以调用EventSource对象的close()方法来显式关闭连接,或者浏览器在页面卸载时会自动关闭连接。

在Spring Boot中,可以使用SseEmitter类来实现SSE:

@RestController
public class SSEController {private SseEmitter sseEmitter;@GetMapping("/subscribe")public SseEmitter subscribe() {sseEmitter = new SseEmitter();return sseEmitter;}@PostMapping("/send-message")public void sendMessage(@RequestBody String message) {try {if (sseEmitter != null) {sseEmitter.send(SseEmitter.event().data(message));}} catch (IOException e) {e.printStackTrace();}}
}
<script>// 创建一个EventSource对象,指定SSE的服务端端点var eventSource = new EventSource('/subscribe');console.log("eventSource=", eventSource)// 监听message事件,接收从服务端发送的消息eventSource.addEventListener('message', function(event) {var message = event.data;console.log("message=", message)var messageContainer = document.getElementById('message-container');messageContainer.innerHTML += '<p>' + message + '</p>';});
</script>

上述过程:客户端可以通过访问/subscribe接口来订阅SSE事件,服务器会返回一个SseEmitter对象。当有新消息到达时,调用SseEmitter对象的send()方法发送消息。

Web-Socket:

HTML代码: 

<script>// 创建WebSocket对象,并指定服务器的URLvar socket = new WebSocket('ws://localhost:8080/上下文路径/channel/message/');// 监听WebSocket的连接事件socket.onopen = function(event) {console.log('WebSocket connected');};// 监听WebSocket的消息事件socket.onmessage = function(event) {var message = event.data;var messageContainer = document.getElementById('message-container');messageContainer.innerHTML += '<p>' + message + '</p>';};// 监听WebSocket的关闭事件socket.onclose = function(event) {console.log('WebSocket closed');};// 发送消息到服务器function sendMessage() {var messageInput = document.getElementById('message-input');var message = messageInput.value;socket.send(message);messageInput.value = '';}
</script>

三、项目中使用的消息推送

例子1:Web-Socket
1.引入websocket依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
2.websocket配置
/*** @描述 开启WebSocket支持的配置类* 自动注册使用@ServerEndpoint*/
@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
3.websocket服务器端代码

说明:@ ServerEndpoint 注解是一个类层次的注解,主要是将当前类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端

/*** 消息推送**/
@ServerEndpoint("/channel/message/{user-id}")
@Slf4j
@Component
@RequiredArgsConstructor
public class TodoChannel implements ApplicationListener<FlowMessageEvent> {private static final Map<String, Set<Session>> SESSION_MAP = new ConcurrentHashMap<>();private Session session;private String userId;@OnMessagepublic void onMessage(String message) {log.info("websocket消息(id={}): {}", this.session.getId(), message);}@OnOpenpublic void onOpen(Session session, @PathParam("user-id") String userId) {this.session = session;this.userId = userId;val sessionSet = SESSION_MAP.getOrDefault(this.userId, new CopyOnWriteArraySet<>());sessionSet.add(session);SESSION_MAP.put(this.userId, sessionSet);log.info("websocket连接: id={}", this.session.getId());val message = new MessageModel();//往todoModel放业务数据session.getAsyncRemote().sendText(JSON.toJSONString(message));}@OnClosepublic void onClose(CloseReason closeReason) {val sessionSet = SESSION_MAP.get(this.userId);if (sessionSet != null) {sessionSet.remove(session);}log.info("websocket断开: id={} {}", this.session.getId(), closeReason);}@OnErrorpublic void onError(Throwable throwable) {log.warn("websocket异常: id={} throwable:", this.session.getId(), throwable);val sessionSet = SESSION_MAP.get(this.userId);if (sessionSet != null) {sessionSet.remove(this.session);}try {this.session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));} catch (IOException e) {log.error("websocket关闭失败", e);}}@Overridepublic void onApplicationEvent(@NotNull FlowMessageEvent event) {Set<String> userIds = CastUtils.cast(event.getSource());userIds.forEach(id -> {val sessionSet = SESSION_MAP.get(id);if (sessionSet == null) {return;}//业务处理todosessionSet.forEach(s -> s.getAsyncRemote().sendObject(JSON.toJSON(message)));});}@Scheduled(fixedRate = 24 * 60 * 60 * 1000L)public void sessionCleaner() {log.info("websocket message channel session清理");val keyToClean = new HashSet<String>();SESSION_MAP.forEach((k, v) -> {val sessionToClean = new HashSet<Session>();v.forEach(s -> {if (!s.isOpen()) {sessionToClean.add(s);}});v.removeAll(sessionToClean);if (v.isEmpty()) {keyToClean.add(k);}});keyToClean.forEach(SESSION_MAP::remove);}@Dataprivate static class MessageModel implements Serializable {private static final long serialVersionUID = 1L;private List<Info> list;private Integer size;@AllArgsConstructor@Valueprivate static class Info implements Serializable {private static final long serialVersionUID = 1L;String type;String name;}}
}
 4.事件类代码
/*** message事件**/
public class FlowMessageEvent extends ApplicationEvent {public FlowMessageEvent(Object source) {super(source);}
}
5.使用事件推送消息

applicationEventPublisher.publishEvent(new FlowMessageEvent(user));

例子2:RabbitMq:
1.引入rabbitmq依赖
2.编写rabbitmq配置类
/*** @author wux* @version 1.0.0*/
@SpringBootConfiguration
@Slf4j
public class RabbitMqConfig {@Value("${spring.rabbitmq.host:10.128.30.xxx}")private String host;@Value("${spring.rabbitmq.port:5672}")private int port;@Value("${spring.rabbitmq.username:guest}")private String username;@Value("${spring.rabbitmq.password:guest}")private String password;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory factory = new CachingConnectionFactory(host, port);factory.setUsername(username);factory.setPassword(password);//连接工厂开启消息确认和消息返回机制
//        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//        factory.setPublisherReturns(true);return factory;}@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//使用json 序列化和反序列化factory.setMessageConverter(new Jackson2JsonMessageConverter());//factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate();template.setConnectionFactory(connectionFactory());template.setMessageConverter(new Jackson2JsonMessageConverter());return template;}@Beanpublic Queue testDirectQueue() {return new Queue(RabbitMqConsts.TEST_QUE_PHM_WARN_INFO, true, false, false);}@Beanpublic DirectExchange testDirectExchange() {return new DirectExchange(RabbitMqConsts.TEST_EXC_PHM_WARN_INFO, true,false);}@Beanpublic Binding TestBinding() {return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(RabbitMqConsts.TEST_KEY_PHM_WARN_INFO);}
}
3.编写rabbitmq工具类
/*** @author wux* @version 1.0.0* @description rabbit工具类*/
@Slf4j
public class RabbitMqUtil {private static AmqpAdmin getAmqpAdmin() {return SpringContextUtils.getBean("amqpAdmin",AmqpAdmin.class);}/**通过amqpAdmin动态创建队列、交换机和绑定关系ttlFlag :设置消息过期时间*/public static void createQueueAndExchangeIfNeed(String businessName, ExchangeEnum typeEnum, Integer ttl) {String exchangeName = "exc_" + businessName;String queueName = "que_" + businessName;String routingKey = "key_" + businessName;if (CheckUtil.isNotEmpty(getQueueInfo(queueName))) {return;}//创建队列Queue queue = createAndBindQueue(queueName, ttl);//创建交换机Exchange exchange = createAndBindExchange(exchangeName, typeEnum);//绑定队列和交换机switch (typeEnum){case DIRECT:binding(queueName, exchangeName, routingKey, typeEnum);break;case FANOUT:fanoutBinding(queue, exchange);break;default:binding(queueName, exchangeName, routingKey, typeEnum);}}private static Queue createAndBindQueue(String queueName, Integer ttl) {Map<String, Object> arguments = new HashMap<>();//设置过期时间,单位是毫秒if (CheckUtil.isNotEmpty(ttl)) {arguments.put("x-message-ttl", ttl);}if (CheckUtil.isEmpty(queueName)) {log.error("队列名称为空!queueName=" + queueName);throw new BusinessException("队列名称为空!");}Queue queue = new Queue(queueName, true, false, false, arguments);getAmqpAdmin().declareQueue(queue);return queue;}public static QueueInformation getQueueInfo (String queueName) {if (CheckUtil.isEmpty(queueName)) {return null;}return getAmqpAdmin().getQueueInfo(queueName);}private static Exchange createAndBindExchange(String exchangeName, ExchangeEnum typeEnum){AbstractExchange exchange = null;switch (typeEnum){case DIRECT:exchange = new DirectExchange(exchangeName, true, false);break;case TOPIC:exchange = new TopicExchange(exchangeName, true, false);break;case FANOUT:exchange = new FanoutExchange(exchangeName, true, false);break;case HEADERS:exchange = new HeadersExchange(exchangeName, true, false);break;default:exchange = new DirectExchange(exchangeName, true, false);}getAmqpAdmin().declareExchange(exchange);return exchange;}private static void binding(String queueName, String exchangeName, String routingKey, ExchangeEnum typeEnum) {//绑定队列和交换机Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);getAmqpAdmin().declareBinding(binding);}private static void fanoutBinding(Queue queue, Exchange exchange) {BindingBuilder.bind(queue).to(exchange);}
}
4.监听和推送消息
/*** @author wux* @version 1.0.0* @description 监听phm设备状态消息*/
@Component
@Slf4j
public class MotePhmDeviceStatesListener {private static final String CLASS_NAME = "MotePhmDeviceStatesListener";@Autowiredprivate Map<String, AssembleDeviceStatesStrategy> map = new ConcurrentHashMap<String, AssembleDeviceStatesStrategy>();@Resourceprivate MoteMessageService moteMessageService;@Autowiredprotected BeanMapper beanMapper;@RabbitHandler@RabbitListener(bindings = @QueueBinding(value=@Queue("que_phm_device_states"),exchange = @Exchange("exc_phm_device_states"),key = "key_phm_device_states"))public void process(@Payload BaseMessage req, Message msg, Channel channel) {final String METHOD_NAME ="process";log.info(CLASS_NAME + "-" + METHOD_NAME + "-start,req={},msg={}", req, msg);long deliverTag = msg.getMessageProperties().getDeliveryTag();if (!MessageTypeEnum.DEVICE_STATES.getKey().equals(req.getType())) {log.warn(CLASS_NAME + "-" + METHOD_NAME + "-message=消息类型不匹配!");//拒绝,重新回到队列//channel.clearReturnListeners();//channel.basicReject(deliverTag, true);return;}try {String service = MessageSubTypeEnum.getService(req.getSubType());AssembleDeviceStatesStrategy strategy = map.get(service);BaseMessage reqMessage = beanMapper.map(req, BaseMessage.class);BaseMessage retMessage = strategy.assembleDeviceStates(reqMessage);strategy.sendMessage(retMessage, reqMessage, channel);} catch (Exception e) {log.error(CLASS_NAME + "-" + METHOD_NAME + "-异常, e={}", e);return;//拒绝,重新回到队列//channel.basicNack(deliverTag, false,true);}}
}
 public void sendMessage(BaseMessage retMessage, BaseMessage req, Channel channel) {retMessage.setDate(new Date());retMessage.setDateStr(DateUtils.format(retMessage.getDate(), DateUtils.DATE_TIME_SECOND));if (CheckUtil.isEmpty(req.getQueueName())) {log.warn("AssembleDeviceStatesStrategy" + "队列名称为空,req={}", req);return;}QueueInformation queueInfo = RabbitMqUtil.getQueueInfo(req.getQueueName());if (CheckUtil.isEmpty(queueInfo) || CheckUtil.isEmpty(queueInfo.getName())) {log.warn("AssembleDeviceStatesStrategy" + "-" + "队列不存在!" + ",req={}", req);return;}
//        try {
//            long count = channel.messageCount(req.getQueueName());
//            if (count >= 5000) {
//                channel.queueDelete(req.getQueueName());
//            }
//
//        } catch (IOException e) {
//            log.error("AssembleDeviceStatesStrategy" + "-" + "清除队列消息失败" + ",retMessage={}", retMessage, e);
//        }rabbitTemplate.convertAndSend(req.getQueueName(), retMessage);log.info("AssembleDeviceStatesStrategy" + "-" + "sendMessage推给前端信息" + ",retMessage={},req={}", retMessage, req);}
例子3:Kafka:

使用@KafkaListene(topics="xxx", groupId="xxx")  接受消息

四、消息中间件:RabbitMQ、RocketMQ、Kafka

高并发情况下,或者规模较大,推荐使用消息中间件,搭建一个公共平台,统一管理消息推送,项目层面进行隔离即可。

RabbitMQ可看:https://blog.csdn.net/baidu_35160588/article/details/89027810

这篇关于工作七年,对消息推送使用的一些经验和总结的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/664714

相关文章

Python使用PIL库将PNG图片转换为ICO图标的示例代码

《Python使用PIL库将PNG图片转换为ICO图标的示例代码》在软件开发和网站设计中,ICO图标是一种常用的图像格式,特别适用于应用程序图标、网页收藏夹图标等场景,本文将介绍如何使用Python的... 目录引言准备工作代码解析实践操作结果展示结语引言在软件开发和网站设计中,ICO图标是一种常用的图像

使用Java发送邮件到QQ邮箱的完整指南

《使用Java发送邮件到QQ邮箱的完整指南》在现代软件开发中,邮件发送功能是一个常见的需求,无论是用户注册验证、密码重置,还是系统通知,邮件都是一种重要的通信方式,本文将详细介绍如何使用Java编写程... 目录引言1. 准备工作1.1 获取QQ邮箱的SMTP授权码1.2 添加JavaMail依赖2. 实现

MyBatis与其使用方法示例详解

《MyBatis与其使用方法示例详解》MyBatis是一个支持自定义SQL的持久层框架,通过XML文件实现SQL配置和数据映射,简化了JDBC代码的编写,本文给大家介绍MyBatis与其使用方法讲解,... 目录ORM缺优分析MyBATisMyBatis的工作流程MyBatis的基本使用环境准备MyBati

使用Python开发一个图像标注与OCR识别工具

《使用Python开发一个图像标注与OCR识别工具》:本文主要介绍一个使用Python开发的工具,允许用户在图像上进行矩形标注,使用OCR对标注区域进行文本识别,并将结果保存为Excel文件,感兴... 目录项目简介1. 图像加载与显示2. 矩形标注3. OCR识别4. 标注的保存与加载5. 裁剪与重置图像

使用Python实现表格字段智能去重

《使用Python实现表格字段智能去重》在数据分析和处理过程中,数据清洗是一个至关重要的步骤,其中字段去重是一个常见且关键的任务,下面我们看看如何使用Python进行表格字段智能去重吧... 目录一、引言二、数据重复问题的常见场景与影响三、python在数据清洗中的优势四、基于Python的表格字段智能去重

使用Apache POI在Java中实现Excel单元格的合并

《使用ApachePOI在Java中实现Excel单元格的合并》在日常工作中,Excel是一个不可或缺的工具,尤其是在处理大量数据时,本文将介绍如何使用ApachePOI库在Java中实现Excel... 目录工具类介绍工具类代码调用示例依赖配置总结在日常工作中,Excel 是一个不可或缺的工http://

Java之并行流(Parallel Stream)使用详解

《Java之并行流(ParallelStream)使用详解》Java并行流(ParallelStream)通过多线程并行处理集合数据,利用Fork/Join框架加速计算,适用于大规模数据集和计算密集... 目录Java并行流(Parallel Stream)1. 核心概念与原理2. 创建并行流的方式3. 适

如何使用Docker部署FTP和Nginx并通过HTTP访问FTP里的文件

《如何使用Docker部署FTP和Nginx并通过HTTP访问FTP里的文件》本文介绍了如何使用Docker部署FTP服务器和Nginx,并通过HTTP访问FTP中的文件,通过将FTP数据目录挂载到N... 目录docker部署FTP和Nginx并通过HTTP访问FTP里的文件1. 部署 FTP 服务器 (

MySQL 日期时间格式化函数 DATE_FORMAT() 的使用示例详解

《MySQL日期时间格式化函数DATE_FORMAT()的使用示例详解》`DATE_FORMAT()`是MySQL中用于格式化日期时间的函数,本文详细介绍了其语法、格式化字符串的含义以及常见日期... 目录一、DATE_FORMAT()语法二、格式化字符串详解三、常见日期时间格式组合四、业务场景五、总结一、

Python中配置文件的全面解析与使用

《Python中配置文件的全面解析与使用》在Python开发中,配置文件扮演着举足轻重的角色,它们允许开发者在不修改代码的情况下调整应用程序的行为,下面我们就来看看常见Python配置文件格式的使用吧... 目录一、INI配置文件二、YAML配置文件三、jsON配置文件四、TOML配置文件五、XML配置文件