【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

2024-05-01 09:04

本文主要是介绍【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

通过Spring websocket 用户校验和业务会话绑定我们学会了如何将业务会话绑定到spring websocket会话上。通过这一节,我们来分析一下会话和订阅的实现

用户会话的数据结构

SessionInfo 用户会话

用户会话定义如下:

private static final class SessionInfo {// subscriptionId -> Subscriptionprivate final Map<String, Subscription> subscriptionMap = new ConcurrentHashMap<>();public Collection<Subscription> getSubscriptions() {return this.subscriptionMap.values();}@Nullablepublic Subscription getSubscription(String subscriptionId) {return this.subscriptionMap.get(subscriptionId);}public void addSubscription(Subscription subscription) {this.subscriptionMap.putIfAbsent(subscription.getId(), subscription);}@Nullablepublic Subscription removeSubscription(String subscriptionId) {return this.subscriptionMap.remove(subscriptionId);}}
  • 用户会话中有subscriptionMap。这个表示一个会话中,可以有多个订阅,可以根据subscriptionId找到订阅。

SessionRegistry 用户会话注册

private static final class SessionRegistry {private final ConcurrentMap<String, SessionInfo> sessions = new ConcurrentHashMap<>();@Nullablepublic SessionInfo getSession(String sessionId) {return this.sessions.get(sessionId);}public void forEachSubscription(BiConsumer<String, Subscription> consumer) {this.sessions.forEach((sessionId, info) ->info.getSubscriptions().forEach(subscription -> consumer.accept(sessionId, subscription)));}public void addSubscription(String sessionId, Subscription subscription) {SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());info.addSubscription(subscription);}@Nullablepublic SessionInfo removeSubscriptions(String sessionId) {return this.sessions.remove(sessionId);}}
  • SessionRegistry 中sessions 表示多个会话。根据sessionId可以找到唯一一个会话SessionInfo

Subscription 用户订阅

	private static final class Subscription {private final String id;private final String destination;private final boolean isPattern;@Nullableprivate final Expression selector;public Subscription(String id, String destination, boolean isPattern, @Nullable Expression selector) {Assert.notNull(id, "Subscription id must not be null");Assert.notNull(destination, "Subscription destination must not be null");this.id = id;this.selector = selector;this.destination = destination;this.isPattern = isPattern;}public String getId() {return this.id;}public String getDestination() {return this.destination;}public boolean isPattern() {return this.isPattern;}@Nullablepublic Expression getSelector() {return this.selector;}@Overridepublic boolean equals(@Nullable Object other) {return (this == other ||(other instanceof Subscription && this.id.equals(((Subscription) other).id)));}@Overridepublic int hashCode() {return this.id.hashCode();}@Overridepublic String toString() {return "subscription(id=" + this.id + ")";}}

SimpUserRegistry 用户注册接口

用户注册的接口如下:

public interface SimpUserRegistry {/**根据用户名,获取到用户信息* Get the user for the given name.* @param userName the name of the user to look up* @return the user, or {@code null} if not connected*/@NullableSimpUser getUser(String userName);/**获取现在所有的注册的用户* Return a snapshot of all connected users.* <p>The returned set is a copy and will not reflect further changes.* @return the connected users, or an empty set if none*/Set<SimpUser> getUsers();/**获取在线用户数量* Return the count of all connected users.* @return the number of connected users* @since 4.3.5*/int getUserCount();/*** Find subscriptions with the given matcher.* @param matcher the matcher to use* @return a set of matching subscriptions, or an empty set if none*/Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher);}

SimpUser实际上就是代表着一个用户,我们来看其实现:LocalSimpUser的定义

	private static class LocalSimpUser implements SimpUser {private final String name;private final Principal user;private final Map<String, SimpSession> userSessions = new ConcurrentHashMap<>(1);public LocalSimpUser(String userName, Principal user) {Assert.notNull(userName, "User name must not be null");this.name = userName;this.user = user;}}

