Soul网关数据同步源码分析-Http长轮询

2024-02-09 15:59

本文主要是介绍Soul网关数据同步源码分析-Http长轮询,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在这篇文章中,我们学习下soul网关与soul-admin间数据同步方案,并尝试分析其中实现较为复杂的Http长轮询方案的源码实现。

一、自己思考如何实现

在展开学习之前先想下如果是我怎么实现数据的同步功能。出现在我脑海里最直接的两个方案:

  • 1 soul网关启动后调用soul-admin接口同步全量配置,然后启动定时任务每隔10秒钟查询一次做增量更新,这是一种纯拉取的方式;

  • 2 全量配置信息的拉取同上,在admin数据变更后,调用soul网关的接口更新内存中的配置,这是一种推的方式,相对于上面的方式时效性更强。

乍一看这两个方案看着问题不大,但是细想之下可能有不少漏洞,如果调用接口失败等细节都需要斟酌。如果客户端不暴露对外http接口,那么方案二就无从谈起。

那么我们接下来还是看看成熟的开源框架是如何实现数据同步的,在soul网关中提供了多种数据同步的实现方案,包括Websocket,Zookeeper,http 长轮询。websocket与zookeeper的方案相对比较直接,WebSocket方案更多的是协议层面的直接应用,Zookeeper方案更多的是依赖Zookeeper框架的能力。

在接下来的文章中,我们来重点看下实现比较复杂的,也是开源配置中心采用较多的http长轮询方案。

http长轮询方案可以简单地理解为上面两种方案的折中,即在不需要客户端暴露http接口的情况下实现admin端推,时效性又比定时拉取admin接口的方案一好。

二、http长轮询

在学习源码之前我们需要先搞清楚什么是http长轮询,为何要用http长轮询。

在了解Http长轮询(Http Long Polling)之前,我们需要先看下Http 轮询(Http Polling)。在之前我们自己想到的简单数据同步方案中,client端定时发起请求查询admin的接口,admin端在接收到查询请求后立即返回响应结果,这就是轮询(Polling)。
在这里插入图片描述

轮询很明显不是一个最优的方案,如果定时的频率较低那么数据更新的时效性就很差,如果定时的频率很高,那么就会产生大量的无效http请求,占用网络带宽资源,服务器的压力也会比较大。

由于轮询方案有明显的短板,大牛们又在轮询(Polling)方案的基础上设计出了长轮询(Long Polling)。简单来说,在client端给admin端发送请求后,admin端并不立即返回结果,挂起当前请求直到admin端发生数据变更或者达到超时时间的阈值。client端直到上次请求返回或者失败之后,再发起下一次轮询请求。长轮询的“长”其实指的就是请求的响应时间被admin端主动地延长,达到了admin端推送数据变更的目的。
在这里插入图片描述

Http长轮询方案可以显著地减少http轮询请求数量,例如在soul中长轮询的超时时间为90秒,也不用担心在轮询的间隔期内client端无法获取到admin端的数据变更。接下来我们就进入正题,看下再Soul网关中长轮询的源码实现。

三、Soul网关长轮询源码分析

Http长轮询源码-客户端

在HttpSyncDataConfiguration中我们可以看到Client配置的soul.sync.http相关属性加载到HttpConfig,并构造了HttpSyncDataService,那么核心的逻辑就在HttpSyncDataService中。

