nacos 原理之为什么修改了配置文件后应用端会立刻生效-服务端篇1

本文主要是介绍nacos 原理之为什么修改了配置文件后应用端会立刻生效-服务端篇1,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

单位用了 nacos 来保存配置,前一段时间研究了下 nacos 的原理,今天做个整理和总结。

为什么在 nacos 服务端改了配置文件之后,应用端会立刻生效?

原来我以为服务端在修改了配置文件之后会把结果推送给应用端,后来看了代码之后才发现不是这样的。

简单的说一下,在应用端有一个线程会不断的查询服务端,我感兴趣的某些文件有没有发生变化:

  1. 如果服务端的配置文件有了变化之后,会立刻告诉应用端,某些文件发生了改变。紧接着应用端会根据返回的改变了的文件信息再去 nacos 服务端查询改变了的文件的文件内容。查到内容之后再做相关的变量的改变。
  2. 如果文件没有改变,则 nacos 服务端会有一个异步任务,如果在超时时间内配置文件:
    1. 没有改变,则到点儿后调度任务会响应给应用端,告诉应用端你关心的文件没有发生变化
    2. 如果有改变的话,则会立即给应用端发送响应,告诉应用端,你关心的哪些文件发生了改变。

下面我们来上代码来详细的看一下这个过程。

应用端有一个线程在长轮询 服务端的 /v1/cs/configs/listener 这个服务

/v1/cs/configs/listener 这个服务对应着 nacos 服务端的 com.alibaba.nacos.config.server.controller.ConfigController.listener() 这个方法

1. 让我们来看看 listener 这个方法的实现:

@PostMapping("/listener")@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)public void listener(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);String probeModify = request.getParameter("Listening-Configs");if (StringUtils.isBlank(probeModify)) {throw new IllegalArgumentException("invalid probeModify");}probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);Map<String, String> clientMd5Map;try {clientMd5Map = MD5Util.getClientMd5Map(probeModify);}catch (Throwable e) {throw new IllegalArgumentException("invalid probeModify");}// do long-pollinginner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}
  1. probeModify 是由分隔符拼起来的一个字符串,这个变量代表着应用关心的文件和文件的内容的 md5 集合。

    每个文件是由 dataId + groupId + md5 + tenantId(namespaceId) 确定的。dataId + 单词分隔符 + groupId + 单词分隔符 + tenantId(namespaceId)+ 行分隔符 如果应用有多个配置文件的话,则每个配置文件之间由行分隔符分隔。这个分隔与客户端的版本还有是否传了 tenantId 有关,我这儿列出的只是其中的一种。

  2. 接下来我们再看看 clientMd5Map 这个变量

    clientMd5Map 这个 map 的 key 代表了应用关心的文件,value 则代表了应用端关心的文件的内容对应的 md5 值。

  3. 再看看 inner.doPollingConfig() 这个方法调用。

/**轮询接口
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize)
throws IOException {// 长轮询
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + “”;
}// else 兼容短轮询逻辑
List changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);// 兼容短轮询result
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version =2.0.0;
}
int versionNum = Protocol.getVersionNumber(version);/**2.0.4版本以前, 返回值放入header中
*/
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute(“content”, newResult);
}
Loggers.AUTH.info(new content:+ newResult);// 禁用缓存
response.setHeader(“Pragma”, “no-cache”);
response.setDateHeader(“Expires”, 0);
response.setHeader(“Cache-Control”, “no-cache,no-store”);
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + “”;
}

只看支持长轮询的逻辑,短轮询的逻辑更简单。重点只看这一行代码的调用

    longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);

把这个长轮询客户端请求添加到长轮询服务中了。

