Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)

2024-02-15 19:20

本文主要是介绍Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

源码版本:2.6.1

前言

上一篇Apache 神禹(shenyu)源码阅读(一)——Admin向Gateway的数据同步(Admin端)写了Admin 端在接收到程序员对 Divide 插件的选择器 Selector 作出新增操作时,Admin 端是如何将要同步的数据发布给 Gateway 端的。

本篇介绍 Gateway 端是如何接收 Admin 端发布的数据的。

本文介绍的数据同步(sync data)在 Shenyu 架构图中的位置

在这里插入图片描述

正文

1. Gateway 端通过网络接收 Admin 端要同步的数据

  • ShenyuWebsocketClient.onMessage()
    由 Admin 端的 WebsocketCollector.send() 通过网络发送数据后(上一篇的内容),Gateway 端的 ShenyuWebsocketClient.onMessage() 收到数据,onMessage() 是 Spring 框架抽象类 WebSocketClient 的一个方法,在 ShenyuWebsocketClient 中实现了这个方法。
public final class ShenyuWebsocketClient extends WebSocketClient {// ...@Overridepublic void onMessage(final String result) {handleResult(result);}private void handleResult(final String result) {// 1. 打印日志LOG.info("handleResult({})", result);// 2. 调用 Gson 包,将 Json 字符串转换为 WebsocketDataWebsocketData<?> websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);// 3. 因为我们是新增的 Selector,所以这里 groupEnum 为 ConfigGroupEnum.SELECTORConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());// 4. 事件是 UPDATEString eventType = websocketData.getEventType();// 5. 再转成 Json 字符串String json = GsonUtils.getInstance().toJson(websocketData.getData());// 6. 交给 WebsocketDataHandler 处理数据websocketDataHandler.executor(groupEnum, json, eventType);}
}
  • ShenyuWebsocketClient.handleResult()

    如上面那段代码,

    1. 打印日志
    2. 调用 Gson 包,将 Json 字符串转换为 WebsocketData
    3. 因为我们是新增的 Selector,所以这里 groupEnumConfigGroupEnum.SELECTOR
    4. 事件是 UPDATE
    5. 再转成 Json 字符串
    6. 交给 WebsocketDataHandler 处理数据
  • WebsocketDataHandler.executor()

    WebsocketDataHandler 的一个 EnumMap 类型的成员变量存储了 ConfigGroupEnum -> DataHandler 的映射。在 executor 方法里拿到 ConfigGroupEnum 对应的 DataHandler 去处理数据

public class WebsocketDataHandler {// ...private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {ENUM_MAP.get(type).handle(json, eventType);}
}

2 交由 SelectorDataHandler 处理数据

  • DataHandler.handle

    DataHandler 是个接口:

public interface DataHandler {/*** Handle.** @param json  the data for json* @param eventType the event type*/void handle(String json, String eventType);
}

其继承关系如下图:
在这里插入图片描述

  • AbstractDataHandler.handle()

    这里 handle() 用到了一个设计模式——模板方法,里面用到的方法都是交由子类根据自己的逻辑去实现
    事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理

public abstract class AbstractDataHandler<T> implements DataHandler {/*** Convert list.** @param json the json* @return the list*/protected abstract List<T> convert(String json);/*** Do refresh.** @param dataList the data list*/protected abstract void doRefresh(List<T> dataList);/*** Do update.** @param dataList the data list*/protected abstract void doUpdate(List<T> dataList);/*** Do delete.** @param dataList the data list*/protected abstract void doDelete(List<T> dataList);@Overridepublic void handle(final String json, final String eventType) {List<T> dataList = convert(json);if (CollectionUtils.isEmpty(dataList)) {return;}DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);switch (eventTypeEnum) {case REFRESH:case MYSELF:doRefresh(dataList);break;case UPDATE:case CREATE:// 事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理doUpdate(dataList);break;case DELETE:doDelete(dataList);break;default:break;}}
}
  • SelectorDataHandler.doUpdate()

    由插件数据订阅者 pluginDataSubscriber 去完成 Selector 数据的订阅和处理

 public class SelectorDataHandler extends AbstractDataHandler<SelectorData> {// ...private final PluginDataSubscriber pluginDataSubscriber;@Overrideprotected void doUpdate(final List<SelectorData> dataList) {dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);}
}
  • CommonPluginDataSubscriber.onSelectorSubscribe()