/*** Http sync data configuration for spring boot.** @author xiaoyu(Myth)*/
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.http", name = "url")
@Slf4j
public class HttpSyncDataConfiguration {@Beanpublic SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {log.info("you use http long pull sync soul data");//初始化HttpSyncDataServicereturn new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));}@Bean@ConfigurationProperties(prefix = "soul.sync.http")public HttpConfig httpConfig() {return new HttpConfig();}
}

在HttpSyncDataService的构造函数中,最后会调用start方法,具体实现如下。

private void start() {if (RUNNING.compareAndSet(false, true)) {// 启动时即遍历所有分组的配置,这里的分组指的是插件、选择器、规则等不同种类的数据,这里的请求并不是长轮询this.fetchGroupConfig(ConfigGroupEnum.values());int threadSize = serverList.size();//初始化线程池,线程数量等于admin server 节点数this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(),SoulThreadFactory.create("http-long-polling", true));// 开启长轮询线程this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));} else {log.info("soul http long polling was started, executor=[{}]", executor);}
}

线程实现如下,有简单的重试机制,核心的代码逻辑仍在doLongPolling中:

class HttpLongPollingTask implements Runnable {private String server;private final int retryTimes = 3;HttpLongPollingTask(final String server) {this.server = server;}@Overridepublic void run() {//网关运行过程中,RUNNING始终为true;while (RUNNING.get()) {for (int time = 1; time <= retryTimes; time++) {try {doLongPolling(server);} catch (Exception e) {// print warnning log.if (time < retryTimes) {log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",time, retryTimes - time, e.getMessage());ThreadUtils.sleep(TimeUnit.SECONDS, 5);continue;}// 3次重试仍失败,尝试等待5分钟后再试log.error("Long polling failed, try again after 5 minutes!", e);ThreadUtils.sleep(TimeUnit.MINUTES, 5);}}}log.warn("Stop http long polling.");}
}

我们继续看doLongPolling:

private void doLongPolling(final String server) {MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);for (ConfigGroupEnum group : ConfigGroupEnum.values()) {ConfigData<?> cacheConfig = factory.cacheConfigData(group);String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));params.put(group.name(), Lists.newArrayList(value));}HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);HttpEntity httpEntity = new HttpEntity(params, headers);String listenerUrl = server + "/configs/listener";log.debug("request listener configs: [{}]", listenerUrl);JsonArray groupJson = null;try {String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();log.debug("listener result: [{}]", json);groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");} catch (RestClientException e) {String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());throw new SoulException(message, e);}//长轮询获取变更的group, 根据group拉取对应数据if (groupJson != null) {// fetch group configuration async.ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);if (ArrayUtils.isNotEmpty(changedGroups)) {log.info("Group config changed: {}", Arrays.toString(changedGroups));this.doFetchGroupConfig(server, changedGroups);}}
}

Http长轮询源码-服务端

接下来学习下数据同步中的http长轮询服务端源码实现。整体的同步流程如官方文档中图例所示。
在这里插入图片描述
Client端会调用/configs/listener接口,核心代码逻辑见HttpLongPollingDataChangedListener。

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {// 对比Client请求中group的md5是否一致,List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);String clientIp = getRemoteIp(request);// 如果出现不一致的group则存在变更的数据,这种情况下可以直接返回响应。if (CollectionUtils.isNotEmpty(changedGroup)) {this.generateResponse(response, changedGroup);log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);return;}// listen for configuration changed.final AsyncContext asyncContext = request.startAsync();// AsyncContext.settimeout() does not timeout properly, so you have to control it yourselfasyncContext.setTimeout(0L);// 阻塞线程的实现scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}

我们继续看下LongPollingClient线程的实现,run方法中定时调度线程,定时60秒后判断变更的ConfigGroup并返回响应。如果在60秒钟内数据发生变更如何处理呢,我们可以看到LongPollingClient中有个sendResponse方法,除了run方法内调用,还有DataChangeTask线程。

class LongPollingClient implements Runnable {private final AsyncContext asyncContext;private final String ip;private final long timeoutTime;private Future<?> asyncTimeoutFuture;LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {this.asyncContext = ac;this.ip = ip;this.timeoutTime = timeoutTime;}@Overridepublic void run() {// 定时调度线程,返回Future对象,超时时间为timeoutTime,默认为60秒this.asyncTimeoutFuture = scheduler.schedule(() -> {clients.remove(LongPollingClient.this);List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());sendResponse(changedGroups);}, timeoutTime, TimeUnit.MILLISECONDS);clients.add(this);}//如果在60s内数据发送变更,通过DataChangeTask调用此方法返回数据void sendResponse(final List<ConfigGroupEnum> changedGroups) {// cancel schedulerif (null != asyncTimeoutFuture) {asyncTimeoutFuture.cancel(false);}generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);asyncContext.complete();}
}

那我们看下DataChangeTask线程的实现,以选择器数据为例,当数据更新时会调用afterSelectorChanged方法,调度一个新的DataChangeTask线程。DataChangeTask线程中遍历全局变量clients(LongPollingClient线程列表),调用LongPollingClient的sendResponse方法返回变更的ConfigGroup。

@Override
protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
}
......
class DataChangeTask implements Runnable {private final ConfigGroupEnum groupKey;private final long changeTime = System.currentTimeMillis();DataChangeTask(final ConfigGroupEnum groupKey) {this.groupKey = groupKey;}@Overridepublic void run() {for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {LongPollingClient client = iter.next();iter.remove();client.sendResponse(Collections.singletonList(groupKey));log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);}}
}