2. 再接着看 com.alibaba.nacos.config.server.service.LongPollingService.addLongPollingClient() 这个方法

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);String tag = req.getHeader("Vipserver-Tag");int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);/*** 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance*/long timeout = Math.max(10000, Long.parseLong(str) - delayTime);if (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());// do nothing but set fix polling timeout} else {long start = System.currentTimeMillis();List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);if (changedGroups.size() > 0) {generateResponse(req, rsp, changedGroups);LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",clientMd5Map.size(), probeRequestSize, changedGroups.size());return;} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,changedGroups.size());return;}}String ip = RequestUtil.getRemoteIp(req);// 一定要由HTTP线程调用,否则离开后容器会立即发送响应final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制asyncContext.setTimeout(0L);scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}

看了下代码,总结了下固定轮询和非固定轮询的差异:

  1. 固定轮询只有在定时调度任务到了之后才会给出应用端响应,换句话说,如果服务器端修改了配置,固定轮询有很大可能不能立即收到文件变更的响应。
  2. 非固定轮询在询问的时候首先会查看一下配置文件是否发生了改变,如果是的话,就立即给出响应。如果不是的话,则加入定时调度任务。如果定时调度还没有开始执行,这时候服务端修改了配置文件,则会把定时调度任务取消,并且立即给出应用端响应,应用端几乎是实时收到服务端给出的文件变更响应。

默认是非固定轮询,首先查看应用端关心的配置文件是否发生了改变,即 changedGroups.size() 是否大于 0 。如果配置文件发生了改变的话,则立即给出响应,并告诉应用,哪些文件发生了改变。

如果配置文件没有改变,并且 noHangUpFlag 为 true 的时候,记录完日志之后就 return 了,由于没有启用异步调用,所以给应用响应的是空。

一般是在应用启动的时候 noHangUpFlag 才为真,应用首次加载配置文件,这个时候不能一直处于等待状态。

再接着往下看,如果配置文件没有改变并且应用端允许服务端挂起自己时,则会让线程池调度任务立即执行 ClientLongPolling 任务。

        String ip = RequestUtil.getRemoteIp(req);// 一定要由HTTP线程调用,否则离开后容器会立即发送响应final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制asyncContext.setTimeout(0L);scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));

ClientLongPolling 是 LongPollingService 的一个内部类:

package com.alibaba.nacos.config.server.service;import com.alibaba.nacos.config.server.model.SampleResult;import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent;import com.alibaba.nacos.config.server.monitor.MetricsMonitor;import com.alibaba.nacos.config.server.utils.GroupKey;import com.alibaba.nacos.config.server.utils.LogUtil;import com.alibaba.nacos.config.server.utils.MD5Util;import com.alibaba.nacos.config.server.utils.RequestUtil;import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener;import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event;import org.apache.commons.lang3.StringUtils;import org.springframework.stereotype.Service;import javax.servlet.AsyncContext;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import java.util.*;import java.util.concurrent.*;import static com.alibaba.nacos.config.server.utils.LogUtil.memoryLog;import static com.alibaba.nacos.config.server.utils.LogUtil.pullLog;/*** 长轮询服务。负责处理** @author Nacos*/@Servicepublic class LongPollingService extends AbstractEventListener {private static final int FIXED_POLLING_INTERVAL_MS = 10000;private static final int SAMPLE_PERIOD = 100;private static final int SAMPLE_TIMES = 3;private static final String TRUE_STR = "true";private Map<String, Long> retainIps = new ConcurrentHashMap<String, Long>();private static boolean isFixedPolling() {return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false);}private static int getFixedPollingInterval() {return SwitchService.getSwitchInteger(SwitchService.FIXED_POLLING_INTERVAL, FIXED_POLLING_INTERVAL_MS);}public boolean isClientLongPolling(String clientIp) {return getClientPollingRecord(clientIp) != null;}public Map<String, String> getClientSubConfigInfo(String clientIp) {ClientLongPolling record = getClientPollingRecord(clientIp);if (record == null) {return Collections.<String, String>emptyMap();}return record.clientMd5Map;}public SampleResult getSubscribleInfo(String dataId, String group, String tenant) {String groupKey = GroupKey.getKeyTenant(dataId, group, tenant);SampleResult sampleResult = new SampleResult();Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);for (ClientLongPolling clientLongPolling : allSubs) {if (clientLongPolling.clientMd5Map.containsKey(groupKey)) {lisentersGroupkeyStatus.put(clientLongPolling.ip, clientLongPolling.clientMd5Map.get(groupKey));}}sampleResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);return sampleResult;}public SampleResult getSubscribleInfoByIp(String clientIp) {SampleResult sampleResult = new SampleResult();Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);for (ClientLongPolling clientLongPolling : allSubs) {if (clientLongPolling.ip.equals(clientIp)) {// 一个ip可能有多个监听if (!lisentersGroupkeyStatus.equals(clientLongPolling.clientMd5Map)) {lisentersGroupkeyStatus.putAll(clientLongPolling.clientMd5Map);}}}sampleResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);return sampleResult;}/*** 聚合采样结果中的采样ip和监听配置的信息;合并策略用后面的覆盖前面的是没有问题的** @param sampleResults sample Results* @return Results*/public SampleResult mergeSampleResult(List<SampleResult> sampleResults) {SampleResult mergeResult = new SampleResult();Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);for (SampleResult sampleResult : sampleResults) {Map<String, String> lisentersGroupkeyStatusTmp = sampleResult.getLisentersGroupkeyStatus();for (Map.Entry<String, String> entry : lisentersGroupkeyStatusTmp.entrySet()) {lisentersGroupkeyStatus.put(entry.getKey(), entry.getValue());}}mergeResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);return mergeResult;}public Map<String, Set<String>> collectApplicationSubscribeConfigInfos() {if (allSubs == null || allSubs.isEmpty()) {return null;}HashMap<String, Set<String>> app2Groupkeys = new HashMap<String, Set<String>>(50);for (ClientLongPolling clientLongPolling : allSubs) {if (StringUtils.isEmpty(clientLongPolling.appName) || "unknown".equalsIgnoreCase(clientLongPolling.appName)) {continue;}Set<String> appSubscribeConfigs = app2Groupkeys.get(clientLongPolling.appName);Set<String> clientSubscribeConfigs = clientLongPolling.clientMd5Map.keySet();if (appSubscribeConfigs == null) {appSubscribeConfigs = new HashSet<String>(clientSubscribeConfigs.size());}appSubscribeConfigs.addAll(clientSubscribeConfigs);app2Groupkeys.put(clientLongPolling.appName, appSubscribeConfigs);}return app2Groupkeys;}public SampleResult getCollectSubscribleInfo(String dataId, String group, String tenant) {List<SampleResult> sampleResultLst = new ArrayList<SampleResult>(50);for (int i = 0; i < SAMPLE_TIMES; i++) {SampleResult sampleTmp = getSubscribleInfo(dataId, group, tenant);if (sampleTmp != null) {sampleResultLst.add(sampleTmp);}if (i < SAMPLE_TIMES - 1) {try {Thread.sleep(SAMPLE_PERIOD);} catch (InterruptedException e) {LogUtil.clientLog.error("sleep wrong", e);}}}SampleResult sampleResult = mergeSampleResult(sampleResultLst);return sampleResult;}public SampleResult getCollectSubscribleInfoByIp(String ip) {SampleResult sampleResult = new SampleResult();sampleResult.setLisentersGroupkeyStatus(new HashMap<String, String>(50));for (int i = 0; i < SAMPLE_TIMES; i++) {SampleResult sampleTmp = getSubscribleInfoByIp(ip);if (sampleTmp != null) {if (sampleTmp.getLisentersGroupkeyStatus() != null&& !sampleResult.getLisentersGroupkeyStatus().equals(sampleTmp.getLisentersGroupkeyStatus())) {sampleResult.getLisentersGroupkeyStatus().putAll(sampleTmp.getLisentersGroupkeyStatus());}}if (i < SAMPLE_TIMES - 1) {try {Thread.sleep(SAMPLE_PERIOD);} catch (InterruptedException e) {LogUtil.clientLog.error("sleep wrong", e);}}}return sampleResult;}private ClientLongPolling getClientPollingRecord(String clientIp) {if (allSubs == null) {return null;}for (ClientLongPolling clientLongPolling : allSubs) {HttpServletRequest request = (HttpServletRequest) clientLongPolling.asyncContext.getRequest();if (clientIp.equals(RequestUtil.getRemoteIp(request))) {return clientLongPolling;}}return null;}public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);String tag = req.getHeader("Vipserver-Tag");int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);/*** 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance*/long timeout = Math.max(10000, Long.parseLong(str) - delayTime);if (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());// do nothing but set fix polling timeout} else {long start = System.currentTimeMillis();List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);if (changedGroups.size() > 0) {generateResponse(req, rsp, changedGroups);LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",clientMd5Map.size(), probeRequestSize, changedGroups.size());return;} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,changedGroups.size());return;}}String ip = RequestUtil.getRemoteIp(req);// 一定要由HTTP线程调用,否则离开后容器会立即发送响应final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制asyncContext.setTimeout(0L);scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}@Overridepublic List<Class<? extends Event>> interest() {List<Class<? extends Event>> eventTypes = new ArrayList<Class<? extends Event>>();eventTypes.add(LocalDataChangeEvent.class);return eventTypes;}@Overridepublic void onEvent(Event event) {if (isFixedPolling()) {// ignore} else {if (event instanceof LocalDataChangeEvent) {LocalDataChangeEvent evt = (LocalDataChangeEvent)event;scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));}}}static public boolean isSupportLongPolling(HttpServletRequest req) {return null != req.getHeader(LONG_POLLING_HEADER);}@SuppressWarnings("PMD.ThreadPoolCreationRule")public LongPollingService() {allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("com.alibaba.nacos.LongPolling");return t;}});scheduler.scheduleWithFixedDelay(new StatTask(), 0L, 10L, TimeUnit.SECONDS);}// =================static public final String LONG_POLLING_HEADER = "Long-Pulling-Timeout";static public final String LONG_POLLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";final ScheduledExecutorService scheduler;/*** 长轮询订阅关系*/final Queue<ClientLongPolling> allSubs;// =================class DataChangeTask implements Runnable {@Overridepublic void run() {try {ConfigCacheService.getContentBetaMd5(groupKey);for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {ClientLongPolling clientSub = iter.next();if (clientSub.clientMd5Map.containsKey(groupKey)) {// 如果beta发布且不在beta列表直接跳过if (isBeta && !betaIps.contains(clientSub.ip)) {continue;}// 如果tag发布且不在tag列表直接跳过if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {continue;}getRetainIps().put(clientSub.ip, System.currentTimeMillis());iter.remove(); // 删除订阅关系LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - changeTime),"in-advance",RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),"polling",clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);clientSub.sendResponse(Arrays.asList(groupKey));}}} catch (Throwable t) {LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());}}DataChangeTask(String groupKey) {this(groupKey, false, null);}DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) {this(groupKey, isBeta, betaIps, null);}DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps, String tag) {this.groupKey = groupKey;this.isBeta = isBeta;this.betaIps = betaIps;this.tag = tag;}final String groupKey;final long changeTime = System.currentTimeMillis();final boolean isBeta;final List<String> betaIps;final String tag;}// =================class StatTask implements Runnable {@Overridepublic void run() {memoryLog.info("[long-pulling] client count " + allSubs.size());MetricsMonitor.getLongPollingMonitor().set(allSubs.size());}}// =================class ClientLongPolling implements Runnable {@Overridepublic void run() {asyncTimeoutFuture = scheduler.schedule(new Runnable() {@Overridepublic void run() {try {getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());/*** 删除订阅关系*/allSubs.remove(ClientLongPolling.this);if (isFixedPolling()) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest)asyncContext.getRequest(),(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);if (changedGroups.size() > 0) {sendResponse(changedGroups);} else {sendResponse(null);}} else {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);sendResponse(null);}} catch (Throwable t) {LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());}}}, timeoutTime, TimeUnit.MILLISECONDS);allSubs.add(this);}void sendResponse(List<String> changedGroups) {/***  取消超时任务*/if (null != asyncTimeoutFuture) {asyncTimeoutFuture.cancel(false);}generateResponse(changedGroups);}void generateResponse(List<String> changedGroups) {if (null == changedGroups) {/*** 告诉容器发送HTTP响应*/asyncContext.complete();return;}HttpServletResponse response = (HttpServletResponse)asyncContext.getResponse();try {String respString = MD5Util.compareMd5ResultString(changedGroups);// 禁用缓存response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);response.getWriter().println(respString);asyncContext.complete();} catch (Exception se) {pullLog.error(se.toString(), se);asyncContext.complete();}}ClientLongPolling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,long timeoutTime, String appName, String tag) {this.asyncContext = ac;this.clientMd5Map = clientMd5Map;this.probeRequestSize = probeRequestSize;this.createTime = System.currentTimeMillis();this.ip = ip;this.timeoutTime = timeoutTime;this.appName = appName;this.tag = tag;}// =================final AsyncContext asyncContext;final Map<String, String> clientMd5Map;final long createTime;final String ip;final String appName;final String tag;final int probeRequestSize;final long timeoutTime;Future<?> asyncTimeoutFuture;}void generateResponse(HttpServletRequest request, HttpServletResponse response, List<String> changedGroups) {if (null == changedGroups) {return;}try {String respString = MD5Util.compareMd5ResultString(changedGroups);// 禁用缓存response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);response.getWriter().println(respString);} catch (Exception se) {pullLog.error(se.toString(), se);}}public Map<String, Long> getRetainIps() {return retainIps;}public void setRetainIps(Map<String, Long> retainIps) {this.retainIps = retainIps;}}

代码挺长的,在这里我们先看 ClientLongPolling 的 run() 方法。

    @Overridepublic void run() {asyncTimeoutFuture = scheduler.schedule(new Runnable() {@Overridepublic void run() {try {getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());/*** 删除订阅关系*/allSubs.remove(ClientLongPolling.this);if (isFixedPolling()) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest)asyncContext.getRequest(),(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);if (changedGroups.size() > 0) {sendResponse(changedGroups);} else {sendResponse(null);}} else {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);sendResponse(null);}} catch (Throwable t) {LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());}}}, timeoutTime, TimeUnit.MILLISECONDS);allSubs.add(this);}

在 run() 方法里做了两件事:

  1. 第一件事就是用线程池调度任务又调度了一个任务。
  2. 第二件事就是把 ClientLongPolling 加入到 allSubs 客户端订阅集合中。

先看第一件事,过 timeoutTime 时间后会执行这个任务,再看这个任务干了些啥,挑重点的看一下:

   allSubs.remove(ClientLongPolling.this);

删除订阅关系,为什么要删除呢?因为客户端会长轮询,并不是服务端推送给客户端,如果不删除的话,客户端每轮询一次就往进去添加一次, allSubs 会越来越大,最终结果就是把内存撑爆了。

如果是固定轮询的话,则检查应用端关心的文件是否发生了改变,如果改变了的话则把改变了的文件 key 信息响应给应用,如果没改变的话,则响应数据为空。

因为服务端在修改配置文件的时候不会通知固定轮询,所以固定轮询只能在定时调度任务执行的时候去检查是否文件发生了变更,检查的间隔时间是固定的,因此叫固定轮询。

如果是非固定轮询的话,则给应用一个空的响应。

为什么非固定轮询给一个空的响应呢?因为非固定轮询在询问之初就会检查一下配置文件是否发生了改变,如果发生了改变了的话就会立即给出响应。还有就是应用处于等待状态,在这个等待过程中如果服务端修改了配置文件则会取消定时调度任务,然后立即给出响应。所以在定时调度任务中给出空的响应,文件发生了变更会在别的地方给出响应的。

再接着看看第二件事:

 allSubs.add(this);

为什么要把 this(ClientLongPolling) 加入到 allSubs 中呢?这儿埋下了伏笔,为以后服务端修改了配置文件后取消定时任务/立即给出应用端响应埋下了伏笔。

画了一个简单的流程图,省略了一些内容,保留了主干流程来帮助大家理解。

在这里插入图片描述

涉及到的内容不少,还有一大片代码,剩下的部分会在下一篇中讲述。服务端在修改配置文件的时候做了哪些事。

这篇关于nacos 原理之为什么修改了配置文件后应用端会立刻生效-服务端篇1的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/799277

相关文章

修改若依框架Token的过期时间问题

《修改若依框架Token的过期时间问题》本文介绍了如何修改若依框架中Token的过期时间,通过修改`application.yml`文件中的配置来实现,默认单位为分钟,希望此经验对大家有所帮助,也欢迎... 目录修改若依框架Token的过期时间修改Token的过期时间关闭Token的过期时js间总结修改若依

MySQL修改密码的四种实现方式

《MySQL修改密码的四种实现方式》文章主要介绍了如何使用命令行工具修改MySQL密码,包括使用`setpassword`命令和`mysqladmin`命令,此外,还详细描述了忘记密码时的处理方法,包... 目录mysql修改密码四种方式一、set password命令二、使用mysqladmin三、修改u