userSessions 表示当前一个用户可以对应多个会话。
这个Principal 是啥,还记得我们上一节通过Spring websocket 用户校验和业务会话绑定中,我们是怎么注册用户的吗

    private void connect(Message<?> message, StompHeaderAccessor accessor) {//1通过请求头获取到tokenString token = accessor.getFirstNativeHeader(WsConstants.TOKEN_HEADER);//2如果token为空或者用户id没有解析出来,抛出异常,spring会将此websocket连接关闭if (StringUtils.isEmpty(token)) {throw new MessageDeliveryException("token missing!");}String userId = TokenUtil.parseToken(token);if (StringUtils.isEmpty(userId)) {throw new MessageDeliveryException("userId missing!");}//这个是每个会话都会有的一个sessionIdString simpleSessionId = (String) message.getHeaders().get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);//3创建自己的业务会话session对象UserSession userSession = new UserSession();userSession.setSimpleSessionId(simpleSessionId);userSession.setUserId(userId);userSession.setCreateTime(LocalDateTime.now());//4关联用户的会话。通过msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); 此方法,可以发送给用户消息accessor.setUser(new UserSessionPrincipal(userSession));}

从token中解析出用户的userId,并通过下面的代码,把当前用户和会话绑定起来。一个用户实际上是可以绑定多个会话的。

 accessor.setUser(new UserSessionPrincipal(userSession));

总结一下用户和会话之间的关系,如下图
在这里插入图片描述

订阅过程的源码分析

前端订阅的代码如下

  stompClient.subscribe("/user/topic/answer", function (response) {createElement("answer", response.body);});

当后端收到订阅消息后,会由SimpleBrokerMessageHandler来处理

	@Overrideprotected void handleMessageInternal(Message<?> message) {MessageHeaders headers = message.getHeaders();String destination = SimpMessageHeaderAccessor.getDestination(headers);String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);updateSessionReadTime(sessionId);if (!checkDestinationPrefix(destination)) {return;}SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);if (SimpMessageType.MESSAGE.equals(messageType)) {logMessage(message);sendMessageToSubscribers(destination, message);}else if (SimpMessageType.CONNECT.equals(messageType)) {logMessage(message);if (sessionId != null) {if (this.sessions.get(sessionId) != null) {if (logger.isWarnEnabled()) {logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");}return;}long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);long[] heartbeatOut = getHeartbeatValue();Principal user = SimpMessageHeaderAccessor.getUser(headers);MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);initHeaders(connectAck);connectAck.setSessionId(sessionId);if (user != null) {connectAck.setUser(user);}connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());getClientOutboundChannel().send(messageOut);}}else if (SimpMessageType.DISCONNECT.equals(messageType)) {logMessage(message);if (sessionId != null) {Principal user = SimpMessageHeaderAccessor.getUser(headers);handleDisconnect(sessionId, user, message);}}else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {logMessage(message);this.subscriptionRegistry.registerSubscription(message);}else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {logMessage(message);this.subscriptionRegistry.unregisterSubscription(message);}}

当消息类型为SUBSCRIBE时,会调用subscriptionRegistry.registerSubscription(message)
接着来看下subscriptionRegistry.registerSubscription(message)

//AbstractSubscriptionRegistry@Overridepublic final void registerSubscription(Message<?> message) {MessageHeaders headers = message.getHeaders();SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);if (!SimpMessageType.SUBSCRIBE.equals(messageType)) {throw new IllegalArgumentException("Expected SUBSCRIBE: " + message);}String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);if (sessionId == null) {if (logger.isErrorEnabled()) {logger.error("No sessionId in  " + message);}return;}String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);if (subscriptionId == null) {if (logger.isErrorEnabled()) {logger.error("No subscriptionId in " + message);}return;}String destination = SimpMessageHeaderAccessor.getDestination(headers);if (destination == null) {if (logger.isErrorEnabled()) {logger.error("No destination in " + message);}return;}addSubscriptionInternal(sessionId, subscriptionId, destination, message);}

这个代码很简单,就是从消息中取出三个东西,sessionId, subscriptionId, destination,进行注册。

//DefaultSubscriptionRegistry@Overrideprotected void addSubscriptionInternal(String sessionId, String subscriptionId, String destination, Message<?> message) {boolean isPattern = this.pathMatcher.isPattern(destination);Expression expression = getSelectorExpression(message.getHeaders());Subscription subscription = new Subscription(subscriptionId, destination, isPattern, expression);this.sessionRegistry.addSubscription(sessionId, subscription);this.destinationCache.updateAfterNewSubscription(sessionId, subscription);}//其实就是添加到sessions map中。会话里把订阅添加到订阅map中public void addSubscription(String sessionId, Subscription subscription) {SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());info.addSubscription(subscription);}

其实就是添加到sessions map中。会话里把订阅添加到订阅map中

那用户和会话是如何关联起来的?
在这里插入图片描述

  • 当订阅事件发生时,取出当前的Principal( accessor.setUser(xxx)设置的),然后生成LocalSimpleUser,即用户
  • 把当前会话,添加到当前用户会话中。这样就给用户绑定好了会话了。

用户会话事件

通过Spring事件机制,管理注册用户信息和会话,包括订阅、取消订阅,会话断连。代码如下

//DefaultSimpUserRegistry@Overridepublic void onApplicationEvent(ApplicationEvent event) {AbstractSubProtocolEvent subProtocolEvent = (AbstractSubProtocolEvent) event;Message<?> message = subProtocolEvent.getMessage();MessageHeaders headers = message.getHeaders();String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);Assert.state(sessionId != null, "No session id");if (event instanceof SessionSubscribeEvent) {LocalSimpSession session = this.sessions.get(sessionId);if (session != null) {String id = SimpMessageHeaderAccessor.getSubscriptionId(headers);String destination = SimpMessageHeaderAccessor.getDestination(headers);if (id != null && destination != null) {session.addSubscription(id, destination);}}}else if (event instanceof SessionConnectedEvent) {Principal user = subProtocolEvent.getUser();if (user == null) {return;}String name = user.getName();if (user instanceof DestinationUserNameProvider) {name = ((DestinationUserNameProvider) user).getDestinationUserName();}synchronized (this.sessionLock) {LocalSimpUser simpUser = this.users.get(name);if (simpUser == null) {simpUser = new LocalSimpUser(name, user);this.users.put(name, simpUser);}LocalSimpSession session = new LocalSimpSession(sessionId, simpUser);simpUser.addSession(session);this.sessions.put(sessionId, session);}}else if (event instanceof SessionDisconnectEvent) {synchronized (this.sessionLock) {LocalSimpSession session = this.sessions.remove(sessionId);if (session != null) {LocalSimpUser user = session.getUser();user.removeSession(sessionId);if (!user.hasSessions()) {this.users.remove(user.getName());}}}}else if (event instanceof SessionUnsubscribeEvent) {LocalSimpSession session = this.sessions.get(sessionId);if (session != null) {String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);if (subscriptionId != null) {session.removeSubscription(subscriptionId);}}}}

优雅停机

当服务器停机时,最好给客户端发送断连消息,而不是让客户端过了一段时间发现连接断开。
Spring websocket是如何来实现优雅停机的?

public class SubProtocolWebSocketHandlerimplements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {@Overridepublic final void stop() {synchronized (this.lifecycleMonitor) {this.running = false;this.clientOutboundChannel.unsubscribe(this);}// Proactively notify all active WebSocket sessionsfor (WebSocketSessionHolder holder : this.sessions.values()) {try {holder.getSession().close(CloseStatus.GOING_AWAY);}catch (Throwable ex) {if (logger.isWarnEnabled()) {logger.warn("Failed to close '" + holder.getSession() + "': " + ex);}}}}@Overridepublic final void stop(Runnable callback) {synchronized (this.lifecycleMonitor) {stop();callback.run();}}
}

其奥秘就是其实现了SmartLifecycle。这个是Spring的生命周期接口。我们可以通过实现此接口,在相应的生命周期阶段注册回调事件!
上面的代码,通过调用stop接口,给客户端发送了一个断连的消息。即实现了关机时的主动通知断连。

这篇关于【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/951269

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置