数据同步实践

前面我们学习了Http长轮询同步的核心代码,接下来主要是实际操作验证下同步的实际流程。使用http长轮询需要的配置如下:

  • 首先在pom.xml文件中 引入以下依赖:
     <dependency><groupId>org.dromara</groupId><artifactId>soul-spring-boot-starter-sync-data-http</artifactId><version>${last.version}</version></dependency>
  • 在 soul-bootstrap的 yml 文件中进行如下配置:
soul :sync:http:url: http://localhost:9095
#url: 配置成你的 soul-admin的 ip与端口地址,多个admin集群环境请使用(,)分隔。
  • soul-admin 配置, 或在 soul-admin 启动参数中设置--soul.sync.http.enabled=true,然后重启服务。
soul:sync:http:enabled: true

首先我们需要启动soul-admin,soul-bootstrap,soul-examples-http,在soul-admin中变更Selector /http配置,soul-bootstrap中可以立即得到响应。
在这里插入图片描述
如图所示,我们变更weight后保存Selector。在保存Selector的流程中触发DataChangedEvent。

private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());List<ConditionData> conditionDataList =selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());// publish change event.eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}

监听对应DataChangedEvent的实现如下,在soul中我们可以看到ApplicationListener的频繁使用,应用内的事件转发很便捷。可以看到针对SELECTOR配置变更调用了DataChangedListener的onSelectorChanged事件,而在http长轮询的同步策略中,DataChangedListener的实现为HttpLongPollingDataChangedListener。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {private ApplicationContext applicationContext;private List<DataChangedListener> listeners;public DataChangedEventDispatcher(final ApplicationContext applicationContext) {this.applicationContext = applicationContext;}@Override@SuppressWarnings("unchecked")public void onApplicationEvent(final DataChangedEvent event) {for (DataChangedListener listener : listeners) {switch (event.getGroupKey()) {case APP_AUTH:listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());break;case PLUGIN:listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());break;case RULE:listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());break;case SELECTOR:listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());break;case META_DATA:listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());break;default:throw new IllegalStateException("Unexpected value: " + event.getGroupKey());}}}@Overridepublic void afterPropertiesSet() {Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));}
}

通过类继承关系可以看到,HttpLongPollingDataChangedListener并没有直接实现DataChangedListener接口,而是继承了实现了接口的AbstractDataChangedListener抽象类。
在这里插入图片描述
在AbstractDataChangedListener中实现了onSelectorChanged方法。

@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {if (CollectionUtils.isEmpty(changed)) {return;}this.updateSelectorCache();this.afterSelectorChanged(changed, eventType);
}
......
default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {
}

在HttpLongPollingDataChangedListener中则是实现了onSelectorChanged方法。

@Override
protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
}

看到这里就跟我们之前看到的DataChangeTask串联起来了,这是一个完整的变更配置时同步的流程。
看到这里就有个疑问,如果有多个admin实例,其中一个admin实例上的配置变更,如何保证多admin节点间数据的同步?

soul-admin集群数据同步

在昨天的文章结尾,我们提到了soul-admin 集群模式下,如何解决节点间的数据同步问题?这个问题其实就是缓存一致性问题。

在具体实验及看源码之前,我们先抛出几个问题:

1 soul-admin启动了两个节点,在admin网页端(节点A)更新并保存配置,此时数据库内数据会更新,节点A的本地缓存也会同时更新,问题是节点B是否会同步更新?如果会,更新的方式是怎样的?

2 如果节点B不会同步更新,soul-web从admin集群拉取数据时,会不会先拉取到节点A最新数据,然后拉取到节点B的数据覆盖掉最新的数据,导致soul-web中配置信息不是最新的?

我们可以先自己尝试下给这两个问题设计解决方案:

1 同步更新,在不引入第三方分布式服务的情况下,不容易做到。最简单的方式是定时查询数据库判断数据是否变更,存在变更就更新本地缓存。但是在更新的时间间隔内节点B的本地缓存不是最新的;

2 soul-web自身维护数据更新的时间戳,从admin拉取数据时,当前时间戳如果大于admin的时间戳,则不需要更新数据。这种方式可以保证soul-web始终只拉取最新的数据;

