本文主要是介绍【Spring Cloud】Eureka Server源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
【学习背景】
在上篇文章中,分析了Eureka Client的源码,核心类是DiscoveryClient,完成了Client端的主要工作。本篇博客接着分析Eureka Server端。
【学习内容】
Eureka Server同时也是一个Eureka Client,在不禁止Eureka Server的客户端行为时,它会向它配置文件中的其他Eureka Server进行拉取注册表、服务注册和心跳发送等操作。
Eureka Server的功能主要包含:服务注册、接受服务心跳、服务剔除、服务下线、集群同步和获取注册表中服务实例信息。下面就从源码中逐个分析下其中的原理。
1. 服务注册
Eureka Client在发起服务注册时会将自身的服务实例元数据封装在InstanceInfo中,然后发送到Eureka Server。Eureka Server在接收到Eureka Client发送的InstanceINFO后将会尝试将其放到本地注册表中以供其他Eureka Client进行服务发现。
服务注册的主要实现在AbstractInstanceRegistry中的registry方法中,代码如下:
//AbstractInstanceRegistry.java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication){try {// 获取读锁read.lock();// 这里的registry是ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>registry,根据appName对服务实例集群进行分类Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());REGISTER.increment(isReplication);if(gMap == null){final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = newConcurrentHashMap<String, Lease<InstanceInfo>>();// 这里有一个比较严谨的操作,防止在添加新的服务实例集群租约时把已有的其他线程添加的集群租约覆盖掉,如果存在该键值,直接返回已存在的值;否则添加该键值对,返回nullgMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if(gMap == null){gMap = gNewMap;}}//根据instanceId获取实例的租约Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());if(existingLease ! = null &&(existingLease.getHolder()! = null)){Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();// 如果该实例的租约已经存在,比较最后更新时间戳的大小,取最大值的注册信息为有效if(existingLastDirtyTimestamp > registrationLastDirtyTimestamp){registrant = existingLease.getHolder();}
else {// 如果租约不存在,这是一个新的注册实例synchronized(lock){if(this.expectedNumberOfRenewsPerMin > 0){// 自我保护机制this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;this.numberOfRenewsPerMinThreshold =(int)(this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());}}
}// 创建新的租约Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);if(existingLease ! = null){// 如果租约存在,继承租约的服务上线初始时间lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}// 保存租约gMap.put(registrant.getId(), lease);// 添加最近注册队列// private final CircularQueue<Pair<Long, String>> recentRegisteredQueue// 用来统计最近注册服务实例的数据synchronized(recentRegisteredQueue){recentRegisteredQueue.add(new Pair<Long, String>(System.currentTimeMillis(),registrant.getAppName()+ "(" + registrant.getId()+ ")"));}...// 根据覆盖状态规则得到服务实例的最终状态,并设置服务实例的当前状态InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);registrant.setStatusWithoutDirty(overriddenInstanceStatus);// 如果服务实例状态为UP,设置租约的服务上线时间,只有第一次设置有效if(InstanceStatus.UP.equals(registrant.getStatus())){lease.serviceUp();}registrant.setActionType(ActionType.ADDED);// 添加最近租约变更记录队列,标识ActionType为ADDED// 这将用于Eureka Client增量式获取注册表信息// private ConcurrentLinkedQueue<RecentlyChangedItem>recentlyChangedQueuerecentlyChangedQueue.add(new RecentlyChangedItem(lease));// 设置服务实例信息更新时间registrant.setLastUpdatedTimestamp();// 设置response缓存过期,这将用于Eureka Client全量获取注册表信息invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());} finally {// 释放锁read.unlock();}
}
简单总结下,上述服务注册代码的核心:
- 获取读锁,防止其他线程对注册表进行数据操作,避免数据不一致
- 查询对应的InstanceINFO是否已经存在注册表中,根据appName划分服务集群,使用InstanceId唯一标记服务实例
- 租约存在,比较两个InstanceInfo的最后更新时间,保留最新的租约
- 租约不存在,创建新的租约保存InstanceInfo,将租约放到Registry注册表中
- 缓存操作,将InstanceInfo加入用于统计Eureka Client增量式获取注册表信息的recentlyChangedQueue和失效responseCache中对应的缓存
- 设置服务实例租约的上线时间,用于计算租约的有效时间,释放读锁,完成注册。
2. 接受服务心跳
Eureka Client完成服务注册之后,需要定时向Eureka Server发送心跳请求(默认30秒一次),维持自己在Eureka Server中租约的有效性。
主要实现在AbstractInstanceRegistry中的renew方法中,代码如下:
// AbstractInstanceRegistry.java
public boolean renew(String appName, String id, boolean isReplication){RENEW.increment(isReplication);// 根据appName获取服务集群的租约集合Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToRenew = null;if(gMap ! = null){leaseToRenew = gMap.get(id);}// 租约不存在,直接返回falseif(leaseToRenew == null){RENEW_NOT_FOUND.increment(isReplication);return false;} else {InstanceInfo instanceInfo = leaseToRenew.getHolder();if(instanceInfo ! = null){// 根据覆盖状态规则得到服务实例的最终状态InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);if(overriddenInstanceStatus == InstanceStatus.UNKNOWN){// 如果得到的服务实例最后状态是UNKNOWN,取消续约RENEW_NOT_FOUND.increment(isReplication);return false;}if(! instanceInfo.getStatus().equals(overriddenInstanceStatus)){instanceInfo.setStatus(overriddenInstanceStatus);}}// 统计每分钟续租的次数,用于自我保护renewsLastMin.increment();// 更新租约中的有效时间leaseToRenew.renew();return true;}}
上述代码的核心:
- 根据服务实例的appName和instanceInfo Id 查询服务实例的租约
- 不存在,直接返回false,租约续租失败
- 存在,根据getOverriddenInstanceStatus方法得到的instanceStatus不为InstanceStatus.UNKNOWN,更新租约中的有效时间,达到续约的目的
3. 服务剔除
如果Eureka Client在注册后,既没有续约,也没有下线,那么服务的状态就处于不可知的状态,不能保证能够从该服务实例中获取到回馈,所以需要剔除该服务。
主要实现在AbstractInstanceRegistry中的evict方法中,代码如下:
// AbstractInstanceRegistry.java
@Override
public void evict(){evict(0l);
}
public void evict(long additionalLeaseMs){// 自我保护相关,如果出现该状态,不允许剔除服务if(! isLeaseExpirationEnabled()){return;}// 遍历注册表register,一次性获取所有的过期租约List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();for(Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()){Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();if(leaseMap ! = null){for(Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()){Lease<InstanceInfo> lease = leaseEntry.getValue();// 1if(lease.isExpired(additionalLeaseMs)&& lease.getHolder()! = null){expiredLeases.add(lease);}}}}// 计算最大允许剔除的租约的数量,获取注册表租约总数int registrySize =(int)getLocalRegistrySize();// 计算注册表租约的阀值,与自我保护相关int registrySizeThreshold =(int)(registrySize * serverConfig.getRenewalPercentThreshold());int evictionLimit = registrySize - registrySizeThreshold;// 计算剔除租约的数量int toEvict = Math.min(expiredLeases.size(), evictionLimit);if(toEvict > 0){Random random = new Random(System.currentTimeMillis());// 逐个随机剔除for(int i = 0; i < toEvict; i++){int next = i + random.nextInt(expiredLeases.size()- i);Collections.swap(expiredLeases, i, next);Lease<InstanceInfo> lease = expiredLeases.get(i);String appName = lease.getHolder().getAppName();String id = lease.getHolder().getId();EXPIRED.increment();// 逐个剔除internalCancel(appName, id, false);}}}
上述代码的核心:
- 遍历registry注册表,找出其中所有的过期租约
- 根据配置文件中续约百分比阈值和当前注册表的租约总数量计算出最大允许的剔除租约的数量,分批次剔除
- 最后调用的是服务下线方法将其从注册表中清除掉
服务剔除中有些限制,主要是为了保证Eureka Server的可用性:
- 自我保护时期不能进行服务剔除操作
- 过期操作是分批进行
- 服务剔除是随机逐个剔除,均匀分布在所有应用中
- 服务剔除是一个定时任务,默认60秒一次
4. 服务下线
Eureka Client在应用销毁时,会向Eureka Server发送服务下线请求,清除注册表中关于本应用的租约,避免无效的服务调用。
主要实现在AbstractInstanceRegistry中的internalCancel方法中,代码如下:
// AbstractInstanceRegistry.java
@Override
public boolean cancel(String appName, String id, boolean isReplication){return internalCancel(appName, id, isReplication);
}
protected boolean internalCancel(String appName, String id, boolean isReplication){try {// 获取读锁,防止被其他线程进行修改read.lock();CANCEL.increment(isReplication);// 根据appName获取服务实例的集群Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToCancel = null;// 移除服务实例的租约if(gMap ! = null){leaseToCancel = gMap.remove(id);}// 将服务实例信息添加到最近下线服务实例统计队列synchronized(recentCanceledQueue){recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(),appName + "(" + id + ")"));}// 租约不存在,返回falseif(leaseToCancel == null){CANCEL_NOT_FOUND.increment(isReplication);return false;} else {// 设置租约的下线时间leaseToCancel.cancel();InstanceInfo instanceInfo = leaseToCancel.getHolder();...if(instanceInfo ! = null){instanceInfo.setActionType(ActionType.DELETED);// 添加最近租约变更记录队列,标识ActionType为DELETED// 这将用于Eureka Client增量式获取注册表信息recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));instanceInfo.setLastUpdatedTimestamp();}// 设置response缓存过期invalidateCache(appName, vip, svip);// 下线成功return true;}} finally {// 释放锁read.unlock();}}
上述代码的核心:
- 通过读锁,保存registry注册表中的数据的一致性,避免脏读
- 根据appName和InstanceInfo Id查询服务实例是否存在
- 不存在,直接返回下线失败
- 存在,从registry注册表中移除,设置租约的下线时间,同时在最近租约变更记录队列中添加新的下线记录,用于Eureka Client的增量式获取注册表信息
- 最后设置response缓存过期
5. 集群同步
如果Eureka Server是通过集群方式部署,那么为了维护集群中注册表数据的一致性,便需要一个机制同步注册表中的数据。
Eureka Server集群同步包含两个部分,一部分是Eureka Server在启动过程中从它的peer节点中拉取注册表信息,并将这些服务实例的信息注册到本地注册表;另一部分是Eureka Server每次对本地注册表进行操作时,同时会将操作同步到它的peer节点中,达到集群注册表数据统一的目的。
第一部分,初始化本地注册表信息,主要实现在PeerAwareInstanceRegistry中的syncUp方法中,代码如下:
// PeerAwareInstanceRegistry.java
public int syncUp(){// 从临近的peer中复制整个注册表int count = 0;// 如果获取不到,线程等待for(int i = 0;((i < serverConfig.getRegistrySyncRetries())&&(count == 0));i++){if(i > 0){try {Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());} catch(InterruptedException e){break;}}// 获取所有的服务实例Applications apps = eurekaClient.getApplications();for(Application app : apps.getRegisteredApplications()){for(InstanceInfo instance : app.getInstances()){try {// 判断是否可注册,主要用于AWS环境下进行,若部署在其他的环境,直接返回trueif(isRegisterable(instance)){// 注册到自身的注册表中register(instance, instance.getLeaseInfo().getDurationInSecs(),true);count++;}} catch(Throwable t){logger.error("During DS init copy", t);}}}}return count;}
第二部分,为了保证集群数据的一致性,每个Eureka Server对本地注册表进行操作后,会同步给所有的节点,主要实现在PeerAwareInstanceRegistryImpl中,对所有操作registry、cancel、renew等,都进行了同步操作,主要方法replicateToPeers,代码如下:
//PeerAwareInstanceRegistryImpl.java
private void replicateToPeers(Action action, String appName, String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */, booleanisReplication){Stopwatch tracer = action.getTimer().start();try {if(isReplication){numberOfReplicationsLastMin.increment();}// 如果peer集群为空,或者这本来就是复制操作,那么就不再复制,防止造成循环复制if(peerEurekaNodes == Collections.EMPTY_LIST || isReplication){return;}// 向peer集群中的每一个peer进行同步for(final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()){// 如果peer节点是自身的话,不进行同步复制if(peerEurekaNodes.isThisMyUrl(node.getServiceUrl())){continue;}// 根据Action调用不同的同步请求replicateInstanceActionsToPeers(action, appName, id, info, newStatus,node);}} finally {tracer.stop();}
}
根据Action的不同,调用不同方法,进行同步:
//PeerAwareInstanceRegistryImpl.java
private void replicateInstanceActionsToPeers(Action action, String appName,String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node){try {InstanceInfo infoFromRegistry = null;CurrentRequestVersion.set(Version.V2);switch(action){case Cancel:// 同步下线node.cancel(appName, id);break;case Heartbeat:InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);infoFromRegistry = getInstanceByAppAndId(appName, id, false);// 同步心跳node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);break;case Register:// 同步注册node.register(info);break;case StatusUpdate:infoFromRegistry = getInstanceByAppAndId(appName, id, false);// 同步状态更新node.statusUpdate(appName, id, newStatus, infoFromRegistry);break;case DeleteStatusOverride:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.deleteStatusOverride(appName, id, infoFromRegistry);break;}} catch(Throwable t){logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
6. 获取注册表中服务实例信息
Erueka Server中获取注册表信息分为全量获取和增量获取两种。
全量获取,主要通过AbstractInstanceRegistry中的getApplicationsFromMultipleRegions方法实现,代码如下:
//AbstractInstanceRegistry.java
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions){boolean includeRemoteRegion = null ! = remoteRegions && remoteRegions.length ! = 0;Applications apps = new Applications();apps.setVersion(1L);// 从本地registry获取所有的服务实例信息InstanceInfofor(Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()){Application app = null;if(entry.getValue()! = null){for(Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()){Lease<InstanceInfo> lease = stringLeaseEntry.getValue();if(app == null){app = new Application(lease.getHolder().getAppName());}app.addInstance(decorateInstanceInfo(lease));}}if(app ! = null){apps.addApplication(app);}}if(includeRemoteRegion){// 获取远程Region中的Eureka Server中的注册表信息...}apps.setAppsHashCode(apps.getReconcileHashCode());return apps;
}
增量获取,主要通过AbstractInstanceRegistry中的getApplicationDeltasFromMultipleRegions方法实现,代码如下:
//AbstractInstanceRegistry.java
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions){if(null == remoteRegions){remoteRegions = allKnownRemoteRegions; // null means all remote regions.}
boolean includeRemoteRegion = remoteRegions.length ! = 0;Applications apps = new Applications();apps.setVersion(responseCache.getVersionDeltaWithRegions().get());Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();try {write.lock(); // 开启写锁// 遍历recentlyChangedQueue队列获取最近变化的服务实例信息InstanceInfoIterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();while(iter.hasNext()){//...}if(includeRemoteRegion){// 获取远程Region中的Eureka Server的增量式注册表信息...} finally {write.unlock();}// 计算应用集合一致性哈希码,用以在Eureka Client拉取时进行对比apps.setAppsHashCode(apps.getReconcileHashCode());return apps;}
【学习总结】
断断续续,算是把Eureka注册中心相关应用和原理部分都学习了一遍,过程难点在于分析源码比较难懂枯燥,不如自己搭建应用兴趣大,有结果向导。除此之外,学习还是需要反复的,总结下来,相信对后面的回顾还是会有很大帮助的。
这篇关于【Spring Cloud】Eureka Server源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!