记一次Spring boot使用stomp协议栈时从服务端发起关闭

本文主要是介绍记一次Spring boot使用stomp协议栈时从服务端发起关闭,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

这篇文章是在开发过程中发生的问题,会主要根据本人在本次解决问题的角度进行分析。面向的是一个即时通信项目,与客户端使用websocket做连接接口,使用spring boot的stomp协议栈进行通信。即如下代码形式:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/im/ws").setAllowedOrigins("*").withSockJS();}...
}

在分析过程中发现stomp是一个闭环,很多东西都是私有的,可能人家并不想让使用者从服务端发起关闭吧。但是如果客户端进行了订阅,虽然可以通过拦截器的方法拒绝他的消息,但却无法做到不向该通道进行消息推送,所以需要从服务端主动断掉推送思路有二,一种是通过是通过推送关闭消息让客服端发起关闭,第二种webSocketSession直接直接close进行关闭。

解决思路

我们配置项目支持stomp是通过@EnableWebSocketMessageBroker标签进行配置的,追踪过去发现它引入了DelegatingWebSocketMessageBrokerConfiguration.class。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketMessageBrokerConfiguration.class)
public @interface EnableWebSocketMessageBroker {}

继续追踪我们发现配置类WebSocketMessageBrokerConfigurationSupport.class中有个subProtocolWebSocketHandler被声明。

public abstract class WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration {...@Beanpublic WebSocketHandler subProtocolWebSocketHandler() {return new SubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());}...
}

继续分析里面有个函数 handleMessage 是向客户端推送消息的,并且是通过sessionId进行精确推送的。

public class SubProtocolWebSocketHandlerimplements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {..../*** Handle an outbound Spring Message to a WebSocket client.*/@Overridepublic void handleMessage(Message<?> message) throws MessagingException {...}...
}

所以我们可以通过@Resource(name = “subProtocolWebSocketHandler”)进行注入获取SubProtocolWebSocketHandler进行操作。因StompCommand中供服务端使用只有CONNECTED(客户端发起连接时回复连接成功)、RECEIPT(接收消息回复接收成功)、MESSAGE(消息推送)、ERROR(错误)可以使用,并且可以从客户端主动发起的只有MESSAGE和ERROR可以使用,通过ERROR可以知道,推送至客户端后,stomp协议就会被动发起关闭流程。以下是一种演示,当然也可以通过MESSAGE的方式自己写逻辑进行主动关闭。

@Controller
public class WSController {...@Resource(name = "subProtocolWebSocketHandler")private SubProtocolWebSocketHandler subProtocolWebSocketHandler;// 这里只是一个demo程序,我是通过http请求的方式控制关闭具体流程和安全性就要自己掌握了@RequestMapping("/closeAllSession")@ResponseBodypublic String closeAllSession() {// 此处只做演示,具体的用户信息和sessionId的对应可以通过configureClientInboundChannel进行映射String[] array = WebSocketConfig.sessionSet.toArray(new String[0]);WebSocketConfig.sessionSet.clear();StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);headerAccessor.setSessionId(array[0]);Message<byte[]> createMessage = MessageBuilder.createMessage(new byte[0], headerAccessor.getMessageHeaders());subProtocolWebSocketHandler.handleMessage(createMessage);return "success";}...
}

至此一个简单的从服务端发起关闭stomp接口也就完成了。

但是。。。

流程上可以看出,发起关闭的是stomp协议分析后发起DISCONNECT消息进行关闭,也就是说如果这个流程被恶意劫持,那么这个连接还是不能被关闭的,安全性上还是欠缺一些,所以我们的第二种方法通过webSocketSession进行关闭。查看了其他相关的方法,比较合适做session处理的还是在 SubProtocolWebSocketHandler 中;我们继续分析发现sessions是private修饰的,并且没有对外的获取map或根据sessionId获取session的方法,所以这里只能pass掉;继续分析在afterConnectionEstablished函数中调用了decorateSession方法中对session进行自定义封装,并且decorateSession是protected修饰的。

public class SubProtocolWebSocketHandlerimplements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {...@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// WebSocketHandlerDecorator could close the sessionif (!session.isOpen()) {return;}this.stats.incrementSessionCount(session);session = decorateSession(session);this.sessions.put(session.getId(), new WebSocketSessionHolder(session));findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel);}....protected WebSocketSession decorateSession(WebSocketSession session) {return new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit());}...
}