接下来我们看下soul具体的源码实现。

soul-admin节点间数据是否同步

我们仍然以Selector更新为例,Selector配置更新后触发DataChangedEvent事件,AbstractDataChangedListener中onSelectorChanged方法只是更新了本地缓存,并没有同步更新其他admin节点的逻辑。

@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {if (CollectionUtils.isEmpty(changed)) {return;}this.updateSelectorCache();this.afterSelectorChanged(changed, eventType);
}

通过查看updateSelectorCache的调用方,我们看下还有没有其他更新本地缓存的逻辑。可以看到在HttpLongPollingDataChangedListener中有更新本地缓存的逻辑。
在这里插入图片描述
在HttpLongPollingDataChangedListener中,可以看到启动了定时线程,默认5分钟更新一次本地缓存。看到这里发现跟我们的第一个问题的解决方案是一致的,采用了很简单的实现。那么这种实现就需要处理第二个问题,避免出现数据更新问题。

@Override
protected void afterInitialize() {long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();// 定时更新本地缓存,默认事件间隔5分钟scheduler.scheduleWithFixedDelay(() -> {log.info("http sync strategy refresh config start.");try {this.refreshLocalCache();log.info("http sync strategy refresh config success.");} catch (Exception e) {log.error("http sync strategy refresh config error!", e);}}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);log.info("http sync strategy refresh interval: {}ms", syncInterval);
}
//更新全部本地缓存
private void refreshLocalCache() {this.updateAppAuthCache();this.updatePluginCache();this.updateRuleCache();this.updateSelectorCache();this.updateMetaDataCache();
}

那么soul在更新缓存数据时的策略,如何避免出现同步旧数据的情况呢?我们接着往下看。

在soul-web发送长轮询请求时,我们可以看到请求的参数包括每个配置组的md5值,及配置组的最近修改时间。

private void doLongPolling(final String server) {MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);//长轮询请求参数for (ConfigGroupEnum group : ConfigGroupEnum.values()) {ConfigData<?> cacheConfig = factory.cacheConfigData(group);String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));params.put(group.name(), Lists.newArrayList(value));}HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);HttpEntity httpEntity = new HttpEntity(params, headers);String listenerUrl = server + "/configs/listener";log.debug("request listener configs: [{}]", listenerUrl);
......
}

我们看下md5及lastModifyTime的更新逻辑,这部分逻辑要么在soul-web端,要么在soul-admin中,我们通过查找源码可以确定其位置。在AbstractDataChangedListener中,每次updateCache方法计算配置组对应json字符串的md5值,并更新最近更新时间戳。

protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {String json = GsonUtils.getInstance().toJson(data);//设置md5,更新时间戳ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}

md5值可以用来判断缓存的配置内容是否发生变更,而lastModifyTime则用来判断变更数据是否应该更新到本地。在判断配置组(ConfigGroup)是否发生变更时利用了这两个字段。

private boolean checkCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, final long clientModifyTime) {// md5值一致,说明内容无变化,直接返回falseif (StringUtils.equals(clientMd5, serverCache.getMd5())) {return false;}// 对比最近更新时间,如果服务器更新时间大于soul-web更新时间,返回true说明配置信息变更,soul-web需要更新该配置组数据long lastModifyTime = serverCache.getLastModifyTime();if (lastModifyTime >= clientModifyTime) {return true;}//我们之前提到soul-admin在集群模式下,可能出现本地缓存不一致的情况。如果soul-web拉取了最新的配置,最近更新时间大于未更新节点的更新时间//这种情况下需要刷新本地缓存,考虑到多client并发请求可能频繁访问db,这里需加锁boolean locked = false;try {locked = LOCK.tryLock(5, TimeUnit.SECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();return true;}if (locked) {try {ConfigDataCache latest = CACHE.get(serverCache.getGroup());//如果在当前线程获取到锁后,数据已经发生了变更,判断md5值。这里用'!='判断是否为不同的对象if (latest != serverCache) {return !StringUtils.equals(clientMd5, latest.getMd5());}// 数据未变更,从数据库拉取配置数据更新到内存。这里是所有分组的数据都会更新this.refreshLocalCache();latest = CACHE.get(serverCache.getGroup());return !StringUtils.equals(clientMd5, latest.getMd5());} finally {LOCK.unlock();}}// 加锁失败,可能有其他线程正在更新本地缓存,则通知soul-web需更新配置数据return true;
}

