Spring Cloud Eureka Server 源码解析(八) 统计最后一分钟收到的续约心跳数、定时清除过期 Client

本文主要是介绍Spring Cloud Eureka Server 源码解析(八) 统计最后一分钟收到的续约心跳数、定时清除过期 Client,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.入口

入口找到自动配置类EurekaServerAutoConfiguration:
在这里插入图片描述
在这里插入图片描述

//EurekaServerInitializerConfiguration.java
public void start() {new Thread(() -> {try {// TODO: is this class even needed now?//其他内容就不看了,直接找到我们需要关注的两个任务//看contextInitialized方法eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);log.info("Started Eureka Server");publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));EurekaServerInitializerConfiguration.this.running = true;publish(new EurekaServerStartedEvent(getEurekaServerConfig()));}catch (Exception ex) {// Help!log.error("Could not initialize Eureka servlet context", ex);}}).start();
}//EurekaServerBootstrap.java
public void contextInitialized(ServletContext context) {try {initEurekaEnvironment();//看这,初始化EurekaServer上下文initEurekaServerContext();context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);}catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);}
}//EurekaServerBootstrap.java
protected void initEurekaServerContext() throws Exception {// For backward compatibilityJsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH);XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH);if (isAws(this.applicationInfoManager.getInfo())) {this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,this.eurekaClientConfig, this.registry, this.applicationInfoManager);this.awsBinder.start();}EurekaServerContextHolder.initialize(this.serverContext);log.info("Initialized server context");// Copy registry from neighboring eureka node// 从邻近的eureka节点复制注册表// 从对等eureka节点填充注册表信息。如果通信失败,此操作将失// 败转移到其他节点,直到列表耗尽为止。// registryCount:表示从其他eureka节点下载到的注册信息中所有实例数量int registryCount = this.registry.syncUp();// 走这,打开交通this.registry.openForTraffic(this.applicationInfoManager, registryCount);// Register all monitoring statistics.EurekaMonitors.registerAllStats();
}//InstanceRegistry.java
/*** If* {@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)}* is called with a zero argument, it means that leases are not automatically* cancelled if the instance hasn't sent any renewals recently. This happens for a* standalone server. It seems like a bad default, so we set it to the smallest* non-zero value we can, so that any instances that subsequently register can bump up* the threshold.* 如果用一个零参数调用{@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)},* 这意味着如果实例最近没有发送任何续订,租约不会自动取消。这种情况发生在独立服务器上。* 这似乎是一个糟糕的默认值,因此我们将其设置为尽可能小的非零值,以便随后注册的任何实例都可以提高阈值。*/
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {//count值,是服务端一开始启动的时候 从其他eureka节点下载到的注册信息中所有实例数量//defaultOpenForTrafficCount默认为1super.openForTraffic(applicationInfoManager,count == 0 ? this.defaultOpenForTrafficCount : count);
}//PeerAwareInstanceRegistryImpl.java
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {// Renewals happen every 30 seconds and for a minute it should be a factor of 2.// 如果每30秒发生一次续约操作,一分钟内应该是2次数this.expectedNumberOfClientsSendingRenews = count;// 更新预期每分钟收到续约请求数的阈值updateRenewsPerMinThreshold();logger.info("Got {} instances from neighboring DS node", count);logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);this.startupTime = System.currentTimeMillis();if (count > 0) {//count值,是服务端一开始启动的时候 从其他eureka节点下载到的注册信息中所有实例数量//如果服务器无法在启动时从对等eureka节点获取注册表信息//那么在指定的期限内,服务器不会返回注册表信息(全量下载中提到过)this.peerInstancesTransferEmptyOnStartup = false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();boolean isAws = Name.Amazon == selfName;if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");primeAwsReplicas(applicationInfoManager);}logger.info("Changing status to UP");//设置此实例的状态。应用程序可以使用它来指示是否已准备好接收流量。//在这里设置状态还会通知所有已注册的侦听器状态更改事件。applicationInfoManager.setInstanceStatus(InstanceStatus.UP);//看supersuper.postInit();
}//AbstractInstanceRegistry.java
protected void postInit() {//统计最后一分钟收到的续约心跳数的定时任务renewsLastMin.start();//evictionTaskRef是个原子引用,放的就是定时清除过期clien的任务,驱逐任务if (evictionTaskRef.get() != null) {//取消之前已经存在的驱逐任务evictionTaskRef.get().cancel();}//新建一个驱逐任务evictionTaskRef.set(new EvictionTask());//启动驱逐任务,repeated任务,固定时间循环定时执行//serverConfig.getEvictionIntervalTimerInMs():驱逐任务循环执行的间隔时间evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());
}

2. 统计最后一分钟收到的续约心跳数

先看当前最后一分钟收到的续约心跳数 统计的定时任务:
在这里插入图片描述
看一下启动的start方法:

//MeasuredRate.java
public synchronized void start() {if (!isActive) {timer.schedule(new TimerTask() {@Overridepublic void run() {try {// Zero out the current bucket.// lastBucket就是当前最后一分钟收到的续约心跳数// currentBucket是正在统计中收到的续约心跳数// 可以看到每60秒就会从currentBucket获取统计的数量,赋值给lastBucket// 同时会清空currentBucket已经统计的数量lastBucket.set(currentBucket.getAndSet(0));} catch (Throwable e) {logger.error("Cannot reset the Measured Rate", e);}} //sampleInterval就是传入的60秒,每60秒统计一次}, sampleInterval, sampleInterval);isActive = true;}
}

看一下哪些点会增加统计数量:
在这里插入图片描述
注意,图中两个调用点,分别是两个任务实例,分开统计的。

3. 定时清除过期 Client

现在看定时清除过期 Client的任务:

//AbstractInstanceRegistry.java
protected void postInit() {//当前最后一分钟收到的续约心跳数 统计的定时任务renewsLastMin.start();//evictionTaskRef是个原子引用,放的就是定时清除过期clien的任务,驱逐任务if (evictionTaskRef.get() != null) {//取消之前已经存在的驱逐任务evictionTaskRef.get().cancel();}//新建一个驱逐任务evictionTaskRef.set(new EvictionTask());//启动驱逐任务,repeated任务,固定时间循环定时执行//serverConfig.getEvictionIntervalTimerInMs():驱逐任务循环执行的间隔时间evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());
}

在这里插入图片描述
看EvictionTask:

//AbstractInstanceRegistry.EvictionTask.java(内部类)
class EvictionTask extends TimerTask {//记录上一次执行清除的时间点private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);@Overridepublic void run() {try {//计算补偿时间long compensationTimeMs = getCompensationTimeMs();logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);//执行清除evict(compensationTimeMs);} catch (Throwable e) {logger.error("Could not run the evict task", e);}}...
}

3.1 计算补偿时间

先看计算补偿时间:

//AbstractInstanceRegistry.EvictionTask#getCompensationTimeMs
/*** compute a compensation time defined as the actual time this task was executed since the prev iteration,* vs the configured amount of time for execution. This is useful for cases where changes in time (due to* clock skew or gc for example) causes the actual eviction task to execute later than the desired time* according to the configured cycle.* * 计算补偿时间,该补偿时间定义为自上次迭代以来此任务实际执行的时间,与配置的执行时间之比。 * 当时间变化(例如由于时钟偏斜或gc)导致实际驱逐任务的执行时间晚于所需时间(根据配置的周期)* 时,此功能很有用。*/
long getCompensationTimeMs() {//获取当前时间long currNanos = getCurrentTimeNano();//获取上一次清除开始的时间,并赋值这一次的开始时间//第一次执行获取是0long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);if (lastNanos == 0l) {return 0l;}//本次开始执行的时间 距离 上一次开始执行的时间,大部分情况下应该就是定时任务执行的时间间隔//但是实际执行的时间是有可能超过配置的定时时间间隔,这个时候就可以理解为清除任务执行完的耗时时间//这个时间最低不会低于定时任务的定时间隔时间long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);//减去配置的定时时间间隔,如果大于0,说明上一次执行任务的耗时超过了定时时间间隔//导致此次执行任务的时间晚于应该执行的时间,所以要加上这个补偿时间//在这个补偿时间之内的client也是过期的,也需要清楚long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();return compensationTime <= 0l ? 0l : compensationTime;
}

关于定时任务的验证:

public static void main(String[] args) {Timer timer = new Timer("test Timer", false);//这个定时任务每两秒执行一次,前两次执行耗时要3秒,之后耗时只要1秒timer.schedule(new TimerTask() {private int time = 0;@Overridepublic void run() {System.out.println("start:" + new Date().toString());try {if (time > 1) {Thread.sleep(1000);} else {//前两次定时任务需要执行3秒Thread.sleep(3000);}} catch (InterruptedException e) {e.printStackTrace();}System.out.println("end:" + new Date().toString());time++;}},1000,2000);
}

在这里插入图片描述

3.2 清除过期client

现在看清除方法,看下这个补偿时间怎么用的:
在这里插入图片描述

//AbstractInstanceRegistry.java
public void evict(long additionalLeaseMs) {logger.debug("Running the evict task");//是否允许续约过期,允许为true,不允许为false//底层其实就是判断自我保护机制启动没有,false不允许续约过期,其实就是自我保护机制启动了//自我保护机制开启的规则是:最后一分钟收到的续约心跳数 低于  预期每分钟收到的续约心跳数//这些前面几章都说过就不看了if (!isLeaseExpirationEnabled()) {logger.debug("DS: lease expiration is currently disabled.");return;}// We collect first all expired items, to evict them in random order. For large eviction sets,// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,// the impact should be evenly distributed across all applications.// 我们首先收集所有过期的元素,以随机顺序将其逐出。 对于大型驱逐集,如果不这样做,// 我们可能会在自我保护开始之前就清除整个应用程序。通过将其随机化,影响应平均分布// 在所有应用程序中。List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();//registry,本地注册表for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {//遍历获取内层mapMap<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();if (leaseMap != null) {for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {Lease<InstanceInfo> lease = leaseEntry.getValue();//判断是否过期,过期加入expiredLeases集合//看到过期方法,传入了补偿时间if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {expiredLeases.add(lease);}}}}// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for// triggering self-preservation. Without that we would wipe out full registry.// 为了补偿GC暂停或本地时间漂移,我们需要使用当前注册表大小作为触发自我保护的基础。 // 否则,我们将清除完整的注册表。int registrySize = (int) getLocalRegistrySize();//当前注册表中所有实例数量int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());//保护机制开启的阈值//计算允许清除的最多数量int evictionLimit = registrySize - registrySizeThreshold;//已经过期的数量  和 允许清除的最多数量 之间  取最小值int toEvict = Math.min(expiredLeases.size(), evictionLimit);if (toEvict > 0) {logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);Random random = new Random(System.currentTimeMillis());for (int i = 0; i < toEvict; i++) {// Pick a random item (Knuth shuffle algorithm)int next = i + random.nextInt(expiredLeases.size() - i);//在指定列表中的指定位置交换元素。为了随机删除Collections.swap(expiredLeases, i, next);//获取交换后的进行删除Lease<InstanceInfo> lease = expiredLeases.get(i);String appName = lease.getHolder().getAppName();String id = lease.getHolder().getId();EXPIRED.increment();logger.warn("DS: Registry: expired lease for {}/{}", appName, id);//内部清除internalCancel(appName, id, false);}}
}

3.2.1 判断是否过期

//Lease.java
public boolean isExpired(long additionalLeaseMs) {//evictionTimestamp:驱逐时间,下架的时候会赋值,不为空说明已经下架了//lastUpdateTimestamp:最近一次心跳续约时间//duration:续约的过期时间(默认90秒,超过该时间认为挂了)//additionalLeaseMs:补偿时间,补偿上一次清除任务执行的耗时超过了定时执行的间隔时间的时间//有驱逐时间说明被下架了,认为过期//超过指定时间没有收到心跳,认为过期return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}

3.2.2 内部清除(和下架方法是一个方法,分析过了)

//AbstractInstanceRegistry.java
protected boolean internalCancel(String appName, String id, boolean isReplication) {try {read.lock();//读锁CANCEL.increment(isReplication);//下架操作计数器+1Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);//记录删除的续约对象Lease<InstanceInfo> leaseToCancel = null;if (gMap != null) {//核心方法就是这个removeleaseToCancel = gMap.remove(id);}synchronized (recentCanceledQueue) {recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));}//overriddenStatus的清除InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);if (instanceStatus != null) {logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());}if (leaseToCancel == null) {//下架的时候发现注册表中已经没有了CANCEL_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);return false;} else {//续约对象里有一个时间戳专门记录 下架时间的 evictionTimestamp//通过更新驱逐时间来取消租约。leaseToCancel.cancel();InstanceInfo instanceInfo = leaseToCancel.getHolder();String vip = null;String svip = null;if (instanceInfo != null) {//记录行为类型instanceInfo.setActionType(ActionType.DELETED);//加入最近更新队列recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));//更新服务端最新修改时间instanceInfo.setLastUpdatedTimestamp();//获取此实例的虚拟Internet协议地址。 如果未指定,则默认为主机名。vip = instanceInfo.getVIPAddress();//获取此实例的安全虚拟Internet协议地址。如果未指定,则默认为主机名。svip = instanceInfo.getSecureVipAddress();}//是对应微服务相关缓存失效invalidateCache(appName, vip, svip);logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);return true;}} finally {read.unlock();}
}

这篇关于Spring Cloud Eureka Server 源码解析(八) 统计最后一分钟收到的续约心跳数、定时清除过期 Client的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/206729

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

hdu1496(用hash思想统计数目)

作为一个刚学hash的孩子,感觉这道题目很不错,灵活的运用的数组的下标。 解题步骤:如果用常规方法解,那么时间复杂度为O(n^4),肯定会超时,然后参考了网上的解题方法,将等式分成两个部分,a*x1^2+b*x2^2和c*x3^2+d*x4^2, 各自作为数组的下标,如果两部分相加为0,则满足等式; 代码如下: #include<iostream>#include<algorithm