Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)

2024-02-15 19:20

本文主要是介绍Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

源码版本:2.6.1

前言

上一篇Apache 神禹(shenyu)源码阅读(一)——Admin向Gateway的数据同步(Admin端)写了Admin 端在接收到程序员对 Divide 插件的选择器 Selector 作出新增操作时,Admin 端是如何将要同步的数据发布给 Gateway 端的。

本篇介绍 Gateway 端是如何接收 Admin 端发布的数据的。

本文介绍的数据同步(sync data)在 Shenyu 架构图中的位置

在这里插入图片描述

正文

1. Gateway 端通过网络接收 Admin 端要同步的数据

  • ShenyuWebsocketClient.onMessage()
    由 Admin 端的 WebsocketCollector.send() 通过网络发送数据后(上一篇的内容),Gateway 端的 ShenyuWebsocketClient.onMessage() 收到数据,onMessage() 是 Spring 框架抽象类 WebSocketClient 的一个方法,在 ShenyuWebsocketClient 中实现了这个方法。
public final class ShenyuWebsocketClient extends WebSocketClient {// ...@Overridepublic void onMessage(final String result) {handleResult(result);}private void handleResult(final String result) {// 1. 打印日志LOG.info("handleResult({})", result);// 2. 调用 Gson 包,将 Json 字符串转换为 WebsocketDataWebsocketData<?> websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);// 3. 因为我们是新增的 Selector,所以这里 groupEnum 为 ConfigGroupEnum.SELECTORConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());// 4. 事件是 UPDATEString eventType = websocketData.getEventType();// 5. 再转成 Json 字符串String json = GsonUtils.getInstance().toJson(websocketData.getData());// 6. 交给 WebsocketDataHandler 处理数据websocketDataHandler.executor(groupEnum, json, eventType);}
}
  • ShenyuWebsocketClient.handleResult()

    如上面那段代码,

    1. 打印日志
    2. 调用 Gson 包,将 Json 字符串转换为 WebsocketData
    3. 因为我们是新增的 Selector,所以这里 groupEnumConfigGroupEnum.SELECTOR
    4. 事件是 UPDATE
    5. 再转成 Json 字符串
    6. 交给 WebsocketDataHandler 处理数据
  • WebsocketDataHandler.executor()

    WebsocketDataHandler 的一个 EnumMap 类型的成员变量存储了 ConfigGroupEnum -> DataHandler 的映射。在 executor 方法里拿到 ConfigGroupEnum 对应的 DataHandler 去处理数据

public class WebsocketDataHandler {// ...private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {ENUM_MAP.get(type).handle(json, eventType);}
}

2 交由 SelectorDataHandler 处理数据

  • DataHandler.handle

    DataHandler 是个接口:

public interface DataHandler {/*** Handle.** @param json  the data for json* @param eventType the event type*/void handle(String json, String eventType);
}

其继承关系如下图:
在这里插入图片描述

  • AbstractDataHandler.handle()

    这里 handle() 用到了一个设计模式——模板方法,里面用到的方法都是交由子类根据自己的逻辑去实现
    事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理

public abstract class AbstractDataHandler<T> implements DataHandler {/*** Convert list.** @param json the json* @return the list*/protected abstract List<T> convert(String json);/*** Do refresh.** @param dataList the data list*/protected abstract void doRefresh(List<T> dataList);/*** Do update.** @param dataList the data list*/protected abstract void doUpdate(List<T> dataList);/*** Do delete.** @param dataList the data list*/protected abstract void doDelete(List<T> dataList);@Overridepublic void handle(final String json, final String eventType) {List<T> dataList = convert(json);if (CollectionUtils.isEmpty(dataList)) {return;}DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);switch (eventTypeEnum) {case REFRESH:case MYSELF:doRefresh(dataList);break;case UPDATE:case CREATE:// 事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理doUpdate(dataList);break;case DELETE:doDelete(dataList);break;default:break;}}
}
  • SelectorDataHandler.doUpdate()

    由插件数据订阅者 pluginDataSubscriber 去完成 Selector 数据的订阅和处理

 public class SelectorDataHandler extends AbstractDataHandler<SelectorData> {// ...private final PluginDataSubscriber pluginDataSubscriber;@Overrideprotected void doUpdate(final List<SelectorData> dataList) {dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);}
}
  • CommonPluginDataSubscriber.onSelectorSubscribe()