接下来我们再看下当soul-web从soul-admin查询到配置信息后,如何更新本地缓存。

private DataRefreshFactory factory;
......
private boolean updateCacheWithJson(final String json) {JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);JsonObject data = jsonObject.getAsJsonObject("data");// if the config cache will be updated?return factory.executor(data);
}

我们继续来看DataRefreshFactory的实现,可以看到本地缓存并不在DataRefreshFactory中,而是在每个配置组对应的DataRefresh中。

public final class DataRefreshFactory {private static final EnumMap<ConfigGroupEnum, DataRefresh> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);//构造方法,这么多Subscriber干啥的?我们先跳过,后续看插件的时候再跟进public DataRefreshFactory(final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers,final List<AuthDataSubscriber> authDataSubscribers) {ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataRefresh(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataRefresh(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataRefresh(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AppAuthDataRefresh(authDataSubscribers));ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataRefresh(metaDataSubscribers));}//使用parallelStream,并发更新每个配置组的本地缓存public boolean executor(final JsonObject data) {final boolean[] success = {false};ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));return success[0];}//获取配置组的配置信息public ConfigData<?> cacheConfigData(final ConfigGroupEnum group) {return ENUM_MAP.get(group).cacheConfigData();}
}

我们可以看到每种配置组都有自己的DataRefresh实现,同样的这里使用了模板方法的设计模式,通过AbstractDataRefresh定义一组通用的方法,具体配置组的DataRefresh类直接继承AbstractDataRefresh,如下图所示。
在这里插入图片描述
我们仍然以Selector为例看下具体的实现。
在这里插入图片描述
我们接着看DataRefresh的具体实现,以SelectorDataRefresh为例:

在DataRefreshFactory中,我们看到遍历调用每个DataRefresh实现类的refresh方法,其具体实现则在AbstractDataRefresh中。

AbstractDataRefresh
......
protected abstract void refresh(List<T> data);
@Override
public Boolean refresh(final JsonObject data) {boolean updated = false;JsonObject jsonObject = convert(data);if (null != jsonObject) {//解析配置组对应的数据ConfigData<T> result = fromJson(jsonObject);//是否更新缓存,若更新需要同时refreshif (this.updateCacheIfNeed(result)) {updated = true;refresh(result.getData());}}return updated;
}

updateCacheIfNeed的具体实现同样在AbstractDataRefresh中。这里使用了ConcurrentHashMap的merge方法,可以保证更新的原子性。假设我们使用常规的更新逻辑,先get oldVal,对比后再决定是否更新newVal,在有多个线程并发执行时可能出现并发问题。在日常的开发工作中也需要留意,注意规避潜在的并发问题。

protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {// 当前缓存中没有该配置组信息,更新后直接返回trueif (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {return true;}ResultHolder holder = new ResultHolder(false);GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {// md5值不一致,且oldVal更新时间小于newVal更新时间,更新newVal,且记录更新结果if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) {log.info("update {} config: {}", groupEnum, newVal);holder.result = true;return newVal;}log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());return oldVal;});return holder.result;
}

我们接着看SelectorDataRefresh的具体实现。这里只有几行代码,大致猜测其中的含义为如果Selector数据为空,触发所有插件对于Selector的取消订阅事件,并更新Selector缓存。如果data不为空,更新Selector缓存,触发所有插件的onSelectorSubscribe事件。

SelectorDataRefresh
private final PluginDataSubscriber pluginDataSubscriber;
......
@Override
protected void refresh(final List<SelectorData> data) {if (CollectionUtils.isEmpty(data)) {log.info("clear all selector cache, old cache");data.forEach(pluginDataSubscriber::unSelectorSubscribe);pluginDataSubscriber.refreshSelectorDataAll();} else {// update cache for UpstreamCacheManagerpluginDataSubscriber.refreshSelectorDataAll();data.forEach(pluginDataSubscriber::onSelectorSubscribe);}
}

接下来的内容就涉及到插件体系,及插件如何使用缓存数据,后边我们继续看相关源码,最终拼出一个soul网关数据同步及数据查询的完整Picture。

参考:

https://www.ably.io/blog/websockets-vs-long-polling

https://dromara.org/zh-cn/docs/soul/dataSync.html

这篇关于Soul网关数据同步源码分析-Http长轮询的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/694666

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi