ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp

2024-08-21 00:18

本文主要是介绍ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、 背景

对目前前后端分离的开发开发的大环境下, 前端使用 vue 进行项目的开发, 后端不在使用以前的 jsp 的开发方式进行开发, 因此造成了对于前端推送方案的选型问题, 在项目的开发过程针对于前后端的开发时间和效率等综合考虑进行了一个技术的选型,其中有过多种的方案的考虑。

二、 各种推送方案的比较

1. 各种推送方案的简单介绍

Ajax 轮询:

轮询:缺点,糟糕的用户体验;对服务器压力很大,并造成带宽的极大浪费。

DWR:

DWR 是一个用于改善 web 页面与 Java 类交互的远程服务器端 Ajax 开源框架,可以帮助开发人员开发包含 AJAX 技术的网站。它可以允许在浏览器里的代码使用运行在 WEB 服务器上的 JAVA 函数,就像它就在浏览器里一样。 将 java 代码转为 js 文件,引入 js 文件路径不能更改,原理也是轮询。

Activemq 结合 Ajax:

ActiveMQ 支持 Ajax,利用 ActiveMQ 的“发布/订阅”的特性,来创建高度实时的 web 应用, 需要结合 Amq.js+JQuery 进行处理发布和订阅消息。参考文章官方使用说明可以简单的了解如何使用。

Netty:

基于原生 NIO 实现的高并发框架,配合 websocket 实现消息推送, netty 会单独开一个 websocket 端口处理请求,并不会占用中间件的连接数,而且一个线程可以处理几万个链接。

Spring 对于 Websocket 的支持:

websocket 是 Html5 新增加特性之一,目的是浏览器与服务端建立全双工的通信方式,解决 http 请求-响应带来过多的资源消耗,同时对特殊场景应用提供了全新的实现方式, 比如聊天、股票交易、游戏等对对实时性要求较高的行业领域。

ActiveMQ 对于 Websocket 的支持:

可以通过 stomp 协议简单的处理发布订阅这样的消息模型并且建立在 Websocket 的基础上进行。 STOMP 即 Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连
接格式,允许 STOMP 客户端与任意 STOMP 消息代理(Broker)进行交互。参考文章STOMP Over WebSocket

2. 推送方案的选型

对于上述的各种推送的方案都曾经有过考虑和分析, 最终选取了最后一种方案”ActiveMQ 对于Websocket 的支持”.

DWR 和 Ajax

轮询采用同样的技术原理轮询,性能上首先淘汰掉了这两个方案

Activemq 结合 Ajax

在使用上来说依赖于 amq.js,这个 js 文件还依赖一些基于公共JavaScript 框架: jquery.js、 amq_jquery_adapter.js。因此,使用 amq.js 时,,必须先引入 jquery 库文件和适配器库文件 amq_jquery_adapter.js,其次这个方案的实现原理也是页
面轮询的方式, 就目前我们前端开发的特点上来说,使用的是 vue 不再提倡使用 JQuery 这样的类库,如果采用这样的方式非常的不利于前端的处理, 所以排除。

采用 Websocket

最后的三种方案其实都是采用 Websocket 的方式进行处理, Spring 对于 Websocket 的支持这种方案的使用可以参考Spring WebSocket 实现消息推送,这种方案目前在部门中没有看到过使用不成熟,需要自己去手动维护连接用户的信息并且针对不同的推送信息需要对于客户端进行区分到底是谁需要当前的这个推送的请求.Netty 的这个方案在之前有过使用, 代码量庞大并且当前的开发人员对于这个模块并不是很熟悉且与 spring 对于websocket 的支持有同样的毛病, 需要代码手动维护区分当前的推送需要针对哪些用户或者哪些页面的请求, 前端开发也是蛮复杂的,迁移成本巨大。 这里有一点需要说明,其实 spring也是支持 Stomp 协议Spring+STOMP 实现 WebSocket 广播订阅、权限认证、一对一通讯的,之前对于这方面不太了解,spring 使用本地维护 session 信息支持 Stomp 协议,同时也是支持与 ActiveMQ 集成,但是需要 ActiveMQ 支持 ws 协议;Spring Boot系列十七 Spring Boot 集成 websocket,使用RabbitMQ做为消息代理

spring websocket 配置类

