soul从入门到进阶05——soul-bootstrap数据同步流程

2024-01-22 12:18

本文主要是介绍soul从入门到进阶05——soul-bootstrap数据同步流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

我们在 soul-admin的数据同步流程中分析了admin的数据同步流程,这篇我们来看看soul-bootstrap的数据同步流程

启动 soul-bootstrap

打印如下日志,我们同样从日志着手来分析。

  • 日志中打印了 you use websocket sync soul data
  • 我们根据日志,找到WebsocketSyncDataConfiguration这个类
2021-01-19 21:19:48.461  INFO 20364 --- [           main] b.s.s.d.w.WebsocketSyncDataConfiguration : you use websocket sync soul data.......
2021-01-19 21:19:48.526  INFO 20364 --- [           main] o.d.s.p.s.d.w.WebsocketSyncDataService   : websocket connection is successful.....
2021-01-19 21:19:48.617  INFO 20364 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2021-01-19 21:19:49.202  INFO 20364 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 9195
2021-01-19 21:19:49.205  INFO 20364 --- [           main] o.d.s.b.SoulBootstrapApplication         : Started SoulBootstrapApplication in 13.767 seconds (JVM running for 14.489)
分析WebsocketSyncDataConfiguration
  • 这个类的代码非常简单,从名字上看出来是一个配置类。
  • 下面这一行表示 前缀(prefix是soul.sync.websocket的属性),配置的名字是urls。才会来加载这个类。对应配置文件中的属性
soul :sync:websocket :urls: ws://localhost:9095/websocket
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
  • websocketSyncDataService方法里,会加载一个WebsocketSyncDataService类。
分析WebsocketSyncDataService
  • 初步观察 WebsocketSyncDataService这个类。
  • 发现构造方法里的意图是根据url的数量,创建了对应数量的websocket 客户端,以及对应数量的线程池。线程池定时检测websocket客户端的连接状态。
  • 同时WebsocketSyncDataService实现了AutoCloseable接口,来进行资源的管理
分析SoulWebsocketClient
  • 点击WebSocketClient抽象类,然后查看抽象类的实现类。
  • 发现SoulWebsocketClient实现了WebSocketClient
  • onMessage是收到websocket消息之后调用的方法。
   @Overridepublic void onMessage(final String result) {handleResult(result);}
  • handlerResult方法是真正处理收到的信息的方法
 private void handleResult(final String result) {WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());String eventType = websocketData.getEventType();String json = GsonUtils.getInstance().toJson(websocketData.getData());websocketDataHandler.executor(groupEnum, json, eventType);}
  • ConfigGroupEnum里面有五组config 分别是
    • APP_AUTH
    • PLUGIN 插件
    • SELECTOR 选择器
    • RULE 规则
    • META_DATA 元数据
  • 最后一行,websocketDataHandler.executor(groupEnum, json, eventType) ,是核心逻辑
  • WebsocketDataHandler类里面的excutor方法如下
 public void executor(final ConfigGroupEnum type, final String json, final String eventType) {ENUM_MAP.get(type).handle(json, eventType);}
  • 这里的handle方法,是AbstractDataHandlerhandle方法
分析AbstractDataHandler的handle方法
  • 事件类型(DataEventTypeEnum)有五种,分别是增删改 、刷新和Myself。REFRESH和Myself都是走刷新的逻辑
    • DELETE
    • CREATE
    • UPDATE
    • REFRESH
    • MYSELF
  • handler方法分别调用doRefreshdoUpdatedoDelete方法
  • doRefresh方法为例,PluginDataHandlerdoRefresh最终调用了
    PluginDataSubscriberonSubscribe方法。
  • PluginDataSubscriber的实现类是CommonPluginDataSubscriber
以CommonPluginDataSubscriber为例,
  • onSubscribe方法调用subscribeDataHandler
  • 最终操作的是BaseDataCache
    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {Optional.ofNullable(classData).ifPresent(data -> {if (data instanceof PluginData) {PluginData pluginData = (PluginData) data;if (dataType == DataEventTypeEnum.UPDATE) {BaseDataCache.getInstance().cachePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));} else if (dataType == DataEventTypeEnum.DELETE) {BaseDataCache.getInstance().removePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));}} else if (data instanceof SelectorData) {SelectorData selectorData = (SelectorData) data;if (dataType == DataEventTypeEnum.UPDATE) {BaseDataCache.getInstance().cacheSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));} else if (dataType == DataEventTypeEnum.DELETE) {BaseDataCache.getInstance().removeSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));}} else if (data instanceof RuleData) {RuleData ruleData = (RuleData) data;if (dataType == DataEventTypeEnum.UPDATE) {BaseDataCache.getInstance().cacheRuleData(ruleData);Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));} else if (dataType == DataEventTypeEnum.DELETE) {BaseDataCache.getInstance().removeRuleData(ruleData);Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));}}});}
分析BaseDataCache
  • BaseDataCache里面有三个ConcurrentMap分别存储了PLUGIN、SELECTOR、RULE的信息
  • 也就是说对插件、选择器、规则的操作,最终反映到内存中是对这个BaseDataCache中三个ConcurrentMap的操作

这篇关于soul从入门到进阶05——soul-bootstrap数据同步流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/632936

相关文章

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd