本文主要是介绍Spring Cloud Eureka Server 源码解析(八) 统计最后一分钟收到的续约心跳数、定时清除过期 Client,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1. 入口
入口从自动配置类 EurekaServerAutoConfiguration 开始找:它通过 @Import 引入的 EurekaServerInitializerConfiguration 在 start() 方法中完成服务端的初始化:
//EurekaServerInitializerConfiguration.java
public void start() {new Thread(() -> {try {// TODO: is this class even needed now?//其他内容就不看了,直接找到我们需要关注的两个任务//看contextInitialized方法eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);log.info("Started Eureka Server");publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));EurekaServerInitializerConfiguration.this.running = true;publish(new EurekaServerStartedEvent(getEurekaServerConfig()));}catch (Exception ex) {// Help!log.error("Could not initialize Eureka servlet context", ex);}}).start();
}//EurekaServerBootstrap.java
public void contextInitialized(ServletContext context) {try {initEurekaEnvironment();//看这,初始化EurekaServer上下文initEurekaServerContext();context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);}catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);}
}//EurekaServerBootstrap.java
/**
 * Sets up the server context. It registers the V1-compatible converters, starts the AWS
 * binder when running on AWS, and publishes the context to the holder. It then syncs the
 * registry from peer nodes and opens the registry for traffic with the synced instance
 * count. Finally it registers monitoring statistics.
 *
 * @throws Exception propagated from any of the steps above
 */
protected void initEurekaServerContext() throws Exception {
    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);

    boolean runningOnAws = isAws(this.applicationInfoManager.getInfo());
    if (runningOnAws) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    EurekaServerContextHolder.initialize(this.serverContext);
    log.info("Initialized server context");

    // Copy registry from neighboring eureka node.
    // syncUp() populates the local registry from peer Eureka nodes (failing over to other
    // peers if communication fails, until the list is exhausted) and returns the total
    // number of instances it obtained.
    int syncedInstanceCount = this.registry.syncUp();
    // Open the registry for traffic; the synced count seeds the expected-renewal bookkeeping.
    this.registry.openForTraffic(this.applicationInfoManager, syncedInstanceCount);

    // Register all monitoring statistics.
    EurekaMonitors.registerAllStats();
}
//InstanceRegistry.java
/**
 * If
 * {@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)}
 * is called with a zero argument, it means that leases are not automatically
 * cancelled if the instance hasn't sent any renewals recently. This happens for a
 * standalone server. It seems like a bad default, so we set it to the smallest
 * non-zero value we can, so that any instances that subsequently register can bump up
 * the threshold.
 *
 * @param applicationInfoManager manager for this server's own instance info
 * @param count number of instances obtained from peer nodes at startup; 0 is replaced by
 *              {@code defaultOpenForTrafficCount}
 */
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // A standalone server syncs 0 instances from peers; substitute the configured default
    // (the article states it is 1 by default — verify against configuration).
    int effectiveCount = count;
    if (effectiveCount == 0) {
        effectiveCount = this.defaultOpenForTrafficCount;
    }
    super.openForTraffic(applicationInfoManager, effectiveCount);
}
//PeerAwareInstanceRegistryImpl.java
/**
 * Opens this registry for traffic.
 *
 * <p>The method:
 * <ul>
 *   <li>records the expected number of renewing clients and recomputes the renewals-per-minute threshold;</li>
 *   <li>notes the startup time, and clears the "peer transfer was empty" flag when {@code count > 0};</li>
 *   <li>optionally primes AWS replica connections;</li>
 *   <li>marks this instance UP;</li>
 *   <li>finally calls {@code postInit()}, which starts the renewal-rate and eviction tasks.</li>
 * </ul>
 *
 * @param applicationInfoManager manager for this server's own instance info
 * @param count number of instances obtained from peer nodes at startup
 */
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfClientsSendingRenews = count;
    // Recompute the expected renewals-per-minute threshold from the new client count.
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();

    // Only a non-empty transfer from peers clears this flag. Per the article, while it stays
    // set the server withholds registry data for a configured period after startup.
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }

    DataCenterInfo.Name dataCenter = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    if (Name.Amazon == dataCenter && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }

    logger.info("Changing status to UP");
    // Declare this instance ready for traffic; setting the status also notifies the
    // registered status-change listeners.
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Start the renewal-rate measurement and the periodic eviction task.
    super.postInit();
}
//AbstractInstanceRegistry.java
/**
 * Starts the background tasks of the registry: the counter of renewals received in the
 * last minute, and the repeating eviction task that removes expired clients. Any
 * previously scheduled eviction task is cancelled and replaced.
 */
protected void postInit() {
    // Task that publishes the number of renewals (heartbeats) received in the last minute.
    renewsLastMin.start();

    // evictionTaskRef holds the currently scheduled eviction task; cancel the old one first.
    TimerTask previousTask = evictionTaskRef.get();
    if (previousTask != null) {
        previousTask.cancel();
    }

    EvictionTask nextTask = new EvictionTask();
    evictionTaskRef.set(nextTask);
    // Repeating task: first run after one eviction interval, then once per interval.
    evictionTimer.schedule(nextTask,
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
2. 统计最后一分钟收到的续约心跳数
先看统计“最后一分钟收到的续约心跳数”的定时任务(即 postInit() 中的 renewsLastMin.start()):
看一下启动的start方法:
//MeasuredRate.java
public synchronized void start() {if (!isActive) {timer.schedule(new TimerTask() {@Overridepublic void run() {try {// Zero out the current bucket.// lastBucket就是当前最后一分钟收到的续约心跳数// currentBucket是正在统计中收到的续约心跳数// 可以看到每60秒就会从currentBucket获取统计的数量,赋值给lastBucket// 同时会清空currentBucket已经统计的数量lastBucket.set(currentBucket.getAndSet(0));} catch (Throwable e) {logger.error("Cannot reset the Measured Rate", e);}} //sampleInterval就是传入的60秒,每60秒统计一次}, sampleInterval, sampleInterval);isActive = true;}
}
看一下哪些点会增加统计数量:
注意,图中的两个调用点分别属于两个不同的统计实例(MeasuredRate 对象),它们各自独立计数,互不影响。
3. 定时清除过期 Client
现在看定时清除过期 Client的任务:
//AbstractInstanceRegistry.java
/**
 * Starts the background tasks of the registry: the counter of renewals received in the
 * last minute, and the repeating eviction task that removes expired clients. Any
 * previously scheduled eviction task is cancelled and replaced.
 */
protected void postInit() {
    // Task that publishes the number of renewals (heartbeats) received in the last minute.
    renewsLastMin.start();

    // evictionTaskRef holds the currently scheduled eviction task; cancel the old one first.
    TimerTask previousTask = evictionTaskRef.get();
    if (previousTask != null) {
        previousTask.cancel();
    }

    EvictionTask nextTask = new EvictionTask();
    evictionTaskRef.set(nextTask);
    // Repeating task: first run after one eviction interval, then once per interval.
    evictionTimer.schedule(nextTask,
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
看EvictionTask:
//AbstractInstanceRegistry.EvictionTask.java(内部类)
/**
 * Timer task that periodically evicts expired leases, passing a compensation time that
 * accounts for this task having started later than its configured interval.
 */
class EvictionTask extends TimerTask {
    // Start time (from getCurrentTimeNano()) of the previous run; 0 until the first run.
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

    @Override
    public void run() {
        try {
            // Extra grace time for leases when this run started late (0 on the first run).
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            // Evict expired leases, widening each lease's expiry window by the compensation.
            evict(compensationTimeMs);
        } catch (Throwable e) {
            // Log and swallow so the repeating timer task keeps running.
            logger.error("Could not run the evict task", e);
        }
    }
    // ... (remaining members, e.g. getCompensationTimeMs, elided in this excerpt)
    ...
}
3.1 计算补偿时间
先看计算补偿时间:
//AbstractInstanceRegistry.EvictionTask#getCompensationTimeMs
/**
 * Compute a compensation time defined as the actual time this task was executed since the prev iteration,
 * vs the configured amount of time for execution. This is useful for cases where changes in time (due to
 * clock skew or gc for example) causes the actual eviction task to execute later than the desired time
 * according to the configured cycle.
 *
 * <p>Concretely: take the gap between the start of the previous run and the start of this run, minus the
 * configured eviction interval, clamped at zero. The first run returns 0 because there is no previous start
 * time.
 *
 * <p>The result is passed to {@code evict}, which adds it to every lease's expiry deadline. A late run
 * therefore gives leases <em>more</em> grace; it never makes additional leases count as expired.
 *
 * @return milliseconds by which this run started later than one interval after the previous run; never negative
 */
long getCompensationTimeMs() {
    long currNanos = getCurrentTimeNano();
    // Atomically record this run's start and fetch the previous one; 0 means "no previous run".
    long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
    if (lastNanos == 0L) {
        return 0L;
    }

    // Gap between the starts of two consecutive runs. Usually about one interval, but larger when this
    // run started late (GC pause, clock skew, or a previous run that took longer than the interval).
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
    long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
    return Math.max(0L, compensationTime);
}
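关于定时任务的验证:下面用 java.util.Timer 做个小实验。schedule(task, delay, period) 是固定延迟(fixed-delay)调度,每次执行都相对于上一次的实际开始时间来安排。因此当某次执行耗时超过周期时,下一次会在它结束后立即开始,两次开始时间的间隔就会大于配置的周期——这正是 getCompensationTimeMs 中 elapsedMs 大于定时间隔的情形: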
public static void main(String[] args) {Timer timer = new Timer("test Timer", false);//这个定时任务每两秒执行一次,前两次执行耗时要3秒,之后耗时只要1秒timer.schedule(new TimerTask() {private int time = 0;@Overridepublic void run() {System.out.println("start:" + new Date().toString());try {if (time > 1) {Thread.sleep(1000);} else {//前两次定时任务需要执行3秒Thread.sleep(3000);}} catch (InterruptedException e) {e.printStackTrace();}System.out.println("end:" + new Date().toString());time++;}},1000,2000); }
3.2 清除过期client
现在看清除方法,看下这个补偿时间怎么用的:
//AbstractInstanceRegistry.java
public void evict(long additionalLeaseMs) {logger.debug("Running the evict task");//是否允许续约过期,允许为true,不允许为false//底层其实就是判断自我保护机制启动没有,false不允许续约过期,其实就是自我保护机制启动了//自我保护机制开启的规则是:最后一分钟收到的续约心跳数 低于 预期每分钟收到的续约心跳数//这些前面几章都说过就不看了if (!isLeaseExpirationEnabled()) {logger.debug("DS: lease expiration is currently disabled.");return;}// We collect first all expired items, to evict them in random order. For large eviction sets,// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,// the impact should be evenly distributed across all applications.// 我们首先收集所有过期的元素,以随机顺序将其逐出。 对于大型驱逐集,如果不这样做,// 我们可能会在自我保护开始之前就清除整个应用程序。通过将其随机化,影响应平均分布// 在所有应用程序中。List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();//registry,本地注册表for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {//遍历获取内层mapMap<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();if (leaseMap != null) {for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {Lease<InstanceInfo> lease = leaseEntry.getValue();//判断是否过期,过期加入expiredLeases集合//看到过期方法,传入了补偿时间if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {expiredLeases.add(lease);}}}}// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for// triggering self-preservation. 
Without that we would wipe out full registry.// 为了补偿GC暂停或本地时间漂移,我们需要使用当前注册表大小作为触发自我保护的基础。 // 否则,我们将清除完整的注册表。int registrySize = (int) getLocalRegistrySize();//当前注册表中所有实例数量int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());//保护机制开启的阈值//计算允许清除的最多数量int evictionLimit = registrySize - registrySizeThreshold;//已经过期的数量 和 允许清除的最多数量 之间 取最小值int toEvict = Math.min(expiredLeases.size(), evictionLimit);if (toEvict > 0) {logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);Random random = new Random(System.currentTimeMillis());for (int i = 0; i < toEvict; i++) {// Pick a random item (Knuth shuffle algorithm)int next = i + random.nextInt(expiredLeases.size() - i);//在指定列表中的指定位置交换元素。为了随机删除Collections.swap(expiredLeases, i, next);//获取交换后的进行删除Lease<InstanceInfo> lease = expiredLeases.get(i);String appName = lease.getHolder().getAppName();String id = lease.getHolder().getId();EXPIRED.increment();logger.warn("DS: Registry: expired lease for {}/{}", appName, id);//内部清除internalCancel(appName, id, false);}}
}
3.2.1 判断是否过期
//Lease.java
/**
 * Checks whether this lease is expired.
 *
 * <p>A lease is expired when it has already been cancelled ({@code evictionTimestamp > 0}), or when the
 * current time is past {@code lastUpdateTimestamp + duration + additionalLeaseMs}.
 *
 * <p>NOTE(review): upstream Eureka documents that {@code renew()} stores {@code now + duration} in
 * {@code lastUpdateTimestamp}, which would make the effective expiry roughly {@code 2 * duration}.
 * {@code renew()} is not shown here — confirm before relying on the nominal duration.
 *
 * @param additionalLeaseMs extra grace time (the eviction task's compensation for running late)
 * @return true if the lease is cancelled or past its deadline
 */
public boolean isExpired(long additionalLeaseMs) {
    if (evictionTimestamp > 0) {
        return true;
    }
    long now = System.currentTimeMillis();
    return now > lastUpdateTimestamp + duration + additionalLeaseMs;
}
3.2.2 内部清除(和下架方法是一个方法,分析过了)
//AbstractInstanceRegistry.java
protected boolean internalCancel(String appName, String id, boolean isReplication) {try {read.lock();//读锁CANCEL.increment(isReplication);//下架操作计数器+1Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);//记录删除的续约对象Lease<InstanceInfo> leaseToCancel = null;if (gMap != null) {//核心方法就是这个removeleaseToCancel = gMap.remove(id);}synchronized (recentCanceledQueue) {recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));}//overriddenStatus的清除InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);if (instanceStatus != null) {logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());}if (leaseToCancel == null) {//下架的时候发现注册表中已经没有了CANCEL_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);return false;} else {//续约对象里有一个时间戳专门记录 下架时间的 evictionTimestamp//通过更新驱逐时间来取消租约。leaseToCancel.cancel();InstanceInfo instanceInfo = leaseToCancel.getHolder();String vip = null;String svip = null;if (instanceInfo != null) {//记录行为类型instanceInfo.setActionType(ActionType.DELETED);//加入最近更新队列recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));//更新服务端最新修改时间instanceInfo.setLastUpdatedTimestamp();//获取此实例的虚拟Internet协议地址。 如果未指定,则默认为主机名。vip = instanceInfo.getVIPAddress();//获取此实例的安全虚拟Internet协议地址。如果未指定,则默认为主机名。svip = instanceInfo.getSecureVipAddress();}//是对应微服务相关缓存失效invalidateCache(appName, vip, svip);logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);return true;}} finally {read.unlock();}
}
这篇关于Spring Cloud Eureka Server 源码解析(八) 统计最后一分钟收到的续约心跳数、定时清除过期 Client的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!