/*** Spring websocket 接入使用** @author: wangji* @date: 2018/04/13 15:37*/
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfig extends AbstractWebSocketMessageBrokerConfigurer {@Resource(name = "taskExecutor")private ThreadPoolTaskExecutor threadPool;/*** 连接的端点,客户端建立连接时需要连接这里配置的端点*/@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/client").setAllowedOrigins("*").setHandshakeHandler(new DefaultHandshakeHandler()).addInterceptors(new HandshakeInterceptor() {/*** 添加权限校验是否登录* 不进行校验可以使用默认的 HttpSessionHandshakeInterceptor** @param serverHttpRequest* @param serverHttpResponse* @param webSocketHandler* @param map* @return* @throws Exception* @see org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor*/@Overridepublic boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {boolean result = true;if (!StringUtils.isNotEmpty(LoginInfoUtil.getRequesetUserId())) {result = false;}return result;}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {}});}/*** 使用参考** @link {https://blog.csdn.net/elonpage/article/details/78446695?locationNum=5&fps=1}* @see org.springframework.messaging.simp.SimpMessageSendingOperations#convertAndSend(Object, Object) 发送消息(/topic/xxxx,"消息")*/@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.setApplicationDestinationPrefixes("/app");registry.setUserDestinationPrefix("/user");registry.enableSimpleBroker("/topic", "/queue");}/*** 输入通道参数设置设置多线程(接收消息通道)*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {//线程信息registration.taskExecutor(threadPool);}/*** 输出通道参数配置多线程(发送给给前端的推送通道)*/@Overridepublic void configureClientOutboundChannel(ChannelRegistration registration) {//线程信息registration.taskExecutor(threadPool);}}

发送工具类

*** stomp websocket协议手动发送通知到前端** @author: wangji* @date: 2018/04/16 10:00*/
@Component
public class StompWebsocketNotifier implements InitializingBean {private static final Logger log = LoggerFactory.getLogger(StompWebsocketNotifier.class);private static final String TOPIC_PREFIX = "/topic/";@Resourceprivate ApplicationContext applicationContext;private static SimpMessageSendingOperations simpMessageSendingOperations;@Overridepublic void afterPropertiesSet() throws Exception {simpMessageSendingOperations = applicationContext.getBean(SimpMessageSendingOperations.class);}/*** 推送消息通过 websocket到前端** @param topicName 队列的名称* @param message   发送消息的内容* @return*/public static boolean sendToWebSocketTopic(String topicName, String message) {boolean result = true;if (StringUtils.isNotEmpty(topicName) && StringUtils.isNotEmpty(message)) {try {simpMessageSendingOperations.convertAndSend(TOPIC_PREFIX + topicName, message);} catch (MessagingException e) {result = false;log.error(PrettyLogger.toMessage("push websocket error", "topicName", "message"), topicName, message, e);}}return result;}
}

配置线程池

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="shutdown"><!-- 核心线程数  --><property name="corePoolSize" value="10" /><!-- 最大线程数 --><property name="maxPoolSize" value="50" /><!-- 队列最大长度 >=mainExecutor.maxSize --><property name="queueCapacity" value="1000" /><!-- 线程池维护线程所允许的空闲时间 --><property name="keepAliveSeconds" value="300" /><!-- 线程池对拒绝任务(无线程可用)的处理策略 --><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/></property><property name="threadGroup" value="taskExecutor-Pool"/><property name="threadNamePrefix" value="taskExecutor-Pool"/><property name="waitForTasksToCompleteOnShutdown" value="true"/></bean>

三、 ActiveMQ与WebSocket的结合的使用

ActiveMQ结合Websocket的使用主要集中在前端,后端由于ActiveMQ历史沉淀了许多优秀的ActiveMQ使用的封装的类库,直接调用非常的方便。针对不同的topic队列可以根据前端页面的需求进行相应的订,如下是一个简单的使用Demo参考STOMP Over WebSocket

1. 简单使用

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title><script src="stomp.min.js"></script><script>var url = "ws://10.33.30.255:61614";var client = Stomp.client(url);// client will send heartbeats every 20000msclient.heartbeat.outgoing = 20000;client.heartbeat.incoming = 0;client.connect("", "", function (frame) {// upon connection, subscribe to the destinationvar sub = client.subscribe("/topic/testTopic", function (message) {console.log("receive: " + message.body);});});</script>
</head>
<body></body>
</html>

2. Vue组件封装

根据vue的特点,对于推送请求框架进行了简单的封装,各个页面可以简单的使用组件,需要下面的两个封装的文件
Message-push.vue

<template><!--@input: URI port  destination 详情见下方props--><!--@output:message-get 从服务器端得到的消息--><span></span>
</template>
<script>import Stomp from 'stompjs';export default {name: 'messagePush',props: {URI: {// 连接着服务端的WebSocket的代理的地址  类似于服务器的ip地址type: String,default: '10.11.165.16'},port: {type: String,default: '61614'},URL: String, // 带协议的连接地址,传入这个属性会覆盖上面的两个属性destination: {// 订阅消息的目的地,也就是消息的来源地址 类似于api中的地址;可以同时接收多个消息type: [String, Array],default: ''}},data () {return {client: null};},beforeDestroy() {this.destroy();},mounted() {this.init();},methods: {init() {if (window.WebSocket) {/** * 步骤创建STOMP客户端------连接服务端* */// STOMP javascript 客户端会使用ws://的URL与STOMP 服务端进行交互。let url = `ws://${this.URI}:${this.port}`;if (this.URL !== '') url = this.URL;// 为了创建一个STOMP客户端js对象,你需要使用Stomp.client(url),而这个URL连接着服务端的WebSocket的代理:this.client = Stomp.client(url);this.client.heartbeat.incoming = 0;this.client.heartbeat.outgoing = 1000 * 60;this.client.debug = null;// Stomp 客户端建立后,必须调用它的connect()方法去连接this.client.connect('', '', () => {// 为了在浏览器中接收消息,STOMP客户端必须先订阅一个目的地destination。// body 是字符串,请使用JSON.parse()去转换JSON对象let desList = [];if (typeof this.destination === 'string') {desList.push(this.destination);} else {desList = this.destination;}// 遍历每一个地址,订阅消息for (let des of desList) {((destination) => {this.client.subscribe(destination, (message) => {let _data = null;if (message.body) {// 解析接受到的数据_data = JSON.parse(message.body);} else {// 接受到空消息_data = '';}this.$emit('message-get', _data, destination);});})(des);}});} else {this.$message({message: `你的浏览器版本太低了,不支持Websocket,会导致你接收不到实时数据!'您可以更换一个支持websockets浏览器,如:IE11 或者 chrome 或firefox!`,type: 'warning'});}},destroy() {// 断开连接时,调用disconnect方法this.client.disconnect();this.client = null;}}};
</script>
<style lang='less' scoped></style>

messager.vue

<template><message-push :destination="destination" v-if="start" :URL="url" @message-get="onMessageGet"></message-push>
</template><script>import messagePush from './message-push.vue';import commonService from 'service/common.service';import {ErrorHandleMixin} from 'utils/mixin';export default {props: {destination: {type: [String, Array],default: ''}},mixins: [ErrorHandleMixin],components: {messagePush},data() {return {start: false,url: ''};},methods: {getWsInfo() {commonService.getWsInfo().then(res => {this.url = res.mqAddr;if (this.url) this.start = true;}).catch(this._handleError);},onMessageGet(data, topic) {this.$emit('message-get', data, topic);}},mounted() {this.getWsInfo();this.$emit('init');}};
</script>

3. 组件使用

只需要传入需要订阅的队列的名称和处理的地址即可,如下

<messager destination="/topic/XXXTopic" @message-get="msgPushHandler"></messager>

四、 总结

对于不同的推送方案的了解,在不断的咨询相关的同时和查找资料的阅读相应的官方文档下进行了了解,感谢开源,让我们能够利用优秀的方案不用重复的造轮子。

这篇关于ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1091580

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

每天认识几个maven依赖(ActiveMQ+activemq-jaxb+activesoap+activespace+adarwin)

八、ActiveMQ 1、是什么? ActiveMQ 是一个开源的消息中间件(Message Broker),由 Apache 软件基金会开发和维护。它实现了 Java 消息服务(Java Message Service, JMS)规范,并支持多种消息传递协议,包括 AMQP、MQTT 和 OpenWire 等。 2、有什么用? 可靠性:ActiveMQ 提供了消息持久性和事务支持,确保消