Spring Cloud Eureka Server 源码解析(八) 统计最后一分钟收到的续约心跳数、定时清除过期 Client

本文主要是介绍Spring Cloud Eureka Server 源码解析(八) 统计最后一分钟收到的续约心跳数、定时清除过期 Client,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.入口

入口找到自动配置类EurekaServerAutoConfiguration:
在这里插入图片描述
在这里插入图片描述

//EurekaServerInitializerConfiguration.java
public void start() {new Thread(() -> {try {// TODO: is this class even needed now?//其他内容就不看了,直接找到我们需要关注的两个任务//看contextInitialized方法eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);log.info("Started Eureka Server");publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));EurekaServerInitializerConfiguration.this.running = true;publish(new EurekaServerStartedEvent(getEurekaServerConfig()));}catch (Exception ex) {// Help!log.error("Could not initialize Eureka servlet context", ex);}}).start();
}//EurekaServerBootstrap.java
public void contextInitialized(ServletContext context) {try {initEurekaEnvironment();//看这,初始化EurekaServer上下文initEurekaServerContext();context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);}catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);}
}//EurekaServerBootstrap.java
protected void initEurekaServerContext() throws Exception {// For backward compatibilityJsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH);XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH);if (isAws(this.applicationInfoManager.getInfo())) {this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,this.eurekaClientConfig, this.registry, this.applicationInfoManager);this.awsBinder.start();}EurekaServerContextHolder.initialize(this.serverContext);log.info("Initialized server context");// Copy registry from neighboring eureka node// 从邻近的eureka节点复制注册表// 从对等eureka节点填充注册表信息。如果通信失败,此操作将失// 败转移到其他节点,直到列表耗尽为止。// registryCount:表示从其他eureka节点下载到的注册信息中所有实例数量int registryCount = this.registry.syncUp();// 走这,打开交通this.registry.openForTraffic(this.applicationInfoManager, registryCount);// Register all monitoring statistics.EurekaMonitors.registerAllStats();
}//InstanceRegistry.java
/*** If* {@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)}* is called with a zero argument, it means that leases are not automatically* cancelled if the instance hasn't sent any renewals recently. This happens for a* standalone server. It seems like a bad default, so we set it to the smallest* non-zero value we can, so that any instances that subsequently register can bump up* the threshold.* 如果用一个零参数调用{@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)},* 这意味着如果实例最近没有发送任何续订,租约不会自动取消。这种情况发生在独立服务器上。* 这似乎是一个糟糕的默认值,因此我们将其设置为尽可能小的非零值,以便随后注册的任何实例都可以提高阈值。*/
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {//count值,是服务端一开始启动的时候 从其他eureka节点下载到的注册信息中所有实例数量//defaultOpenForTrafficCount默认为1super.openForTraffic(applicationInfoManager,count == 0 ? this.defaultOpenForTrafficCount : count);
}//PeerAwareInstanceRegistryImpl.java
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {// Renewals happen every 30 seconds and for a minute it should be a factor of 2.// 如果每30秒发生一次续约操作,一分钟内应该是2次数this.expectedNumberOfClientsSendingRenews = count;// 更新预期每分钟收到续约请求数的阈值updateRenewsPerMinThreshold();logger.info("Got {} instances from neighboring DS node", count);logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);this.startupTime = System.currentTimeMillis();if (count > 0) {//count值,是服务端一开始启动的时候 从其他eureka节点下载到的注册信息中所有实例数量//如果服务器无法在启动时从对等eureka节点获取注册表信息//那么在指定的期限内,服务器不会返回注册表信息(全量下载中提到过)this.peerInstancesTransferEmptyOnStartup = false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();boolean isAws = Name.Amazon == selfName;if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");primeAwsReplicas(applicationInfoManager);}logger.info("Changing status to UP");//设置此实例的状态。应用程序可以使用它来指示是否已准备好接收流量。//在这里设置状态还会通知所有已注册的侦听器状态更改事件。applicationInfoManager.setInstanceStatus(InstanceStatus.UP);//看supersuper.postInit();
}//AbstractInstanceRegistry.java
protected void postInit() {//统计最后一分钟收到的续约心跳数的定时任务renewsLastMin.start();//evictionTaskRef是个原子引用,放的就是定时清除过期clien的任务,驱逐任务if (evictionTaskRef.get() != null) {//取消之前已经存在的驱逐任务evictionTaskRef.get().cancel();}//新建一个驱逐任务evictionTaskRef.set(new EvictionTask());//启动驱逐任务,repeated任务,固定时间循环定时执行//serverConfig.getEvictionIntervalTimerInMs():驱逐任务循环执行的间隔时间evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());
}

2. 统计最后一分钟收到的续约心跳数

先看当前最后一分钟收到的续约心跳数 统计的定时任务:
在这里插入图片描述
看一下启动的start方法:

//MeasuredRate.java
public synchronized void start() {if (!isActive) {timer.schedule(new TimerTask() {@Overridepublic void run() {try {// Zero out the current bucket.// lastBucket就是当前最后一分钟收到的续约心跳数// currentBucket是正在统计中收到的续约心跳数// 可以看到每60秒就会从currentBucket获取统计的数量,赋值给lastBucket// 同时会清空currentBucket已经统计的数量lastBucket.set(currentBucket.getAndSet(0));} catch (Throwable e) {logger.error("Cannot reset the Measured Rate", e);}} //sampleInterval就是传入的60秒,每60秒统计一次}, sampleInterval, sampleInterval);isActive = true;}
}

看一下哪些点会增加统计数量:
在这里插入图片描述
注意,图中两个调用点,分别是两个任务实例,分开统计的。

3. 定时清除过期 Client

现在看定时清除过期 Client的任务:

//AbstractInstanceRegistry.java
protected void postInit() {//当前最后一分钟收到的续约心跳数 统计的定时任务renewsLastMin.start();//evictionTaskRef是个原子引用,放的就是定时清除过期clien的任务,驱逐任务if (evictionTaskRef.get() != null) {//取消之前已经存在的驱逐任务evictionTaskRef.get().cancel();}//新建一个驱逐任务evictionTaskRef.set(new EvictionTask());//启动驱逐任务,repeated任务,固定时间循环定时执行//serverConfig.getEvictionIntervalTimerInMs():驱逐任务循环执行的间隔时间evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());
}

在这里插入图片描述
看EvictionTask:

//AbstractInstanceRegistry.EvictionTask.java(内部类)
class EvictionTask extends TimerTask {//记录上一次执行清除的时间点private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);@Overridepublic void run() {try {//计算补偿时间long compensationTimeMs = getCompensationTimeMs();logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);//执行清除evict(compensationTimeMs);} catch (Throwable e) {logger.error("Could not run the evict task", e);}}...
}

3.1 计算补偿时间

先看计算补偿时间:

//AbstractInstanceRegistry.EvictionTask#getCompensationTimeMs
/*** compute a compensation time defined as the actual time this task was executed since the prev iteration,* vs the configured amount of time for execution. This is useful for cases where changes in time (due to* clock skew or gc for example) causes the actual eviction task to execute later than the desired time* according to the configured cycle.* * 计算补偿时间,该补偿时间定义为自上次迭代以来此任务实际执行的时间,与配置的执行时间之比。 * 当时间变化(例如由于时钟偏斜或gc)导致实际驱逐任务的执行时间晚于所需时间(根据配置的周期)* 时,此功能很有用。*/
long getCompensationTimeMs() {//获取当前时间long currNanos = getCurrentTimeNano();//获取上一次清除开始的时间,并赋值这一次的开始时间//第一次执行获取是0long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);if (lastNanos == 0l) {return 0l;}//本次开始执行的时间 距离 上一次开始执行的时间,大部分情况下应该就是定时任务执行的时间间隔//但是实际执行的时间是有可能超过配置的定时时间间隔,这个时候就可以理解为清除任务执行完的耗时时间//这个时间最低不会低于定时任务的定时间隔时间long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);//减去配置的定时时间间隔,如果大于0,说明上一次执行任务的耗时超过了定时时间间隔//导致此次执行任务的时间晚于应该执行的时间,所以要加上这个补偿时间//在这个补偿时间之内的client也是过期的,也需要清楚long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();return compensationTime <= 0l ? 0l : compensationTime;
}

关于定时任务的验证:

public static void main(String[] args) {Timer timer = new Timer("test Timer", false);//这个定时任务每两秒执行一次,前两次执行耗时要3秒,之后耗时只要1秒timer.schedule(new TimerTask() {private int time = 0;@Overridepublic void run() {System.out.println("start:" + new Date().toString());try {if (time > 1) {Thread.sleep(1000);} else {//前两次定时任务需要执行3秒Thread.sleep(3000);}} catch (InterruptedException e) {e.printStackTrace();}System.out.println("end:" + new Date().toString());time++;}},1000,2000);
}

在这里插入图片描述

3.2 清除过期client

现在看清除方法,看下这个补偿时间怎么用的:
在这里插入图片描述

//AbstractInstanceRegistry.java
public void evict(long additionalLeaseMs) {logger.debug("Running the evict task");//是否允许续约过期,允许为true,不允许为false//底层其实就是判断自我保护机制启动没有,false不允许续约过期,其实就是自我保护机制启动了//自我保护机制开启的规则是:最后一分钟收到的续约心跳数 低于  预期每分钟收到的续约心跳数//这些前面几章都说过就不看了if (!isLeaseExpirationEnabled()) {logger.debug("DS: lease expiration is currently disabled.");return;}// We collect first all expired items, to evict them in random order. For large eviction sets,// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,// the impact should be evenly distributed across all applications.// 我们首先收集所有过期的元素,以随机顺序将其逐出。 对于大型驱逐集,如果不这样做,// 我们可能会在自我保护开始之前就清除整个应用程序。通过将其随机化,影响应平均分布// 在所有应用程序中。List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();//registry,本地注册表for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {//遍历获取内层mapMap<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();if (leaseMap != null) {for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {Lease<InstanceInfo> lease = leaseEntry.getValue();//判断是否过期,过期加入expiredLeases集合//看到过期方法,传入了补偿时间if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {expiredLeases.add(lease);}}}}// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for// triggering self-preservation. Without that we would wipe out full registry.// 为了补偿GC暂停或本地时间漂移,我们需要使用当前注册表大小作为触发自我保护的基础。 // 否则,我们将清除完整的注册表。int registrySize = (int) getLocalRegistrySize();//当前注册表中所有实例数量int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());//保护机制开启的阈值//计算允许清除的最多数量int evictionLimit = registrySize - registrySizeThreshold;//已经过期的数量  和 允许清除的最多数量 之间  取最小值int toEvict = Math.min(expiredLeases.size(), evictionLimit);if (toEvict > 0) {logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);Random random = new Random(System.currentTimeMillis());for (int i = 0; i < toEvict; i++) {// Pick a random item (Knuth shuffle algorithm)int next = i + random.nextInt(expiredLeases.size() - i);//在指定列表中的指定位置交换元素。为了随机删除Collections.swap(expiredLeases, i, next);//获取交换后的进行删除Lease<InstanceInfo> lease = expiredLeases.get(i);String appName = lease.getHolder().getAppName();String id = lease.getHolder().getId();EXPIRED.increment();logger.warn("DS: Registry: expired lease for {}/{}", appName, id);//内部清除internalCancel(appName, id, false);}}
}

3.2.1 判断是否过期

//Lease.java
public boolean isExpired(long additionalLeaseMs) {//evictionTimestamp:驱逐时间,下架的时候会赋值,不为空说明已经下架了//lastUpdateTimestamp:最近一次心跳续约时间//duration:续约的过期时间(默认90秒,超过该时间认为挂了)//additionalLeaseMs:补偿时间,补偿上一次清除任务执行的耗时超过了定时执行的间隔时间的时间//有驱逐时间说明被下架了,认为过期//超过指定时间没有收到心跳,认为过期return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}

3.2.2 内部清除(和下架方法是一个方法,分析过了)

//AbstractInstanceRegistry.java
protected boolean internalCancel(String appName, String id, boolean isReplication) {try {read.lock();//读锁CANCEL.increment(isReplication);//下架操作计数器+1Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);//记录删除的续约对象Lease<InstanceInfo> leaseToCancel = null;if (gMap != null) {//核心方法就是这个removeleaseToCancel = gMap.remove(id);}synchronized (recentCanceledQueue) {recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));}//overriddenStatus的清除InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);if (instanceStatus != null) {logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());}if (leaseToCancel == null) {//下架的时候发现注册表中已经没有了CANCEL_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);return false;} else {//续约对象里有一个时间戳专门记录 下架时间的 evictionTimestamp//通过更新驱逐时间来取消租约。leaseToCancel.cancel();InstanceInfo instanceInfo = leaseToCancel.getHolder();String vip = null;String svip = null;if (instanceInfo != null) {//记录行为类型instanceInfo.setActionType(ActionType.DELETED);//加入最近更新队列recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));//更新服务端最新修改时间instanceInfo.setLastUpdatedTimestamp();//获取此实例的虚拟Internet协议地址。 如果未指定,则默认为主机名。vip = instanceInfo.getVIPAddress();//获取此实例的安全虚拟Internet协议地址。如果未指定,则默认为主机名。svip = instanceInfo.getSecureVipAddress();}//是对应微服务相关缓存失效invalidateCache(appName, vip, svip);logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);return true;}} finally {read.unlock();}
}

这篇关于Spring Cloud Eureka Server 源码解析(八) 统计最后一分钟收到的续约心跳数、定时清除过期 Client的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/206729

相关文章

SpringBoot项目中Maven剔除无用Jar引用的最佳实践

《SpringBoot项目中Maven剔除无用Jar引用的最佳实践》在SpringBoot项目开发中,Maven是最常用的构建工具之一,通过Maven,我们可以轻松地管理项目所需的依赖,而,... 目录1、引言2、Maven 依赖管理的基础概念2.1 什么是 Maven 依赖2.2 Maven 的依赖传递机

SpringBoot实现动态插拔的AOP的完整案例

《SpringBoot实现动态插拔的AOP的完整案例》在现代软件开发中,面向切面编程(AOP)是一种非常重要的技术,能够有效实现日志记录、安全控制、性能监控等横切关注点的分离,在传统的AOP实现中,切... 目录引言一、AOP 概述1.1 什么是 AOP1.2 AOP 的典型应用场景1.3 为什么需要动态插

Linux中shell解析脚本的通配符、元字符、转义符说明

《Linux中shell解析脚本的通配符、元字符、转义符说明》:本文主要介绍shell通配符、元字符、转义符以及shell解析脚本的过程,通配符用于路径扩展,元字符用于多命令分割,转义符用于将特殊... 目录一、linux shell通配符(wildcard)二、shell元字符(特殊字符 Meta)三、s

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Java访问修饰符public、private、protected及默认访问权限详解

《Java访问修饰符public、private、protected及默认访问权限详解》:本文主要介绍Java访问修饰符public、private、protected及默认访问权限的相关资料,每... 目录前言1. public 访问修饰符特点:示例:适用场景:2. private 访问修饰符特点:示例:

数据库oracle用户密码过期查询及解决方案

《数据库oracle用户密码过期查询及解决方案》:本文主要介绍如何处理ORACLE数据库用户密码过期和修改密码期限的问题,包括创建用户、赋予权限、修改密码、解锁用户和设置密码期限,文中通过代码介绍... 目录前言一、创建用户、赋予权限、修改密码、解锁用户和设置期限二、查询用户密码期限和过期后的修改1.查询用

Window Server创建2台服务器的故障转移群集的图文教程

《WindowServer创建2台服务器的故障转移群集的图文教程》本文主要介绍了在WindowsServer系统上创建一个包含两台成员服务器的故障转移群集,文中通过图文示例介绍的非常详细,对大家的... 目录一、 准备条件二、在ServerB安装故障转移群集三、在ServerC安装故障转移群集,操作与Ser

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一