本文主要是介绍Soul网关数据同步源码分析-Http长轮询,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在这篇文章中,我们学习下soul网关与soul-admin间数据同步方案,并尝试分析其中实现较为复杂的Http长轮询方案的源码实现。
一、自己思考如何实现
在展开学习之前先想下如果是我怎么实现数据的同步功能。出现在我脑海里最直接的两个方案:
-
1 soul网关启动后调用soul-admin接口同步全量配置,然后启动定时任务每隔10秒钟查询一次做增量更新,这是一种纯拉取的方式;
-
2 全量配置信息的拉取同上,在admin数据变更后,调用soul网关的接口更新内存中的配置,这是一种推的方式,相对于上面的方式时效性更强。
乍一看这两个方案看着问题不大,但是细想之下可能有不少漏洞,如果调用接口失败等细节都需要斟酌。如果客户端不暴露对外http接口,那么方案二就无从谈起。
那么我们接下来还是看看成熟的开源框架是如何实现数据同步的,在soul网关中提供了多种数据同步的实现方案,包括Websocket,Zookeeper,http 长轮询。websocket与zookeeper的方案相对比较直接,WebSocket方案更多的是协议层面的直接应用,Zookeeper方案更多的是依赖Zookeeper框架的能力。
在接下来的文章中,我们来重点看下实现比较复杂的,也是开源配置中心采用较多的http长轮询方案。
http长轮询方案可以简单地理解为上面两种方案的折中,即在不需要客户端暴露http接口的情况下实现admin端推,时效性又比定时拉取admin接口的方案一好。
二、http长轮询
在学习源码之前我们需要先搞清楚什么是http长轮询,为何要用http长轮询。
在了解Http长轮询(Http Long Polling)之前,我们需要先看下Http 轮询(Http Polling)。在之前我们自己想到的简单数据同步方案中,client端定时发起请求查询admin的接口,admin端在接收到查询请求后立即返回响应结果,这就是轮询(Polling)。
轮询很明显不是一个最优的方案,如果定时的频率较低那么数据更新的时效性就很差,如果定时的频率很高,那么就会产生大量的无效http请求,占用网络带宽资源,服务器的压力也会比较大。
由于轮询方案有明显的短板,大牛们又在轮询(Polling)方案的基础上设计出了长轮询(Long Polling)。简单来说,在client端给admin端发送请求后,admin端并不立即返回结果,挂起当前请求直到admin端发生数据变更或者达到超时时间的阈值。client端直到上次请求返回或者失败之后,再发起下一次轮询请求。长轮询的“长”其实指的就是请求的响应时间被admin端主动地延长,达到了admin端推送数据变更的目的。
Http长轮询方案可以显著地减少http轮询请求数量,例如在soul中长轮询的超时时间为90秒,也不用担心在轮询的间隔期内client端无法获取到admin端的数据变更。接下来我们就进入正题,看下再Soul网关中长轮询的源码实现。
三、Soul网关长轮询源码分析
Http长轮询源码-客户端
在HttpSyncDataConfiguration中我们可以看到Client配置的soul.sync.http相关属性加载到HttpConfig,并构造了HttpSyncDataService,那么核心的逻辑就在HttpSyncDataService中。
/*** Http sync data configuration for spring boot.** @author xiaoyu(Myth)*/
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.http", name = "url")
@Slf4j
public class HttpSyncDataConfiguration {@Beanpublic SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {log.info("you use http long pull sync soul data");//初始化HttpSyncDataServicereturn new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));}@Bean@ConfigurationProperties(prefix = "soul.sync.http")public HttpConfig httpConfig() {return new HttpConfig();}
}
在HttpSyncDataService的构造函数中,最后会调用start方法,具体实现如下。
private void start() {if (RUNNING.compareAndSet(false, true)) {// 启动时即遍历所有分组的配置,这里的分组指的是插件、选择器、规则等不同种类的数据,这里的请求并不是长轮询this.fetchGroupConfig(ConfigGroupEnum.values());int threadSize = serverList.size();//初始化线程池,线程数量等于admin server 节点数this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(),SoulThreadFactory.create("http-long-polling", true));// 开启长轮询线程this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));} else {log.info("soul http long polling was started, executor=[{}]", executor);}
}
线程实现如下,有简单的重试机制,核心的代码逻辑仍在doLongPolling中:
class HttpLongPollingTask implements Runnable {private String server;private final int retryTimes = 3;HttpLongPollingTask(final String server) {this.server = server;}@Overridepublic void run() {//网关运行过程中,RUNNING始终为true;while (RUNNING.get()) {for (int time = 1; time <= retryTimes; time++) {try {doLongPolling(server);} catch (Exception e) {// print warnning log.if (time < retryTimes) {log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",time, retryTimes - time, e.getMessage());ThreadUtils.sleep(TimeUnit.SECONDS, 5);continue;}// 3次重试仍失败,尝试等待5分钟后再试log.error("Long polling failed, try again after 5 minutes!", e);ThreadUtils.sleep(TimeUnit.MINUTES, 5);}}}log.warn("Stop http long polling.");}
}
我们继续看doLongPolling:
private void doLongPolling(final String server) {MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);for (ConfigGroupEnum group : ConfigGroupEnum.values()) {ConfigData<?> cacheConfig = factory.cacheConfigData(group);String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));params.put(group.name(), Lists.newArrayList(value));}HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);HttpEntity httpEntity = new HttpEntity(params, headers);String listenerUrl = server + "/configs/listener";log.debug("request listener configs: [{}]", listenerUrl);JsonArray groupJson = null;try {String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();log.debug("listener result: [{}]", json);groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");} catch (RestClientException e) {String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());throw new SoulException(message, e);}//长轮询获取变更的group, 根据group拉取对应数据if (groupJson != null) {// fetch group configuration async.ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);if (ArrayUtils.isNotEmpty(changedGroups)) {log.info("Group config changed: {}", Arrays.toString(changedGroups));this.doFetchGroupConfig(server, changedGroups);}}
}
Http长轮询源码-服务端
接下来学习下数据同步中的http长轮询服务端源码实现。整体的同步流程如官方文档中图例所示。
Client端会调用/configs/listener接口,核心代码逻辑见HttpLongPollingDataChangedListener。
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {// 对比Client请求中group的md5是否一致,List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);String clientIp = getRemoteIp(request);// 如果出现不一致的group则存在变更的数据,这种情况下可以直接返回响应。if (CollectionUtils.isNotEmpty(changedGroup)) {this.generateResponse(response, changedGroup);log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);return;}// listen for configuration changed.final AsyncContext asyncContext = request.startAsync();// AsyncContext.settimeout() does not timeout properly, so you have to control it yourselfasyncContext.setTimeout(0L);// 阻塞线程的实现scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
我们继续看下LongPollingClient线程的实现,run方法中定时调度线程,定时60秒后判断变更的ConfigGroup并返回响应。如果在60秒钟内数据发生变更如何处理呢,我们可以看到LongPollingClient中有个sendResponse方法,除了run方法内调用,还有DataChangeTask线程。
class LongPollingClient implements Runnable {private final AsyncContext asyncContext;private final String ip;private final long timeoutTime;private Future<?> asyncTimeoutFuture;LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {this.asyncContext = ac;this.ip = ip;this.timeoutTime = timeoutTime;}@Overridepublic void run() {// 定时调度线程,返回Future对象,超时时间为timeoutTime,默认为60秒this.asyncTimeoutFuture = scheduler.schedule(() -> {clients.remove(LongPollingClient.this);List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());sendResponse(changedGroups);}, timeoutTime, TimeUnit.MILLISECONDS);clients.add(this);}//如果在60s内数据发送变更,通过DataChangeTask调用此方法返回数据void sendResponse(final List<ConfigGroupEnum> changedGroups) {// cancel schedulerif (null != asyncTimeoutFuture) {asyncTimeoutFuture.cancel(false);}generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);asyncContext.complete();}
}
那我们看下DataChangeTask线程的实现,以选择器数据为例,当数据更新时会调用afterSelectorChanged方法,调度一个新的DataChangeTask线程。DataChangeTask线程中遍历全局变量clients(LongPollingClient线程列表),调用LongPollingClient的sendResponse方法返回变更的ConfigGroup。
@Override
protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
}
......
class DataChangeTask implements Runnable {private final ConfigGroupEnum groupKey;private final long changeTime = System.currentTimeMillis();DataChangeTask(final ConfigGroupEnum groupKey) {this.groupKey = groupKey;}@Overridepublic void run() {for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {LongPollingClient client = iter.next();iter.remove();client.sendResponse(Collections.singletonList(groupKey));log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);}}
}
数据同步实践
前面我们学习了Http长轮询同步的核心代码,接下来主要是实际操作验证下同步的实际流程。使用http长轮询需要的配置如下:
- 首先在
pom.xml
文件中 引入以下依赖:
<dependency><groupId>org.dromara</groupId><artifactId>soul-spring-boot-starter-sync-data-http</artifactId><version>${last.version}</version></dependency>
- 在 soul-bootstrap的 yml 文件中进行如下配置:
soul :sync:http:url: http://localhost:9095
#url: 配置成你的 soul-admin的 ip与端口地址,多个admin集群环境请使用(,)分隔。
- soul-admin 配置, 或在 soul-admin 启动参数中设置
--soul.sync.http.enabled=true
,然后重启服务。
soul:sync:http:enabled: true
首先我们需要启动soul-admin,soul-bootstrap,soul-examples-http,在soul-admin中变更Selector /http配置,soul-bootstrap中可以立即得到响应。
如图所示,我们变更weight后保存Selector。在保存Selector的流程中触发DataChangedEvent。
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());List<ConditionData> conditionDataList =selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());// publish change event.eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}
监听对应DataChangedEvent的实现如下,在soul中我们可以看到ApplicationListener的频繁使用,应用内的事件转发很便捷。可以看到针对SELECTOR配置变更调用了DataChangedListener的onSelectorChanged事件,而在http长轮询的同步策略中,DataChangedListener的实现为HttpLongPollingDataChangedListener。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {private ApplicationContext applicationContext;private List<DataChangedListener> listeners;public DataChangedEventDispatcher(final ApplicationContext applicationContext) {this.applicationContext = applicationContext;}@Override@SuppressWarnings("unchecked")public void onApplicationEvent(final DataChangedEvent event) {for (DataChangedListener listener : listeners) {switch (event.getGroupKey()) {case APP_AUTH:listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());break;case PLUGIN:listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());break;case RULE:listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());break;case SELECTOR:listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());break;case META_DATA:listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());break;default:throw new IllegalStateException("Unexpected value: " + event.getGroupKey());}}}@Overridepublic void afterPropertiesSet() {Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));}
}
通过类继承关系可以看到,HttpLongPollingDataChangedListener并没有直接实现DataChangedListener接口,而是继承了实现了接口的AbstractDataChangedListener抽象类。
在AbstractDataChangedListener中实现了onSelectorChanged方法。
@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {if (CollectionUtils.isEmpty(changed)) {return;}this.updateSelectorCache();this.afterSelectorChanged(changed, eventType);
}
......
default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {
}
在HttpLongPollingDataChangedListener中则是实现了onSelectorChanged方法。
@Override
protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
}
看到这里就跟我们之前看到的DataChangeTask串联起来了,这是一个完整的变更配置时同步的流程。
看到这里就有个疑问,如果有多个admin实例,其中一个admin实例上的配置变更,如何保证多admin节点间数据的同步?
soul-admin集群数据同步
在昨天的文章结尾,我们提到了soul-admin 集群模式下,如何解决节点间的数据同步问题?这个问题其实就是缓存一致性问题。
在具体实验及看源码之前,我们先抛出几个问题:
1 soul-admin启动了两个节点,在admin网页端(节点A)更新并保存配置,此时数据库内数据会更新,节点A的本地缓存也会同时更新,问题是节点B是否会同步更新?如果会,更新的方式是怎样的?
2 如果节点B不会同步更新,soul-web从admin集群拉取数据时,会不会先拉取到节点A最新数据,然后拉取到节点B的数据覆盖掉最新的数据,导致soul-web中配置信息不是最新的?
我们可以先自己尝试下给这两个问题设计解决方案:
1 同步更新,在不引入第三方分布式服务的情况下,不容易做到。最简单的方式是定时查询数据库判断数据是否变更,存在变更就更新本地缓存。但是在更新的时间间隔内节点B的本地缓存不是最新的;
2 soul-web自身维护数据更新的时间戳,从admin拉取数据时,当前时间戳如果大于admin的时间戳,则不需要更新数据。这种方式可以保证soul-web始终只拉取最新的数据;
接下来我们看下soul具体的源码实现。
soul-admin节点间数据是否同步
我们仍然以Selector更新为例,Selector配置更新后触发DataChangedEvent事件,AbstractDataChangedListener中onSelectorChanged方法只是更新了本地缓存,并没有同步更新其他admin节点的逻辑。
@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {if (CollectionUtils.isEmpty(changed)) {return;}this.updateSelectorCache();this.afterSelectorChanged(changed, eventType);
}
通过查看updateSelectorCache的调用方,我们看下还有没有其他更新本地缓存的逻辑。可以看到在HttpLongPollingDataChangedListener中有更新本地缓存的逻辑。
在HttpLongPollingDataChangedListener中,可以看到启动了定时线程,默认5分钟更新一次本地缓存。看到这里发现跟我们的第一个问题的解决方案是一致的,采用了很简单的实现。那么这种实现就需要处理第二个问题,避免出现数据更新问题。
@Override
protected void afterInitialize() {long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();// 定时更新本地缓存,默认事件间隔5分钟scheduler.scheduleWithFixedDelay(() -> {log.info("http sync strategy refresh config start.");try {this.refreshLocalCache();log.info("http sync strategy refresh config success.");} catch (Exception e) {log.error("http sync strategy refresh config error!", e);}}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);log.info("http sync strategy refresh interval: {}ms", syncInterval);
}
//更新全部本地缓存
private void refreshLocalCache() {this.updateAppAuthCache();this.updatePluginCache();this.updateRuleCache();this.updateSelectorCache();this.updateMetaDataCache();
}
那么soul在更新缓存数据时的策略,如何避免出现同步旧数据的情况呢?我们接着往下看。
在soul-web发送长轮询请求时,我们可以看到请求的参数包括每个配置组的md5值,及配置组的最近修改时间。
private void doLongPolling(final String server) {MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);//长轮询请求参数for (ConfigGroupEnum group : ConfigGroupEnum.values()) {ConfigData<?> cacheConfig = factory.cacheConfigData(group);String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));params.put(group.name(), Lists.newArrayList(value));}HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);HttpEntity httpEntity = new HttpEntity(params, headers);String listenerUrl = server + "/configs/listener";log.debug("request listener configs: [{}]", listenerUrl);
......
}
我们看下md5及lastModifyTime的更新逻辑,这部分逻辑要么在soul-web端,要么在soul-admin中,我们通过查找源码可以确定其位置。在AbstractDataChangedListener中,每次updateCache方法计算配置组对应json字符串的md5值,并更新最近更新时间戳。
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {String json = GsonUtils.getInstance().toJson(data);//设置md5,更新时间戳ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}
md5值可以用来判断缓存的配置内容是否发生变更,而lastModifyTime则用来判断变更数据是否应该更新到本地。在判断配置组(ConfigGroup)是否发生变更时利用了这两个字段。
private boolean checkCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, final long clientModifyTime) {// md5值一致,说明内容无变化,直接返回falseif (StringUtils.equals(clientMd5, serverCache.getMd5())) {return false;}// 对比最近更新时间,如果服务器更新时间大于soul-web更新时间,返回true说明配置信息变更,soul-web需要更新该配置组数据long lastModifyTime = serverCache.getLastModifyTime();if (lastModifyTime >= clientModifyTime) {return true;}//我们之前提到soul-admin在集群模式下,可能出现本地缓存不一致的情况。如果soul-web拉取了最新的配置,最近更新时间大于未更新节点的更新时间//这种情况下需要刷新本地缓存,考虑到多client并发请求可能频繁访问db,这里需加锁boolean locked = false;try {locked = LOCK.tryLock(5, TimeUnit.SECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();return true;}if (locked) {try {ConfigDataCache latest = CACHE.get(serverCache.getGroup());//如果在当前线程获取到锁后,数据已经发生了变更,判断md5值。这里用'!='判断是否为不同的对象if (latest != serverCache) {return !StringUtils.equals(clientMd5, latest.getMd5());}// 数据未变更,从数据库拉取配置数据更新到内存。这里是所有分组的数据都会更新this.refreshLocalCache();latest = CACHE.get(serverCache.getGroup());return !StringUtils.equals(clientMd5, latest.getMd5());} finally {LOCK.unlock();}}// 加锁失败,可能有其他线程正在更新本地缓存,则通知soul-web需更新配置数据return true;
}
接下来我们再看下当soul-web从soul-admin查询到配置信息后,如何更新本地缓存。
private DataRefreshFactory factory;
......
private boolean updateCacheWithJson(final String json) {JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);JsonObject data = jsonObject.getAsJsonObject("data");// if the config cache will be updated?return factory.executor(data);
}
我们继续来看DataRefreshFactory的实现,可以看到本地缓存并不在DataRefreshFactory中,而是在每个配置组对应的DataRefresh中。
public final class DataRefreshFactory {private static final EnumMap<ConfigGroupEnum, DataRefresh> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);//构造方法,这么多Subscriber干啥的?我们先跳过,后续看插件的时候再跟进public DataRefreshFactory(final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers,final List<AuthDataSubscriber> authDataSubscribers) {ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataRefresh(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataRefresh(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataRefresh(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AppAuthDataRefresh(authDataSubscribers));ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataRefresh(metaDataSubscribers));}//使用parallelStream,并发更新每个配置组的本地缓存public boolean executor(final JsonObject data) {final boolean[] success = {false};ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));return success[0];}//获取配置组的配置信息public ConfigData<?> cacheConfigData(final ConfigGroupEnum group) {return ENUM_MAP.get(group).cacheConfigData();}
}
我们可以看到每种配置组都有自己的DataRefresh实现,同样的这里使用了模板方法的设计模式,通过AbstractDataRefresh定义一组通用的方法,具体配置组的DataRefresh类直接继承AbstractDataRefresh,如下图所示。
我们仍然以Selector为例看下具体的实现。
我们接着看DataRefresh的具体实现,以SelectorDataRefresh为例:
在DataRefreshFactory中,我们看到遍历调用每个DataRefresh实现类的refresh方法,其具体实现则在AbstractDataRefresh中。
AbstractDataRefresh
......
protected abstract void refresh(List<T> data);
@Override
public Boolean refresh(final JsonObject data) {boolean updated = false;JsonObject jsonObject = convert(data);if (null != jsonObject) {//解析配置组对应的数据ConfigData<T> result = fromJson(jsonObject);//是否更新缓存,若更新需要同时refreshif (this.updateCacheIfNeed(result)) {updated = true;refresh(result.getData());}}return updated;
}
updateCacheIfNeed的具体实现同样在AbstractDataRefresh中。这里使用了ConcurrentHashMap的merge方法,可以保证更新的原子性。假设我们使用常规的更新逻辑,先get oldVal,对比后再决定是否更新newVal,在有多个线程并发执行时可能出现并发问题。在日常的开发工作中也需要留意,注意规避潜在的并发问题。
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {// 当前缓存中没有该配置组信息,更新后直接返回trueif (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {return true;}ResultHolder holder = new ResultHolder(false);GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {// md5值不一致,且oldVal更新时间小于newVal更新时间,更新newVal,且记录更新结果if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) {log.info("update {} config: {}", groupEnum, newVal);holder.result = true;return newVal;}log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());return oldVal;});return holder.result;
}
我们接着看SelectorDataRefresh的具体实现。这里只有几行代码,大致猜测其中的含义为如果Selector数据为空,触发所有插件对于Selector的取消订阅事件,并更新Selector缓存。如果data不为空,更新Selector缓存,触发所有插件的onSelectorSubscribe事件。
SelectorDataRefresh
private final PluginDataSubscriber pluginDataSubscriber;
......
@Override
protected void refresh(final List<SelectorData> data) {if (CollectionUtils.isEmpty(data)) {log.info("clear all selector cache, old cache");data.forEach(pluginDataSubscriber::unSelectorSubscribe);pluginDataSubscriber.refreshSelectorDataAll();} else {// update cache for UpstreamCacheManagerpluginDataSubscriber.refreshSelectorDataAll();data.forEach(pluginDataSubscriber::onSelectorSubscribe);}
}
接下来的内容就涉及到插件体系,及插件如何使用缓存数据,后边我们继续看相关源码,最终拼出一个soul网关数据同步及数据查询的完整Picture。
参考:
https://www.ably.io/blog/websockets-vs-long-polling
https://dromara.org/zh-cn/docs/soul/dataSync.html
这篇关于Soul网关数据同步源码分析-Http长轮询的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!