Eureka核心源码解析(一):应用实例注册、续约、下线

2024-01-05 18:40

本文主要是介绍Eureka核心源码解析(一):应用实例注册、续约、下线,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要来解析Eureka应用实例注册、续约、下线的核心源码,基于1.9.8版本

一、应用实例注册

1、Eureka Client发起注册

Eureka Client向Eureka Server发起注册应用实例需要符合如下条件:

  • 配置eureka.registration.enabled=true,Eureka Client向Eureka Server发起注册应用实例的开关
  • InstanceInfo在Eureka Client和Eureka Server数据不一致

每次InstanceInfo发生属性变化时,标记isInstanceInfoDirty属性为true,表示InstanceInfo在Eureka Client和Eureka Server数据不一致,需要注册。另外,InstanceInfo刚被创建时,在Eureka Server不存在,也会被注册

当符合条件时,InstanceInfo不会立即向Eureka Server注册,而是后台线程定时注册

当InstanceInfo的状态(status)属性发生变化时,并且配置eureka.shouldOnDemandUpdateStatusChange=true(默认为true)时,立即向Eureka Server注册

1)、应用实例信息复制
public class DiscoveryClient implements EurekaClient {/*** 应用实例状态变更监听器*/private ApplicationInfoManager.StatusChangeListener statusChangeListener;/*** 应用实例信息复制器*/private InstanceInfoReplicator instanceInfoReplicator;private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}// 向EurekaServer心跳执行器if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);// 创建应用实例状态变更监听器// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSizestatusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return "statusChangeListener";}@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {// log at warn level if DOWN was involvedlogger.warn("Saw local status change event {}", statusChangeEvent);} else {logger.info("Saw local status change event {}", statusChangeEvent);}instanceInfoReplicator.onDemandUpdate();}};// 注册应用实例状态变更监听器if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}// 开启应用实例信息复制器instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}

调用InstanceInfoReplicator的start(int initialDelayMs) 方法,开启应用实例信息复制器。实现代码如下:

