Soul网关源码分析-19期

2024-02-09 15:59
文章标签 分析 源码 网关 19 soul

本文主要是介绍Soul网关源码分析-19期,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 集群下数据同步探究
    • Websocket 表现
    • Websocket 增量更新实现
    • Http 长轮询表现
    • Http 长轮询更新实现
    • Zookeeper 表现
    • Nacos 表现
    • 总结


集群下数据同步探究


昨天配置集群时有个问题我一直惦记着, 集群间同步网关的数据会不会 相互覆盖 ?


在我看来, 后台集群间没有数据交互, 它们的桥梁仅仅是同一个数据库.


所以当后台为集群 A、B , 此时 A 做了信息变更, B 也做了信息变更, 两者的变更数据位置不同, 且后台如果使用缓存来更新数据并传导出去, 存在相互覆盖的可能 ?




Websocket 表现


首先我们来看看最常用的 websocket 模式.

要想验证这个问题, 我需要测试 A、B 集群变动不同数据时, 网关端接收到的信息情况. 开始第一步, 在网关监听处打印数据信息用于验证:

public final class SoulWebsocketClient extends WebSocketClient {@Overridepublic void onMessage(final String result) {log.info("websocket 路径: {}", uri.toString());log.info("传输数据: {}", result);handleResult(result);}
}

紧接着我们分别测试 A、B 改动不同地方的数据
在这里插入图片描述

看看这块网关端的表现, 首先是 A 的修改在网关端触发的日志打印:

