本文主要是介绍Soul网关源码分析-19期,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 集群下数据同步探究
- Websocket 表现
- Websocket 增量更新实现
- Http 长轮询表现
- Http 长轮询更新实现
- Zookeeper 表现
- Nacos 表现
- 总结
集群下数据同步探究
昨天配置集群时有个问题我一直惦记着, 集群间同步网关的数据会不会 相互覆盖 ?
在我看来, 后台集群间没有数据交互, 它们的桥梁仅仅是同一个数据库.
所以当后台为集群 A、B , 此时 A 做了信息变更, B 也做了信息变更, 两者的变更数据位置不同, 且后台如果使用缓存来更新数据并传导出去, 存在相互覆盖的可能 ?
Websocket 表现
首先我们来看看最常用的 websocket 模式.
要想验证这个问题, 我需要测试 A、B 集群变动不同数据时, 网关端接收到的信息情况. 开始第一步, 在网关监听处打印数据信息用于验证:
public final class SoulWebsocketClient extends WebSocketClient {@Overridepublic void onMessage(final String result) {log.info("websocket 路径: {}", uri.toString());log.info("传输数据: {}", result);handleResult(result);}
}
紧接着我们分别测试 A、B 改动不同地方的数据
看看这块网关端的表现, 首先是 A 的修改在网关端触发的日志打印:
2021-02-01 20:48:16.266 INFO 8463 --- [ctReadThread-31] o.d.s.p.s.d.w.c.SoulWebsocketClient : websocket 路径: ws://localhost:9095/websocket
2021-02-01 20:48:16.267 INFO 8463 --- [ctReadThread-31] o.d.s.p.s.d.w.c.SoulWebsocketClient : 传输数据: {"groupType":"RULE","eventType":"UPDATE","data":[{"id":"1355090604988162048","name":"/http/test/**","pluginName":"divide","selectorId":"1355090604493234176","matchMode":0,"sort":1,"enabled":false,"loged":true,"handle":"{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"roundRobin\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/http/test/**"}]}]}
可以看到 "/http/test/**"
的 enabled
属性变为 false
.
接着 B 的修改触发的日志打印:
2021-02-01 20:48:21.765 INFO 8463 --- [ctReadThread-36] o.d.s.p.s.d.w.c.SoulWebsocketClient : websocket 路径: ws://localhost:9096/websocket
2021-02-01 20:48:21.766 INFO 8463 --- [ctReadThread-36] o.d.s.p.s.d.w.c.SoulWebsocketClient : 传输数据: {"groupType":"RULE","eventType":"UPDATE","data":[{"id":"1355090605491478528","name":"/http/order/save","pluginName":"divide","selectorId":"1355090604493234176","matchMode":0,"sort":1,"enabled":false,"loged":true,"handle":"{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"random\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList":[{"paramType":"uri","operator":"\u003d","paramName":"/","paramValue":"/http/order/save"}]}]}
B 的改动仅传给了网关它所改动的数据, 这是 增量更新 , 如果仅仅增量更新, 就能有力证明 websocket 不会导致集群数据相互覆盖了.
Websocket 增量更新实现
再探究下后台如何通过 websocket 增量发送同步数据, 通过断点 admin 端的 DataChangedEventDispatcher 事件分发器, 我们追溯到后台的 Controller 层:
public class RuleController {@PutMapping("/{id}")public SoulAdminResult updateRule(@PathVariable("id") final String id, @RequestBody final RuleDTO ruleDTO) {Objects.requireNonNull(ruleDTO);ruleDTO.setId(id);Integer updateCount = ruleService.createOrUpdate(ruleDTO);return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS, updateCount);}
}
这是个根据 ID 进行特定数据修改的接口, 网页端的修改能精确到某一个数据, 增量同步的基础是接口 ID 隔离方式的更新.
之后的流程便是将特定更新数据传导到 webscoket 管理类 WebscoketController, 由它通知所持有的 session 会话进行增量数据更新.
Http 长轮询表现
我们再来测测长轮询的表现, 是否存在数据覆盖可能.
回顾之前我们对长轮询分析的文章( 后台与网关数据同步(Http长轮询篇 - 网关) ), 找到网关处关键接收信息的方法 HttpLongPollingDataChangedListener#doLongPolling
新增些日志信息便于观测:
public class HttpSyncDataService implements SyncDataService, AutoCloseable {private void doLongPolling(final String server) {// ...if (groupJson != null) {ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);if (ArrayUtils.isNotEmpty(changedGroups)) {log.info("Group config changed: {}", Arrays.toString(changedGroups));this.doFetchGroupConfig(server, changedGroups);}}}
}
根据之前的实验方式, 当后台 A 配置变动的日志信息:
2021-02-01 21:37:27.299 INFO 9080 --- [-long-polling-1] o.d.s.s.data.http.HttpSyncDataService : http 路径: http://localhost:90952021-02-01 21:37:27.301 INFO 9080 --- [-long-polling-1] o.d.s.s.data.http.HttpSyncDataService : 传输数据: [RULE]2021-02-01 21:37:27.301 INFO 9080 --- [-long-polling-1] o.d.s.s.data.http.HttpSyncDataService : request configs: [http://localhost:9095/configs/fetch?groupKeys=RULE]2021-02-01 21:37:27.325 INFO 9080 --- [-long-polling-1] o.d.s.s.d.h.refresh.AbstractDataRefresh : update RULE config: {... }
最后面的信息太多, 我将重点数据转换为 json 格式
{"md5": "ab4cbb5760006e4653f4025c7356ccff","lastModifyTime": 1612186647296,"data": [{"id": "1355090604988162048","name": "/http/test/**","pluginName": "divide","selectorId": "1355090604493234176","matchMode": 0,"sort": 1,"enabled": false,"loged": true,"handle": "{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"roundRobin\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList": [{"paramType": "uri","operator": "match","paramName": "/","paramValue": "/http/test/**"}]}]
}
从信息可以看到 "/http/test/**"
的 enabled
属性变为 false
.
再看看 B 打印的日志内容
2021-02-01 21:37:37.423 INFO 9080 --- [-long-polling-2] o.d.s.s.data.http.HttpSyncDataService : http 路径: http://localhost:9096
2021-02-01 21:37:37.424 INFO 9080 --- [-long-polling-2] o.d.s.s.data.http.HttpSyncDataService : 传输数据: [RULE]
2021-02-01 21:37:37.424 INFO 9080 --- [-long-polling-2] o.d.s.s.data.http.HttpSyncDataService : request configs: [http://localhost:9096/configs/fetch?groupKeys=RULE]
2021-02-01 21:37:37.467 INFO 9080 --- [-long-polling-2] o.d.s.s.d.h.refresh.AbstractDataRefresh : update RULE config: {... }
同样将数据变动信息中的重点转换成 json 格式:
{"md5": "29173b55dff25770db3b23d634e88a29","lastModifyTime": 1612186657412,"data": [{"id": "1355090604988162048","name": "/http/test/**","pluginName": "divide","selectorId": "1355090604493234176","matchMode": 0,"sort": 1,"enabled": false,"loged": true,"handle": "{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"roundRobin\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList": [{"paramType": "uri","operator": "match","paramName": "/","paramValue": "/http/test/**"}]},{"id": "1355090605491478528","name": "/http/order/save","pluginName": "divide","selectorId": "1355090604493234176","matchMode": 0,"sort": 1,"enabled": false,"loged": true,"handle": "{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"random\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList": [{"paramType": "uri","operator": "=","paramName": "/","paramValue": "/http/order/save"}]}]
}
可以看到 "/http/test/**"
的状态 与 "/http/order/save"
的状态都是 false
. 也就是说后台 B 的数据更新并没有导致 A 的更新在网关端被覆盖.
至此可以证明使用 Http 长轮询不会导致数据相互覆盖.
Http 长轮询更新实现
为什么 Http 长轮询的集群数据更新不会导致数据覆盖呢? 这还要从 Http 长轮询同步机制说起.
- 长轮询方式中, 后台的数据变动仅会传递给网关监听方法少量数据, 这个数据就是 变动的元数据类型
- 网关端接收到变动通知后, 请求后台的
/config/fetch?[数据类型]
接口, 主动拉取特定类型数据
那么现在问题就变成: 后台接收到请求后, 如何返回网关最新的数据?
仅仅集群下某个节点的缓存数据肯定不是最新的, 所以肯定是要拉取数据库中信息的. 我们找到后台这边/config/fetch?
对应类探究一二.
首先找到 ConfigController 类, 其中包括拉取数据的方法
public class ConfigController {@GetMapping("/fetch")public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {Map<String, ConfigData<?>> result = Maps.newHashMap();// 根据不同 groupKey 查找数据并返回for (String groupKey : groupKeys) {ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));result.put(groupKey, data);}return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);}
}
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {// 仅仅取了缓存数据?ConfigDataCache config = CACHE.get(groupKey.name());switch (groupKey) {// ...case RULE:List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {}.getType());return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);// ...}}
}
到这我有些傻眼了, 不是按我想的在同步时返回数据库中信息.
不过接着向上断点探索, 终于在 网页端触发后台数据更新 这块发现问题.
HTTP 轮询中的通知是沿用的 AbstractDataChangedListener#onRuleChanged 等方法, 而这些方法会重新刷新缓存.
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {@Overridepublic void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {if (CollectionUtils.isEmpty(changed)) {return;}// 刷新缓存this.updateRuleCache();this.afterRuleChanged(changed, eventType);}protected void updateRuleCache() {// 获取数据库中所有 rule 的信息并放入 CACHE 缓存中this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());}protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {String json = GsonUtils.getInstance().toJson(data);ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);}
}
梳理下后台 B 同步流程图:
整个流程步骤是按照 时序1 -> 时序2 -> 时序3 , 正是由于时序2 中后台B配置变动时会重新刷新相关变动元数据类型的所有数据, 保证缓存中数据是最新的. 在时序3中网关请求时才能返回最新的数据.
Zookeeper 表现
来测测 Zookeeper 下网关同步的表现, 是否存在数据覆盖可能.
由于网关端仅是与 Zookeeper 有数据交互, 所以引起覆盖可能性的地方, 仅可能是后台传输数据到 Zookeeper.
找到后台的事件分发器 DataChangedEventDispatcher, 这块 Zookeeper 的监听类为 ZookeeperDataChangedListener.
由于后台的修改是区分为事件类型进行通知的, 我们找到 Rule
事件对应的方法看看实现代码:
public class ZookeeperDataChangedListener implements DataChangedListener {@Overridepublic void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {// ...for (RuleData data : changed) {String ruleRealPath = ZkPathConstants.buildRulePath(data.getPluginName(), data.getSelectorId(), data.getId());if (eventType == DataEventTypeEnum.DELETE) {deleteZkPath(ruleRealPath);continue;}String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getPluginName());createZkNode(ruleParentPath);// 写入数据 datainsertZkNode(ruleRealPath, data);}}
}
我们将 divide 的 /http/order/save
路径关闭, 查看 data
中的值
RuleData(id=1355090605491478528, name=/http/order/save, pluginName=divide, selectorId=1355090604493234176, matchMode=0, sort=1, enabled=false, loged=true, handle={"requestVolumeThreshold":"0","errorThresholdPercentage":"0","maxConcurrentRequests":"0","sleepWindowInMilliseconds":"0","loadBalance":"random","timeout":3000,"retry":"0"}, conditionDataList=[ConditionData(paramType=uri, operator==, paramName=/, paramValue=/http/order/save)])
可以发现, 后台对于 Zookeeper 的更新是增量的, 所以多个后台组成的集群环境下, 更新数据不会在 Zookeeper 端产生覆盖现象.
网关端是面向 Zookeeper 同步数据, 并没有直接关联后台集群, 所以这种模式下网关也不会有覆盖现象.
Nacos 表现
和 Zookeeper 一样, 使用 Nacos 同步时网关也不会直接面对后台集群, 所以只需保证后台集群对 Nacos 的更新没有覆盖问题即可.
找到关键监听类 NacosDataChangedListener
public class NacosDataChangedListener implements DataChangedListener {@Overridepublic void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {// 从 Nacos 拉取最新数据并更新到缓存updateRuleMap(getConfig(NacosPathConstants.RULE_DATA_ID));switch (eventType) {// ...default:changed.forEach(rule -> {// MAP 集合中剔除变动的数据, 其余数据保留List<RuleData> ls = RULE_MAP.getOrDefault(rule.getSelectorId(), new ArrayList<>()).stream().filter(s -> !s.getId().equals(rule.getId())).collect(Collectors.toList());// 加入变动的数据, 构成一个最新缓存ls.add(rule);ls.sort(RULE_DATA_COMPARATOR);// 全量 RULE 缓存重置RULE_MAP.put(rule.getSelectorId(), ls);});break;}// 推送 RULE 类型的全量数据到 NacospublishConfig(NacosPathConstants.RULE_DATA_ID, RULE_MAP);}@SneakyThrowsprivate String getConfig(final String dataId) {// 从 Nacos 中获取数据String config = configService.getConfig(dataId, NacosPathConstants.GROUP, NacosPathConstants.DEFAULT_TIME_OUT);return StringUtils.hasLength(config) ? config : NacosPathConstants.EMPTY_CONFIG_DEFAULT_VALUE;}private void updateRuleMap(final String configInfo) {JsonObject jo = GsonUtils.getInstance().fromJson(configInfo, JsonObject.class);Set<String> set = new HashSet<>(RULE_MAP.keySet());for (Entry<String, JsonElement> e : jo.entrySet()) {set.remove(e.getKey());List<RuleData> ls = new ArrayList<>();e.getValue().getAsJsonArray().forEach(je -> ls.add(GsonUtils.getInstance().fromJson(je, RuleData.class)));// 将最新的数据放入缓存RULE_MAP.put(e.getKey(), ls);}RULE_MAP.keySet().removeAll(set);}
}
根据我们的分析, Nacos 虽然是将某一类型的数据 (比如 RULE) 从缓存中全部发布到 Nacos 中. 但在推送前, 会从 Nacos 中获取到最新数据, 这时就获取到其他集群节点变动的数据 , 以此保证不会出现覆盖其他节点数据的问题.
附上一个流程图说明 后台更新数据 的情况:
Nacos 同步时, 后台通过推送前从 Nacos 获取数据, 达到节点间不覆盖的目的. 而网关端仅针对 Nacos 更新, 保证了此种方式下的数据正确性.
总结
集群下各种同步方式均可以保证 节点间数据变动同步给网关时不会相互影响, 造成数据覆盖, 它们的实现方式各不相同.
- Websocket 模式下, 通过精准的 增量更新 , 保证集群间同步给网关不相同数据时, 不会带上它们的过时数据
- Http 长轮询模式下, 每个节点在接收网页端变动信息时, 不仅变更自身相应数据的缓存, 也会 查询数据库, 更新相应类型缓存的所有数据. 以此保证通知网关时不会传出过时数据
- Zookeeper 模式下, 后台对于 Zookeeper 的数据更新也是 增量更新 的, 这点和 Websocket 很像.
- Nacos 模式下, 后台在相应类型缓存全量推送 Nacos 前, 会先 查询Nacos配置 并更新缓存, 依次保证不会给 Nacos 传入过时数据.
这篇关于Soul网关源码分析-19期的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!