class InstanceInfoReplicator implements Runnable {public void start(int initialDelayMs) {if (started.compareAndSet(false, true)) {// 设置应用实例信息数据不一致instanceInfo.setIsDirty();  // for initial register// 提交任务Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}public void run() {try {// 刷新应用实例信息discoveryClient.refreshInstanceInfo();// 判断应用实例信息是否数据不一致Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {// 发起注册discoveryClient.register();// 设置应用实例信息数据一致instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);} finally {// 提交任务 不断循环定时执行任务Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}

调用DiscoveryClient的register() 方法,EurekaClient向Eureka Server注册应用实例

2)、发起注册应用实例
public class DiscoveryClient implements EurekaClient {boolean register() throws Throwable {logger.info(PREFIX + "{}: registering service...", appPathIdentifier);EurekaHttpResponse<Void> httpResponse;try {// 调用AbstractJerseyEurekaHttpClient的register方法httpResponse = eurekaTransport.registrationClient.register(instanceInfo);} catch (Exception e) {logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);throw e;}if (logger.isInfoEnabled()) {logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());}return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {@Overridepublic EurekaHttpResponse<Void> register(InstanceInfo info) {String urlPath = "apps/" + info.getAppName();ClientResponse response = null;try {Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();addExtraHeaders(resourceBuilder);response = resourceBuilder.header("Accept-Encoding", "gzip").type(MediaType.APPLICATION_JSON_TYPE).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, info);return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),response == null ? "N/A" : response.getStatus());}if (response != null) {response.close();}}}

AbstractJerseyEurekaHttpClient的register()方法使用POST请求调用Eureka Server的apps/${APP_NAME}接口,参数为InstanceInfo,实现注册实例信息的注册

2、Eureka Server接收注册

Eureka Server接收注册核心流程如下图:

在这里插入图片描述

1)、接收注册请求
@Produces({"application/xml", "application/json"})
public class ApplicationResource {@POST@Consumes({"application/json", "application/xml"})public Response addInstance(InstanceInfo info,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);// 参数合法性校验// validate that the instanceinfo contains all the necessary required fieldsif (isBlank(info.getId())) {return Response.status(400).entity("Missing instanceId").build();} else if (isBlank(info.getHostName())) {return Response.status(400).entity("Missing hostname").build();} else if (isBlank(info.getIPAddr())) {return Response.status(400).entity("Missing ip address").build();} else if (isBlank(info.getAppName())) {return Response.status(400).entity("Missing appName").build();} else if (!appName.equals(info.getAppName())) {return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();} else if (info.getDataCenterInfo() == null) {return Response.status(400).entity("Missing dataCenterInfo").build();} else if (info.getDataCenterInfo().getName() == null) {return Response.status(400).entity("Missing dataCenterInfo Name").build();}// handle cases where clients may be registering with bad DataCenterInfo with missing dataDataCenterInfo dataCenterInfo = info.getDataCenterInfo();if (dataCenterInfo instanceof UniqueIdentifier) {String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();if (isBlank(dataCenterInfoId)) {boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if (experimental) {String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";return Response.status(400).entity(entity).build();} else if (dataCenterInfo instanceof AmazonInfo) {AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);if (effectiveId == null) {amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());}} else {logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}// 注册应用实例信息registry.register(info, "true".equals(isReplication));// 返回204状态码return Response.status(204).build();  // 204 to be backwards compatible}  
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {@Overridepublic void register(final InstanceInfo info, final boolean isReplication) {// 续约过期时间int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {leaseDuration = info.getLeaseInfo().getDurationInSecs();}// 注册应用实例信息super.register(info, leaseDuration, isReplication);// Eureka Server复制replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);}

PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的register(...)方法注册实例信息

2)、Lease
public class Lease<T> {enum Action {Register, Cancel, Renew};public static final int DEFAULT_DURATION_IN_SECS = 90;/*** InstanceInfo实体*/private T holder;/*** 取消注册时间戳*/private long evictionTimestamp;/*** 注册时间戳*/private long registrationTimestamp;/*** 开始服务时间戳*/private long serviceUpTimestamp;/*** 租约最后更新时间戳*/// Make it volatile so that the expiration task would see this quickerprivate volatile long lastUpdateTimestamp;/*** 租约持续时长(毫秒)*/private long duration;public Lease(T r, int durationInSecs) {holder = r;registrationTimestamp = System.currentTimeMillis();lastUpdateTimestamp = registrationTimestamp;duration = (durationInSecs * 1000);}
3)、注册应用实例信息

调用了AbstractInstanceRegistry的register(...)方法,注册实例信息,代码如下:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {/*** 租约映射* key1:应用名* key2:应用实例信息编号* value:租约*/private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {try {// 获取读锁read.lock();Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());// 增加注册次数到监控REGISTER.increment(isReplication);// 获得应用实例信息对应的租约if (gMap == null) {final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();// 添加应用gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if (gMap == null) {gMap = gNewMap;}}Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a leaseif (existingLease != null && (existingLease.getHolder() != null)) {Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");registrant = existingLease.getHolder();}} else {// The lease does not exist and hence it is a new registrationsynchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to register it, increase the number of clients sending renewsthis.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;updateRenewsPerMinThreshold();}}logger.debug("No previous lease information found; it is new registration");}// 创建租约Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);if (existingLease != null) {lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}// 添加到租约gMap.put(registrant.getId(), lease);// 添加到最近注册的调试队列synchronized (recentRegisteredQueue) {recentRegisteredQueue.add(new Pair<Long, String>(System.currentTimeMillis(),registrant.getAppName() + "(" + registrant.getId() + ")"));}// 添加到应用实例覆盖状态映射// This is where the initial state transfer of overridden status happensif (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "+ "overrides", registrant.getOverriddenStatus(), registrant.getId());if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {logger.info("Not found overridden id {} and hence adding it", registrant.getId());overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());}}InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());if (overriddenStatusFromMap != null) {logger.info("Storing overridden status {} from map", overriddenStatusFromMap);registrant.setOverriddenStatus(overriddenStatusFromMap);}// 获得应用实例最终状态,并设置应用实例的状态// Set the status based on the overridden status rulesInstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);registrant.setStatusWithoutDirty(overriddenInstanceStatus);// 设置租约的开始服务的时间戳(只有第一次有效)// If the lease is registered with UP status, set lease service up timestampif (InstanceStatus.UP.equals(registrant.getStatus())) {lease.serviceUp();}// 设置应用实例信息的操作类型为添加registrant.setActionType(ActionType.ADDED);// 添加到最近租约变更记录队列recentlyChangedQueue.add(new RecentlyChangedItem(lease));// 设置租约的最后更新时间戳registrant.setLastUpdatedTimestamp();// 设置响应缓存过期invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());logger.info("Registered instance {}/{} with status {} (replication={})",registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);} finally {// 释放锁read.unlock();}}

二、应用实例续约

1、Eureka Client发起续约

Eureka Client向Eureka Server发起注册应用实例成功后获得租约,Eureka Client固定间隔向Eureka Server发起续约(renew),避免租约过期

默认情况下,租约有效期为90秒,续约频率为30秒。两者比例为1:3,保证在网络异常等情况下,有三次重试的机会

1)、初始化定时任务

Eureka Client在初始化过程中,创建心跳线程,固定间隔向Eureka Server发起续约。实现代码如下:

public class DiscoveryClient implements EurekaClient {private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}// 向EurekaServer心跳执行器if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);// 创建应用实例状态变更监听器// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSizestatusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return "statusChangeListener";}@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {// log at warn level if DOWN was involvedlogger.warn("Saw local status change event {}", statusChangeEvent);} else {logger.info("Saw local status change event {}", statusChangeEvent);}instanceInfoReplicator.onDemandUpdate();}};// 注册应用实例状态变更监听器if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}// 开启应用实例信息复制器instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}
2)、发起续约
public class DiscoveryClient implements EurekaClient {/*** 最后成功向Eureka Server心跳时间戳*/private volatile long lastSuccessfulHeartbeatTimestamp = -1;private class HeartbeatThread implements Runnable {public void run() {// 调用续约方法if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}}boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {// 调用AbstractJerseyEurekaHttpClient的sendHeartBeat方法httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == Status.OK.getStatusCode();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}}  

AbstractJerseyEurekaHttpClient的renew()方法使用PUT请求调用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}接口,参数为status、lastDirtyTimestamp、overriddenstatus,实现续约

2、Eureka Server接收续约

Eureka Server接收续约核心流程如下图:

在这里插入图片描述

1)、接收续约请求
@Produces({"application/xml", "application/json"})
public class InstanceResource {@PUTpublic Response renewLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,@QueryParam("overriddenstatus") String overriddenStatus,@QueryParam("status") String status,@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {boolean isFromReplicaNode = "true".equals(isReplication);// 续约boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);// 续约失败// Not found in the registry, immediately ask for a registerif (!isSuccess) {logger.warn("Not Found (Renew): {} - {}", app.getName(), id);return Response.status(Status.NOT_FOUND).build();}// 比较InstanceInfo的lastDirtyTimestamp属性// Check if we need to sync based on dirty time stamp, the client// instance might have changed some valueResponse response;if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);// Store the overridden status since the validation found out the node that replicates winsif (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()&& (overriddenStatus != null)&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))&& isFromReplicaNode) {registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));}} else {response = Response.ok().build();}logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());return response;}
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {public boolean renew(final String appName, final String id, final boolean isReplication) {// 续约if (super.renew(appName, id, isReplication)) {// Eureka Server复制replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);return true;}return false;}

PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的renew(...)方法续约实例信息

2)、续约应用实例信息

调用了AbstractInstanceRegistry的renew(...)方法,续约实例信息,代码如下:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {public boolean renew(String appName, String id, boolean isReplication) {// 增加续约次数到监控RENEW.increment(isReplication);// 获取应用名对应的租约Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToRenew = null;if (gMap != null) {leaseToRenew = gMap.get(id);}// 租约不存在if (leaseToRenew == null) {RENEW_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);return false;} else {InstanceInfo instanceInfo = leaseToRenew.getHolder();if (instanceInfo != null) {// touchASGCache(instanceInfo.getASGName());InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+ "; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);return false;}if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),instanceInfo.getOverriddenStatus().name(),instanceInfo.getId());instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}// 新增续租每分钟次数renewsLastMin.increment();// 设置租约最后更新时间leaseToRenew.renew();return true;}}
public class Lease<T> {public void renew() {// 设置租约最后更新时间戳lastUpdateTimestamp = System.currentTimeMillis() + duration;}

续约的整个过程修改租约的过期时间,即使并发请求,也不会对数据的一致性产生影响,因此不需要像注册操作一样加锁

三、应用实例下线

1、Eureka Client发起下线

应用实例关闭时,Eureka Client向Eureka Server发起下线应用实例。需要满足如下条件才可发起:

  • 配置eureka.registration.enabled=true,应用实例开启注册开关。默认为false
  • 配置eureka.shouldUnregisterOnShutdown=true,应用实例开启关闭时下线开关。默认为true
public class DiscoveryClient implements EurekaClient {@PreDestroy@Overridepublic synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());}cancelScheduledTasks();// If APPINFO was registeredif (applicationInfoManager != null&& clientConfig.shouldRegisterWithEureka() // eureka.registration.enabled=true&& clientConfig.shouldUnregisterOnShutdown()) { // eureka.shouldUnregisterOnShutdown=trueapplicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);unregister();}if (eurekaTransport != null) {eurekaTransport.shutdown();}heartbeatStalenessMonitor.shutdown();registryStalenessMonitor.shutdown();logger.info("Completed shut down of DiscoveryClient");}}void unregister() {// It can be null if shouldRegisterWithEureka == falseif(eurekaTransport != null && eurekaTransport.registrationClient != null) {try {logger.info("Unregistering ...");// 调用AbstractJerseyEurekaHttpClient的cancel方法EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());} catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);}}}
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {@Overridepublic EurekaHttpResponse<Void> cancel(String appName, String id) {String urlPath = "apps/" + appName + '/' + id;ClientResponse response = null;try {Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();addExtraHeaders(resourceBuilder);response = resourceBuilder.delete(ClientResponse.class);return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());}if (response != null) {response.close();}}}

AbstractJerseyEurekaHttpClient的cancel()方法使用DELETE请求调用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}接口,实现应用实例信息的下线

2、Eureka Server接收下线

Eureka Server接收下线请求核心流程如下图:

在这里插入图片描述

1)、接收下线请求
@Produces({"application/xml", "application/json"})
public class InstanceResource {@DELETEpublic Response cancelLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {try {// 下线boolean isSuccess = registry.cancel(app.getName(), id,"true".equals(isReplication));if (isSuccess) {logger.debug("Found (Cancel): {} - {}", app.getName(), id);return Response.ok().build();} else {logger.info("Not Found (Cancel): {} - {}", app.getName(), id);return Response.status(Status.NOT_FOUND).build();}} catch (Throwable e) {logger.error("Error (cancel): {} - {}", app.getName(), id, e);return Response.serverError().build();}}  
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {@Overridepublic boolean cancel(final String appName, final String id,final boolean isReplication) {// 下线if (super.cancel(appName, id, isReplication)) {// Eureka Server复制replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to cancel it, reduce the number of clients to send renewsthis.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;updateRenewsPerMinThreshold();}}return true;}return false;}

PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的cancel(...)方法下线应用实例信息

2)、下线应用实例信息

调用了AbstractInstanceRegistry的cancel(...)方法,下线应用实例信息,代码如下:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {@Overridepublic boolean cancel(String appName, String id, boolean isReplication) {return internalCancel(appName, id, isReplication);}protected boolean internalCancel(String appName, String id, boolean isReplication) {try {// 获得读锁read.lock();// 增加取消注册次数到监控CANCEL.increment(isReplication);Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToCancel = null;// 移除租约映射if (gMap != null) {leaseToCancel = gMap.remove(id);}// 添加到最近取消注册的调试队列synchronized (recentCanceledQueue) {recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));}// 移除应用实例覆盖状态映射InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);if (instanceStatus != null) {logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());}// 租约不存在if (leaseToCancel == null) {// 添加取消注册不存在到监控CANCEL_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);return false;} else {// 设置租约的取消注册时间戳leaseToCancel.cancel();// 添加到最近租约变更记录队列InstanceInfo instanceInfo = leaseToCancel.getHolder();String vip = null;String svip = null;if (instanceInfo != null) {instanceInfo.setActionType(ActionType.DELETED);recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));instanceInfo.setLastUpdatedTimestamp();vip = instanceInfo.getVIPAddress();svip = instanceInfo.getSecureVipAddress();}// 设置响应缓存过期invalidateCache(appName, vip, svip);logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);return true;}} finally {// 释放锁read.unlock();}}  
public class Lease<T> {public void cancel() {if (evictionTimestamp <= 0) {// 设置取消注册时间戳evictionTimestamp = System.currentTimeMillis();}}

参考:

Eureka 源码解析 —— 应用实例注册发现(一)之注册

Eureka 源码解析 —— 应用实例注册发现(二)之续租

Eureka 源码解析 —— 应用实例注册发现(三)之下线

这篇关于Eureka核心源码解析(一):应用实例注册、续约、下线的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/573789

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

zoj3820(树的直径的应用)

题意:在一颗树上找两个点,使得所有点到选择与其更近的一个点的距离的最大值最小。 思路:如果是选择一个点的话,那么点就是直径的中点。现在考虑两个点的情况,先求树的直径,再把直径最中间的边去掉,再求剩下的两个子树中直径的中点。 代码如下: #include <stdio.h>#include <string.h>#include <algorithm>#include <map>#

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、