    CommonPluginDataSubscriber 是 PluginDataSubscriber 的唯一一个实现类:

public class CommonPluginDataSubscriber implements PluginDataSubscriber {// ...@Overridepublic void onSelectorSubscribe(final SelectorData selectorData) {LOG.info("subscribe select data for selector: [id: {}, pluginName: {}, name: {}]", selectorData.getId(), selectorData.getPluginName(), selectorData.getName());subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);}private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {if (dataType == DataEventTypeEnum.UPDATE) {Optional.ofNullable(classData)// 如果要更新的数据不为空,则更新缓存数据.ifPresent(data -> updateCacheData(classData));} else if (dataType == DataEventTypeEnum.DELETE) {Optional.ofNullable(classData).ifPresent(data -> removeCacheData(classData));}}private <T> void updateCacheData(@NonNull final T data) {if (data instanceof PluginData) {PluginData pluginData = (PluginData) data;final PluginData oldPluginData = BaseDataCache.getInstance().obtainPluginData(pluginData.getName());BaseDataCache.getInstance().cachePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));// update enabled pluginsPluginHandlerEventEnum state = Boolean.TRUE.equals(pluginData.getEnabled())? PluginHandlerEventEnum.ENABLED : PluginHandlerEventEnum.DISABLED;eventPublisher.publishEvent(new PluginHandlerEvent(state, pluginData));// sorted pluginsortPluginIfOrderChange(oldPluginData, pluginData);final String pluginName = pluginData.getName();// if update plugin, remove selector and rule match cache/trie cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(pluginName);}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(pluginName);}} else if (data instanceof SelectorData) {SelectorData selectorData = (SelectorData) data;// BaseDataCache 缓存BaseDataCache.getInstance().cacheSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));// remove match cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptySelectorData(selectorData.getPluginName());}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleDataBySelector(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptyRuleData(selectorData.getPluginName());}updateSelectorTrieCache(selectorData);} else if (data instanceof RuleData) {RuleData ruleData = (RuleData) data;BaseDataCache.getInstance().cacheRuleData(ruleData);Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(ruleData.getPluginName(), ruleData.getId());MatchDataCache.getInstance().removeEmptyRuleData(ruleData.getPluginName());}updateRuleTrieCache(ruleData);}}
}

3. BaseDataCache 根据数据更新缓存

网关的 SELECTOR_MAP 等缓存是由 ConcurrentMap 实现的。

  1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中
  2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据
  3. 然后将更新后的 selectorData 集合排序
  4. 更新 SELECTOR_MAP
public final class BaseDataCache {// ...private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();// 我觉得这个方法名可能是敲错了,应该是 cacheSelectorData 才对public void cacheSelectData(final SelectorData selectorData) {Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);}private void selectorAccept(final SelectorData data) {String key = data.getPluginName();synchronized (SELECTOR_MAP) {if (SELECTOR_MAP.containsKey(key)) {// 存在 key,说明为更新操作List<SelectorData> existList = SELECTOR_MAP.get(key);// 1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());// 2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据resultList.add(data);// 3. 然后将更新后的 selectorData 集合排序final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());// 4. 更新 SELECTOR_MAPSELECTOR_MAP.put(key, collect);} else {// 不存在 key,说明为新增操作SELECTOR_MAP.put(key, Lists.newArrayList(data));}}}
}

一张图总结

在这里插入图片描述

这篇关于Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/712307

相关文章

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读

Spring 请求之传递 JSON 数据的操作方法

《Spring请求之传递JSON数据的操作方法》JSON就是一种数据格式,有自己的格式和语法,使用文本表示一个对象或数组的信息,因此JSON本质是字符串,主要负责在不同的语言中数据传递和交换,这... 目录jsON 概念JSON 语法JSON 的语法JSON 的两种结构JSON 字符串和 Java 对象互转

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

SpringValidation数据校验之约束注解与分组校验方式

《SpringValidation数据校验之约束注解与分组校验方式》本文将深入探讨SpringValidation的核心功能,帮助开发者掌握约束注解的使用技巧和分组校验的高级应用,从而构建更加健壮和可... 目录引言一、Spring Validation基础架构1.1 jsR-380标准与Spring整合1

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2