所以我们可以通过继承 SubProtocolWebSocketHandler 重写decorateSession的方式拿到webSocketSession并自己维护。

我们要用自定义的 CustomSubProtocolWebSocketHandler 则需要重写 DelegatingWebSocketMessageBrokerConfiguration 配置类。又因为我们自己实现了 DelegatingWebSocketMessageBrokerConfiguration 所以前面的@EnableWebSocketMessageBroker也就要去掉了。

@Configuration
public class CustomWebSocketMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration {@Beanpublic CustomSubProtocolWebSocketHandler customSubProtocolWebSocketHandler() {return new CustomSubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());}@Overridepublic WebSocketHandler subProtocolWebSocketHandler() {return customSubProtocolWebSocketHandler();}}

websocket通道处理器 继承 SubProtocolWebSocketHandler 的实现如下:

/*** 自定义的websocket通道处理器*/
public class CustomSubProtocolWebSocketHandler extends SubProtocolWebSocketHandler {protected Map<String, Set<String>> userSessionIdMap = new HashMap<>();protected Map<String, CustomWebSocketSessionDecorator> sessionMap = new HashMap<>();private final Lock mapLock = new ReentrantLock();@Autowiredprivate WsuserServer wsuserServer;public CustomSubProtocolWebSocketHandler(MessageChannel clientInboundChannel,SubscribableChannel clientOutboundChannel) {super(clientInboundChannel, clientOutboundChannel);}@Overrideprotected WebSocketSession decorateSession(WebSocketSession session) {CustomWebSocketSessionDecorator decorator = new CustomWebSocketSessionDecorator(session, getSendTimeLimit(),getSendBufferSizeLimit(), this);sessionMap.put(session.getId(), decorator);return decorator;}/*** 设置用户信息* * @param sessionId* @param userInfo*/public void setUserInfo(String sessionId, WebsocketUserInfo userInfo) {String userUnoinAddress = WebsocketFormat.userUnoinAddress(userInfo);mapLock.lock();try {CustomWebSocketSessionDecorator decorator = sessionMap.get(sessionId);Assert.notNull(decorator, String.format("sessionId [%s] 的decorator为空", sessionId));decorator.setPrincipal(userInfo);Set<String> set = userSessionIdMap.get(userUnoinAddress);if (set == null) {set = new HashSet<>();userSessionIdMap.put(userUnoinAddress, set);}set.add(sessionId);} finally {mapLock.unlock();}}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {super.afterConnectionClosed(session, closeStatus);this.sessionClear(session, closeStatus);}/*** 清理session* * @param session* @param closeStatus*/public void sessionClear(WebSocketSession session, CloseStatus closeStatus) {// 关闭websocket sessionmapLock.lock();try {CustomWebSocketSessionDecorator remove = sessionMap.remove(session.getId());if (remove != null) {WebsocketUserInfo userInfo = (WebsocketUserInfo) remove.getPrincipal();if (userInfo != null) {String userUnoinAddress = WebsocketFormat.userUnoinAddress(userInfo);Set<String> set = userSessionIdMap.get(userUnoinAddress);if (set != null) {set.remove(session.getId());if (set.isEmpty()) {userSessionIdMap.remove(userUnoinAddress);}}}}} finally {mapLock.unlock();}}/*** 关闭指定用户* * @param userInfo*/public void closeByUserInfo(WebsocketUserInfo userInfo) {List<WebSocketSession> sessionList = new LinkedList<>();String userUnoinAddress = WebsocketFormat.userUnoinAddress(userInfo);Assert.isTrue(LockActuator.trylock(userUnoinAddress), String.format("锁定[%s]失败", userUnoinAddress));try {WebsocketUserInfoWrapper wrapper = WebsocketUserInfoWrapper.of(userInfo);WebsocketUserInfo cacheUserInfo = wsuserServer.getUserInfo(wrapper);mapLock.lock();try {Set<String> set = userSessionIdMap.get(userUnoinAddress);if (set != null) {Iterator<String> iterator = set.iterator();while (iterator.hasNext()) {String next = iterator.next();CustomWebSocketSessionDecorator customWebSocketSessionDecorator = sessionMap.get(next);if (customWebSocketSessionDecorator == null) {// 此处是将那些无关联的进行清理,防止内存泄漏,从目前逻辑上来看应该不会发生,做个保险iterator.remove();continue;}WebsocketUserInfo tempUserInfo = (WebsocketUserInfo) customWebSocketSessionDecorator.getPrincipal();if (tempUserInfo == null || !tempUserInfo.sameUser(cacheUserInfo)) {// 此处只判断哪些需要关闭,不清理sessionMap和userSessionIdMap,交给close逻辑处理sessionList.add(customWebSocketSessionDecorator);continue;}}if (set.isEmpty()) {userSessionIdMap.remove(userUnoinAddress);}}} finally {mapLock.unlock();}} finally {LockActuator.releaseLock();}for (WebSocketSession session : sessionList) {try {session.close();} catch (Exception e) {// do nothing}}}}

CustomWebSocketSessionDecorator的实现如下:

/*** 自定义的websocket控制器*/
public class CustomWebSocketSessionDecorator extends ConcurrentWebSocketSessionDecorator {protected CustomSubProtocolWebSocketHandler webSocketHandler;protected Principal user;public CustomWebSocketSessionDecorator(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit,CustomSubProtocolWebSocketHandler webSocketHandler) {super(delegate, sendTimeLimit, bufferSizeLimit);this.webSocketHandler = webSocketHandler;}public CustomWebSocketSessionDecorator(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit,OverflowStrategy overflowStrategy, CustomSubProtocolWebSocketHandler webSocketHandler) {super(delegate, sendTimeLimit, bufferSizeLimit, overflowStrategy);this.webSocketHandler = webSocketHandler;}public void setPrincipal(Principal user) {this.user = user;}@Overridepublic Principal getPrincipal() {return this.user;}@Overridepublic void close() throws IOException {this.close(CloseStatus.GOING_AWAY);}@Overridepublic void close(CloseStatus status) throws IOException {super.close(status);webSocketHandler.sessionClear(this, status);}}

上面CustomWebSocketSessionDecorator的实现中有个setPrincipal,需要注意的是stomp中没有直接将 用户信息 设置到webSocketSession中,而是维护在了 StompSubProtocolHandler.class 中,所以如果我们想让用户信息注入到session中还要主动执行一下保存。

结语

以上是使用SpringBoot封装的stomp消息的情况下从服务端发起关闭连接的两种思路。通过分析来看,stomp更想自己维护session,形成一个闭环,毕竟stomp是一个协议栈,并不只是应用在webSocket上,所以这里的方案也只是适用webSocket的通信方案上。

写了一个相关的demo,可以更好的展示出第二种解决方案:
https://github.com/Pluto-Whong/stomp-demo.git

这篇关于记一次Spring boot使用stomp协议栈时从服务端发起关闭的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/904003

相关文章

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java进程异常故障定位及排查过程

《Java进程异常故障定位及排查过程》:本文主要介绍Java进程异常故障定位及排查过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、故障发现与初步判断1. 监控系统告警2. 日志初步分析二、核心排查工具与步骤1. 进程状态检查2. CPU 飙升问题3. 内存

Linux中压缩、网络传输与系统监控工具的使用完整指南

《Linux中压缩、网络传输与系统监控工具的使用完整指南》在Linux系统管理中,压缩与传输工具是数据备份和远程协作的桥梁,而系统监控工具则是保障服务器稳定运行的眼睛,下面小编就来和大家详细介绍一下它... 目录引言一、压缩与解压:数据存储与传输的优化核心1. zip/unzip:通用压缩格式的便捷操作2.

java中新生代和老生代的关系说明

《java中新生代和老生代的关系说明》:本文主要介绍java中新生代和老生代的关系说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、内存区域划分新生代老年代二、对象生命周期与晋升流程三、新生代与老年代的协作机制1. 跨代引用处理2. 动态年龄判定3. 空间分

Java设计模式---迭代器模式(Iterator)解读

《Java设计模式---迭代器模式(Iterator)解读》:本文主要介绍Java设计模式---迭代器模式(Iterator),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录1、迭代器(Iterator)1.1、结构1.2、常用方法1.3、本质1、解耦集合与遍历逻辑2、统一

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空