    CommonPluginDataSubscriber 是 PluginDataSubscriber 的唯一一个实现类:

public class CommonPluginDataSubscriber implements PluginDataSubscriber {// ...@Overridepublic void onSelectorSubscribe(final SelectorData selectorData) {LOG.info("subscribe select data for selector: [id: {}, pluginName: {}, name: {}]", selectorData.getId(), selectorData.getPluginName(), selectorData.getName());subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);}private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {if (dataType == DataEventTypeEnum.UPDATE) {Optional.ofNullable(classData)// 如果要更新的数据不为空,则更新缓存数据.ifPresent(data -> updateCacheData(classData));} else if (dataType == DataEventTypeEnum.DELETE) {Optional.ofNullable(classData).ifPresent(data -> removeCacheData(classData));}}private <T> void updateCacheData(@NonNull final T data) {if (data instanceof PluginData) {PluginData pluginData = (PluginData) data;final PluginData oldPluginData = BaseDataCache.getInstance().obtainPluginData(pluginData.getName());BaseDataCache.getInstance().cachePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));// update enabled pluginsPluginHandlerEventEnum state = Boolean.TRUE.equals(pluginData.getEnabled())? PluginHandlerEventEnum.ENABLED : PluginHandlerEventEnum.DISABLED;eventPublisher.publishEvent(new PluginHandlerEvent(state, pluginData));// sorted pluginsortPluginIfOrderChange(oldPluginData, pluginData);final String pluginName = pluginData.getName();// if update plugin, remove selector and rule match cache/trie cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(pluginName);}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(pluginName);}} else if (data instanceof SelectorData) {SelectorData selectorData = (SelectorData) data;// BaseDataCache 缓存BaseDataCache.getInstance().cacheSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));// remove match cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptySelectorData(selectorData.getPluginName());}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleDataBySelector(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptyRuleData(selectorData.getPluginName());}updateSelectorTrieCache(selectorData);} else if (data instanceof RuleData) {RuleData ruleData = (RuleData) data;BaseDataCache.getInstance().cacheRuleData(ruleData);Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(ruleData.getPluginName(), ruleData.getId());MatchDataCache.getInstance().removeEmptyRuleData(ruleData.getPluginName());}updateRuleTrieCache(ruleData);}}
}

3. BaseDataCache 根据数据更新缓存

网关的 SELECTOR_MAP 等缓存是由 ConcurrentMap 实现的。

  1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中
  2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据
  3. 然后将更新后的 selectorData 集合排序
  4. 更新 SELECTOR_MAP
public final class BaseDataCache {// ...private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();// 我觉得这个方法名可能是敲错了,应该是 cacheSelectorData 才对public void cacheSelectData(final SelectorData selectorData) {Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);}private void selectorAccept(final SelectorData data) {String key = data.getPluginName();synchronized (SELECTOR_MAP) {if (SELECTOR_MAP.containsKey(key)) {// 存在 key,说明为更新操作List<SelectorData> existList = SELECTOR_MAP.get(key);// 1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());// 2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据resultList.add(data);// 3. 然后将更新后的 selectorData 集合排序final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());// 4. 更新 SELECTOR_MAPSELECTOR_MAP.put(key, collect);} else {// 不存在 key,说明为新增操作SELECTOR_MAP.put(key, Lists.newArrayList(data));}}}
}

一张图总结

在这里插入图片描述

这篇关于Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/712307

相关文章

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

Python 中的异步与同步深度解析(实践记录)

《Python中的异步与同步深度解析(实践记录)》在Python编程世界里,异步和同步的概念是理解程序执行流程和性能优化的关键,这篇文章将带你深入了解它们的差异,以及阻塞和非阻塞的特性,同时通过实际... 目录python中的异步与同步:深度解析与实践异步与同步的定义异步同步阻塞与非阻塞的概念阻塞非阻塞同步

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Redis 中的热点键和数据倾斜示例详解

《Redis中的热点键和数据倾斜示例详解》热点键是指在Redis中被频繁访问的特定键,这些键由于其高访问频率,可能导致Redis服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis中的热... 目录Redis 中的热点键和数据倾斜热点键(Hot Key)定义特点应对策略示例数据倾斜(Data S

Python实现将MySQL中所有表的数据都导出为CSV文件并压缩

《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到... python将mysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个

SpringBoot整合jasypt实现重要数据加密

《SpringBoot整合jasypt实现重要数据加密》Jasypt是一个专注于简化Java加密操作的开源工具,:本文主要介绍详细介绍了如何使用jasypt实现重要数据加密,感兴趣的小伙伴可... 目录jasypt简介 jasypt的优点SpringBoot使用jasypt创建mapper接口配置文件加密