使用Python在Excel中插入、修改、提取和删除超链接

《使用Python在Excel中插入、修改、提取和删除超链接》超链接是Excel中的常用功能,通过点击超链接可以快速跳转到外部网站、本地文件或工作表中的特定单元格,有效提升数据访问的效率和用户体验,这... 目录引言使用工具python在Excel中插入超链接Python修改Excel中的超链接Python

MySQL中的MVCC底层原理解读

《MySQL中的MVCC底层原理解读》本文详细介绍了MySQL中的多版本并发控制(MVCC)机制,包括版本链、ReadView以及在不同事务隔离级别下MVCC的工作原理,通过一个具体的示例演示了在可重... 目录简介ReadView版本链演示过程总结简介MVCC(Multi-Version Concurr

5分钟获取deepseek api并搭建简易问答应用

《5分钟获取deepseekapi并搭建简易问答应用》本文主要介绍了5分钟获取deepseekapi并搭建简易问答应用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1、获取api2、获取base_url和chat_model3、配置模型参数方法一:终端中临时将加

JavaScript中的isTrusted属性及其应用场景详解

《JavaScript中的isTrusted属性及其应用场景详解》在现代Web开发中,JavaScript是构建交互式应用的核心语言,随着前端技术的不断发展,开发者需要处理越来越多的复杂场景,例如事件... 目录引言一、问题背景二、isTrusted 属性的来源与作用1. isTrusted 的定义2. 为

Python调用另一个py文件并传递参数常见的方法及其应用场景

《Python调用另一个py文件并传递参数常见的方法及其应用场景》:本文主要介绍在Python中调用另一个py文件并传递参数的几种常见方法,包括使用import语句、exec函数、subproce... 目录前言1. 使用import语句1.1 基本用法1.2 导入特定函数1.3 处理文件路径2. 使用ex

Linux:alias如何设置永久生效

《Linux:alias如何设置永久生效》在Linux中设置别名永久生效的步骤包括:在/root/.bashrc文件中配置别名,保存并退出,然后使用source命令(或点命令)使配置立即生效,这样,别... 目录linux:alias设置永久生效步骤保存退出后功能总结Linux:alias设置永久生效步骤

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制