2021-02-01 20:48:16.266  INFO 8463 --- [ctReadThread-31] o.d.s.p.s.d.w.c.SoulWebsocketClient      : websocket 路径: ws://localhost:9095/websocket
2021-02-01 20:48:16.267  INFO 8463 --- [ctReadThread-31] o.d.s.p.s.d.w.c.SoulWebsocketClient      : 传输数据: {"groupType":"RULE","eventType":"UPDATE","data":[{"id":"1355090604988162048","name":"/http/test/**","pluginName":"divide","selectorId":"1355090604493234176","matchMode":0,"sort":1,"enabled":false,"loged":true,"handle":"{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"roundRobin\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/http/test/**"}]}]}

可以看到 "/http/test/**"enabled 属性变为 false .


接着 B 的修改触发的日志打印:

2021-02-01 20:48:21.765  INFO 8463 --- [ctReadThread-36] o.d.s.p.s.d.w.c.SoulWebsocketClient      : websocket 路径: ws://localhost:9096/websocket
2021-02-01 20:48:21.766  INFO 8463 --- [ctReadThread-36] o.d.s.p.s.d.w.c.SoulWebsocketClient      : 传输数据: {"groupType":"RULE","eventType":"UPDATE","data":[{"id":"1355090605491478528","name":"/http/order/save","pluginName":"divide","selectorId":"1355090604493234176","matchMode":0,"sort":1,"enabled":false,"loged":true,"handle":"{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"random\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList":[{"paramType":"uri","operator":"\u003d","paramName":"/","paramValue":"/http/order/save"}]}]}

B 的改动仅传给了网关它所改动的数据, 这是 增量更新 , 如果仅仅增量更新, 就能有力证明 websocket 不会导致集群数据相互覆盖了.




Websocket 增量更新实现


再探究下后台如何通过 websocket 增量发送同步数据, 通过断点 admin 端的 DataChangedEventDispatcher 事件分发器, 我们追溯到后台的 Controller 层:

public class RuleController {@PutMapping("/{id}")public SoulAdminResult updateRule(@PathVariable("id") final String id, @RequestBody final RuleDTO ruleDTO) {Objects.requireNonNull(ruleDTO);ruleDTO.setId(id);Integer updateCount = ruleService.createOrUpdate(ruleDTO);return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS, updateCount);}
}

这是个根据 ID 进行特定数据修改的接口, 网页端的修改能精确到某一个数据, 增量同步的基础是接口 ID 隔离方式的更新.


之后的流程便是将特定更新数据传导到 webscoket 管理类 WebscoketController, 由它通知所持有的 session 会话进行增量数据更新.




Http 长轮询表现


我们再来测测长轮询的表现, 是否存在数据覆盖可能.


回顾之前我们对长轮询分析的文章( 后台与网关数据同步(Http长轮询篇 - 网关) ), 找到网关处关键接收信息的方法 HttpLongPollingDataChangedListener#doLongPolling


新增些日志信息便于观测:

public class HttpSyncDataService implements SyncDataService, AutoCloseable {private void doLongPolling(final String server) {// ...if (groupJson != null) {ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);if (ArrayUtils.isNotEmpty(changedGroups)) {log.info("Group config changed: {}", Arrays.toString(changedGroups));this.doFetchGroupConfig(server, changedGroups);}}}
}

根据之前的实验方式, 当后台 A 配置变动的日志信息:

2021-02-01 21:37:27.299  INFO 9080 --- [-long-polling-1] o.d.s.s.data.http.HttpSyncDataService    : http 路径: http://localhost:90952021-02-01 21:37:27.301  INFO 9080 --- [-long-polling-1] o.d.s.s.data.http.HttpSyncDataService    : 传输数据: [RULE]2021-02-01 21:37:27.301  INFO 9080 --- [-long-polling-1] o.d.s.s.data.http.HttpSyncDataService    : request configs: [http://localhost:9095/configs/fetch?groupKeys=RULE]2021-02-01 21:37:27.325  INFO 9080 --- [-long-polling-1] o.d.s.s.d.h.refresh.AbstractDataRefresh  : update RULE config: {... }

最后面的信息太多, 我将重点数据转换为 json 格式

{"md5": "ab4cbb5760006e4653f4025c7356ccff","lastModifyTime": 1612186647296,"data": [{"id": "1355090604988162048","name": "/http/test/**","pluginName": "divide","selectorId": "1355090604493234176","matchMode": 0,"sort": 1,"enabled": false,"loged": true,"handle": "{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"roundRobin\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList": [{"paramType": "uri","operator": "match","paramName": "/","paramValue": "/http/test/**"}]}]
}

从信息可以看到 "/http/test/**"enabled 属性变为 false .

再看看 B 打印的日志内容

2021-02-01 21:37:37.423  INFO 9080 --- [-long-polling-2] o.d.s.s.data.http.HttpSyncDataService    : http 路径: http://localhost:9096
2021-02-01 21:37:37.424  INFO 9080 --- [-long-polling-2] o.d.s.s.data.http.HttpSyncDataService    : 传输数据: [RULE]
2021-02-01 21:37:37.424  INFO 9080 --- [-long-polling-2] o.d.s.s.data.http.HttpSyncDataService    : request configs: [http://localhost:9096/configs/fetch?groupKeys=RULE]
2021-02-01 21:37:37.467  INFO 9080 --- [-long-polling-2] o.d.s.s.d.h.refresh.AbstractDataRefresh  : update RULE config: {... }

同样将数据变动信息中的重点转换成 json 格式:

{"md5": "29173b55dff25770db3b23d634e88a29","lastModifyTime": 1612186657412,"data": [{"id": "1355090604988162048","name": "/http/test/**","pluginName": "divide","selectorId": "1355090604493234176","matchMode": 0,"sort": 1,"enabled": false,"loged": true,"handle": "{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"roundRobin\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList": [{"paramType": "uri","operator": "match","paramName": "/","paramValue": "/http/test/**"}]},{"id": "1355090605491478528","name": "/http/order/save","pluginName": "divide","selectorId": "1355090604493234176","matchMode": 0,"sort": 1,"enabled": false,"loged": true,"handle": "{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"random\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList": [{"paramType": "uri","operator": "=","paramName": "/","paramValue": "/http/order/save"}]}]
}

可以看到 "/http/test/**" 的状态 与 "/http/order/save" 的状态都是 false . 也就是说后台 B 的数据更新并没有导致 A 的更新在网关端被覆盖.


至此可以证明使用 Http 长轮询不会导致数据相互覆盖.




Http 长轮询更新实现


为什么 Http 长轮询的集群数据更新不会导致数据覆盖呢? 这还要从 Http 长轮询同步机制说起.

  • 长轮询方式中, 后台的数据变动仅会传递给网关监听方法少量数据, 这个数据就是 变动的元数据类型
  • 网关端接收到变动通知后, 请求后台的 /config/fetch?[数据类型] 接口, 主动拉取特定类型数据

那么现在问题就变成: 后台接收到请求后, 如何返回网关最新的数据?


仅仅集群下某个节点的缓存数据肯定不是最新的, 所以肯定是要拉取数据库中信息的. 我们找到后台这边/config/fetch? 对应类探究一二.


首先找到 ConfigController 类, 其中包括拉取数据的方法

public class ConfigController {@GetMapping("/fetch")public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {Map<String, ConfigData<?>> result = Maps.newHashMap();// 根据不同 groupKey 查找数据并返回for (String groupKey : groupKeys) {ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));result.put(groupKey, data);}return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);}
}
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {// 仅仅取了缓存数据?ConfigDataCache config = CACHE.get(groupKey.name());switch (groupKey) {// ...case RULE:List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {}.getType());return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);// ...}}
}

到这我有些傻眼了, 不是按我想的在同步时返回数据库中信息.


不过接着向上断点探索, 终于在 网页端触发后台数据更新 这块发现问题.


HTTP 轮询中的通知是沿用的 AbstractDataChangedListener#onRuleChanged 等方法, 而这些方法会重新刷新缓存.

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {@Overridepublic void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {if (CollectionUtils.isEmpty(changed)) {return;}// 刷新缓存this.updateRuleCache();this.afterRuleChanged(changed, eventType);}protected void updateRuleCache() {// 获取数据库中所有 rule 的信息并放入 CACHE 缓存中this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());}protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {String json = GsonUtils.getInstance().toJson(data);ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);}
}

梳理下后台 B 同步流程图:

时序1
后台A页面配置变动
数据库更新
省略...
时序2
后台B页面配置变动
查询数据库
更新后台B缓存
通知网关数据变动
时序3
网关接收数据变动
请求后台B
后台B返回缓存信息

整个流程步骤是按照 时序1 -> 时序2 -> 时序3 , 正是由于时序2 中后台B配置变动时会重新刷新相关变动元数据类型的所有数据, 保证缓存中数据是最新的. 在时序3中网关请求时才能返回最新的数据.




Zookeeper 表现


来测测 Zookeeper 下网关同步的表现, 是否存在数据覆盖可能.


由于网关端仅是与 Zookeeper 有数据交互, 所以引起覆盖可能性的地方, 仅可能是后台传输数据到 Zookeeper.


找到后台的事件分发器 DataChangedEventDispatcher, 这块 Zookeeper 的监听类为 ZookeeperDataChangedListener.


由于后台的修改是区分为事件类型进行通知的, 我们找到 Rule 事件对应的方法看看实现代码:

public class ZookeeperDataChangedListener implements DataChangedListener {@Overridepublic void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {// ...for (RuleData data : changed) {String ruleRealPath = ZkPathConstants.buildRulePath(data.getPluginName(), data.getSelectorId(), data.getId());if (eventType == DataEventTypeEnum.DELETE) {deleteZkPath(ruleRealPath);continue;}String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getPluginName());createZkNode(ruleParentPath);// 写入数据 datainsertZkNode(ruleRealPath, data);}}
}

我们将 divide 的 /http/order/save 路径关闭, 查看 data 中的值

RuleData(id=1355090605491478528, name=/http/order/save, pluginName=divide, selectorId=1355090604493234176, matchMode=0, sort=1, enabled=false, loged=true, handle={"requestVolumeThreshold":"0","errorThresholdPercentage":"0","maxConcurrentRequests":"0","sleepWindowInMilliseconds":"0","loadBalance":"random","timeout":3000,"retry":"0"}, conditionDataList=[ConditionData(paramType=uri, operator==, paramName=/, paramValue=/http/order/save)])

可以发现, 后台对于 Zookeeper 的更新是增量的, 所以多个后台组成的集群环境下, 更新数据不会在 Zookeeper 端产生覆盖现象.


网关端是面向 Zookeeper 同步数据, 并没有直接关联后台集群, 所以这种模式下网关也不会有覆盖现象.




Nacos 表现


和 Zookeeper 一样, 使用 Nacos 同步时网关也不会直接面对后台集群, 所以只需保证后台集群对 Nacos 的更新没有覆盖问题即可.


找到关键监听类 NacosDataChangedListener

public class NacosDataChangedListener implements DataChangedListener {@Overridepublic void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {// 从 Nacos 拉取最新数据并更新到缓存updateRuleMap(getConfig(NacosPathConstants.RULE_DATA_ID));switch (eventType) {// ...default:changed.forEach(rule -> {// MAP 集合中剔除变动的数据, 其余数据保留List<RuleData> ls = RULE_MAP.getOrDefault(rule.getSelectorId(), new ArrayList<>()).stream().filter(s -> !s.getId().equals(rule.getId())).collect(Collectors.toList());// 加入变动的数据, 构成一个最新缓存ls.add(rule);ls.sort(RULE_DATA_COMPARATOR);// 全量 RULE 缓存重置RULE_MAP.put(rule.getSelectorId(), ls);});break;}// 推送 RULE 类型的全量数据到 NacospublishConfig(NacosPathConstants.RULE_DATA_ID, RULE_MAP);}@SneakyThrowsprivate String getConfig(final String dataId) {// 从 Nacos 中获取数据String config = configService.getConfig(dataId, NacosPathConstants.GROUP, NacosPathConstants.DEFAULT_TIME_OUT);return StringUtils.hasLength(config) ? config : NacosPathConstants.EMPTY_CONFIG_DEFAULT_VALUE;}private void updateRuleMap(final String configInfo) {JsonObject jo = GsonUtils.getInstance().fromJson(configInfo, JsonObject.class);Set<String> set = new HashSet<>(RULE_MAP.keySet());for (Entry<String, JsonElement> e : jo.entrySet()) {set.remove(e.getKey());List<RuleData> ls = new ArrayList<>();e.getValue().getAsJsonArray().forEach(je -> ls.add(GsonUtils.getInstance().fromJson(je, RuleData.class)));// 将最新的数据放入缓存RULE_MAP.put(e.getKey(), ls);}RULE_MAP.keySet().removeAll(set);}
}

根据我们的分析, Nacos 虽然是将某一类型的数据 (比如 RULE) 从缓存中全部发布到 Nacos 中. 但在推送前, 会从 Nacos 中获取到最新数据, 这时就获取到其他集群节点变动的数据 , 以此保证不会出现覆盖其他节点数据的问题.


附上一个流程图说明 后台更新数据 的情况:

网页端更新
发送通知Nacos监听器
从Nacos拉取最新数据更新缓存
将变动增量写入缓存
缓存全部推送Nacos

Nacos 同步时, 后台通过推送前从 Nacos 获取数据, 达到节点间不覆盖的目的. 而网关端仅针对 Nacos 更新, 保证了此种方式下的数据正确性.




总结


集群下各种同步方式均可以保证 节点间数据变动同步给网关时不会相互影响, 造成数据覆盖, 它们的实现方式各不相同.

  • Websocket 模式下, 通过精准的 增量更新 , 保证集群间同步给网关不相同数据时, 不会带上它们的过时数据
  • Http 长轮询模式下, 每个节点在接收网页端变动信息时, 不仅变更自身相应数据的缓存, 也会 查询数据库, 更新相应类型缓存的所有数据. 以此保证通知网关时不会传出过时数据
  • Zookeeper 模式下, 后台对于 Zookeeper 的数据更新也是 增量更新 的, 这点和 Websocket 很像.
  • Nacos 模式下, 后台在相应类型缓存全量推送 Nacos 前, 会先 查询Nacos配置 并更新缓存, 依次保证不会给 Nacos 传入过时数据.

这篇关于Soul网关源码分析-19期的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/694665

相关文章

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专

详解Spring Boot接收参数的19种方式

《详解SpringBoot接收参数的19种方式》SpringBoot提供了多种注解来接收不同类型的参数,本文给大家介绍SpringBoot接收参数的19种方式,感兴趣的朋友跟随小编一起看看吧... 目录SpringBoot接受参数相关@PathVariable注解@RequestHeader注解@Reque

Spring中Bean有关NullPointerException异常的原因分析

《Spring中Bean有关NullPointerException异常的原因分析》在Spring中使用@Autowired注解注入的bean不能在静态上下文中访问,否则会导致NullPointerE... 目录Spring中Bean有关NullPointerException异常的原因问题描述解决方案总结

python中的与时间相关的模块应用场景分析

《python中的与时间相关的模块应用场景分析》本文介绍了Python中与时间相关的几个重要模块:`time`、`datetime`、`calendar`、`timeit`、`pytz`和`dateu... 目录1. time 模块2. datetime 模块3. calendar 模块4. timeit

python-nmap实现python利用nmap进行扫描分析

《python-nmap实现python利用nmap进行扫描分析》Nmap是一个非常用的网络/端口扫描工具,如果想将nmap集成进你的工具里,可以使用python-nmap这个python库,它提供了... 目录前言python-nmap的基本使用PortScanner扫描PortScannerAsync异