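Eureka Core Source Code Analysis (Part 1): Application Instance Registration, Renewal, and Cancellation
This article walks through the core Eureka source code for application instance registration, lease renewal, and cancellation (deregistration), based on version 1.9.8.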
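I. Application Instance Registration
1. Eureka Client Initiates Registration
A Eureka Client registers an application instance with the Eureka Server only when both of the following conditions hold:
- eureka.registration.enabled=true is configured. This is the switch that allows the Eureka Client to register application instances with the Eureka Server.
- The InstanceInfo on the Eureka Client is out of sync with the copy on the Eureka Server. Whenever a property of InstanceInfo changes, its isInstanceInfoDirty flag is set to true, which means the client and server copies differ and the instance must be registered again. A newly created InstanceInfo does not yet exist on the Eureka Server, so it is registered as well.
When these conditions are met, the InstanceInfo is not registered immediately; a background thread registers it on a schedule.
The exception is a change to the status property of InstanceInfo: if eureka.shouldOnDemandUpdateStatusChange=true (the default), the client registers with the Eureka Server right away.
1) Replicating Application Instance Information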
public class DiscoveryClient implements EurekaClient {

    /**
     * Listener for application instance status changes
     */
    private ApplicationInfoManager.StatusChangeListener statusChangeListener;

    /**
     * Replicator for application instance information
     */
    private InstanceInfoReplicator instanceInfoReplicator;

    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        // Heartbeat executor towards the Eureka Server
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // Create the InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            // Create the application instance status change listener
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            // Register the application instance status change listener
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            // Start the application instance information replicator
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}
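initScheduledTasks() calls InstanceInfoReplicator.start(int initialDelayMs) to start the application instance information replicator. Despite the parameter name, the delay is scheduled with TimeUnit.SECONDS, as the code below shows. The implementation is as follows: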
class InstanceInfoReplicator implements Runnable {

    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            // Mark the instance info as dirty (out of sync with the server)
            instanceInfo.setIsDirty();  // for initial register
            // Submit the task
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

    public void run() {
        try {
            // Refresh the instance info
            discoveryClient.refreshInstanceInfo();

            // Check whether the instance info is dirty
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                // Register with the Eureka Server
                discoveryClient.register();
                // Mark the instance info as clean again
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            // Resubmit the task so that it keeps running at a fixed delay
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}
run() calls DiscoveryClient.register(), through which the Eureka Client registers the application instance with the Eureka Server.
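The run() method above reads a dirty timestamp before registering and hands the same timestamp to unsetIsDirty(...) afterwards. A plausible reason is that a change arriving while the registration is in flight must not be lost. The following is a minimal sketch of that idea, not the actual InstanceInfo code: the class DirtyFlagSketch and its method names are hypothetical, and the current time is passed in explicitly so the behaviour is easy to follow.

```java
// Simplified sketch of a timestamped dirty flag. DirtyFlagSketch is a hypothetical
// name used for illustration; it is not part of Eureka.
class DirtyFlagSketch {
    private boolean dirty = false;
    private Long lastDirtyTimestamp;

    // Mark the data as changed and remember when the change happened.
    synchronized long setDirtyWithTime(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
        return now;
    }

    // Returns the timestamp of the last change if a sync is pending, otherwise null.
    synchronized Long dirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    // Clear the flag only if no newer change arrived after the synced timestamp.
    synchronized void unsetDirty(long syncedTimestamp) {
        if (lastDirtyTimestamp != null && lastDirtyTimestamp <= syncedTimestamp) {
            dirty = false;
        }
    }
}
```

With this shape, a change made at time 200 while a registration for time 100 is in flight keeps the flag set, so the next replicator run registers again.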
2) Sending the Registration Request
public class DiscoveryClient implements EurekaClient {

    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            // Delegates to AbstractJerseyEurekaHttpClient.register()
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
}
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {

    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
}
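AbstractJerseyEurekaHttpClient.register() sends a POST request to the Eureka Server endpoint apps/${APP_NAME} with the InstanceInfo as the request body, which registers the application instance.
2. Eureka Server Receives the Registration
The core flow of handling a registration on the Eureka Server is shown in the figure below:
1) Receiving the Registration Request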
@Produces({"application/xml", "application/json"})
public class ApplicationResource {

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // Validate the request parameters
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        // Register the application instance
        registry.register(info, "true".equals(isReplication));
        // Respond with status code 204
        return Response.status(204).build();  // 204 to be backwards compatible
    }
}
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // Lease duration: the default unless the client supplied a positive value
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // Register the application instance
        super.register(info, leaseDuration, isReplication);
        // Replicate to peer Eureka Servers
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
}
PeerAwareInstanceRegistryImpl delegates to the register(...) method of its parent class AbstractInstanceRegistry to register the instance.
2) Lease
public class Lease<T> {

    enum Action {
        Register, Cancel, Renew
    };

    public static final int DEFAULT_DURATION_IN_SECS = 90;

    /**
     * The entity held by the lease (the InstanceInfo)
     */
    private T holder;

    /**
     * Timestamp at which the lease was cancelled
     */
    private long evictionTimestamp;

    /**
     * Registration timestamp
     */
    private long registrationTimestamp;

    /**
     * Timestamp at which the instance started serving traffic
     */
    private long serviceUpTimestamp;

    /**
     * Timestamp of the last lease update
     */
    // Make it volatile so that the expiration task would see this quicker
    private volatile long lastUpdateTimestamp;

    /**
     * Lease duration in milliseconds
     */
    private long duration;

    public Lease(T r, int durationInSecs) {
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        duration = (durationInSecs * 1000);
    }
}
3) Registering the Application Instance
The register(...) method of AbstractInstanceRegistry stores the instance in the registry. The code is as follows:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Lease map
     * key1: application name
     * key2: application instance id
     * value: lease
     */
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // Acquire the read lock
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            // Record the registration in the monitoring counters
            REGISTER.increment(isReplication);
            // Create the lease map for this application if it does not exist yet
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                // Add the application
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // Look up the existing lease of this instance, if any
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // Create the lease
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // Store the lease in the map
            gMap.put(registrant.getId(), lease);
            // Add to the recently-registered queue (used for debugging)
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // Add to the overridden instance status map
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                        + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Resolve the final status of the instance and apply it
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // Set the service-up timestamp of the lease (only takes effect the first time)
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            // Mark the action type of the instance info as ADDED
            registrant.setActionType(ActionType.ADDED);
            // Add to the recently-changed queue
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // Set the last-updated timestamp of the instance info
            registrant.setLastUpdatedTimestamp();
            // Invalidate the response cache
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            // Release the lock
            read.unlock();
        }
    }
}
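Two details of register(...) are easy to miss in the full listing: the two-level map is populated with putIfAbsent so that concurrent registrations of the same application share one inner map, and an incoming registration only loses to the stored copy when the stored lastDirtyTimestamp is strictly greater. The sketch below isolates just those two steps. It is a simplification under stated assumptions: RegistrySketch and its nested Instance class are hypothetical stand-ins, and leases, locking, status overrides, and cache invalidation are all left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, stripped-down registry used only to illustrate the map handling.
class RegistrySketch {

    // Stand-in for InstanceInfo, reduced to the two fields used here.
    static final class Instance {
        final String id;
        final long lastDirtyTimestamp;

        Instance(String id, long lastDirtyTimestamp) {
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // appName -> (instanceId -> instance)
    private final ConcurrentHashMap<String, Map<String, Instance>> registry =
            new ConcurrentHashMap<>();

    // Returns the instance that ends up stored under registrant.id.
    Instance register(String appName, Instance registrant) {
        Map<String, Instance> instances = registry.get(appName);
        if (instances == null) {
            Map<String, Instance> created = new ConcurrentHashMap<>();
            // putIfAbsent returns the previous value, or null if ours was installed.
            instances = registry.putIfAbsent(appName, created);
            if (instances == null) {
                instances = created;
            }
        }
        Instance existing = instances.get(registrant.id);
        // Strictly greater: on equal timestamps the incoming copy wins.
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing;
        }
        instances.put(registrant.id, registrant);
        return registrant;
    }
}
```

Registering an instance with timestamp 10 and then a copy with timestamp 5 leaves the first one in place, while a second copy with timestamp 10 replaces it.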
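II. Application Instance Renewal
1. Eureka Client Initiates Renewal
Once a Eureka Client has registered successfully it holds a lease. It then sends renew requests to the Eureka Server at a fixed interval so that the lease does not expire.
By default the lease lasts 90 seconds and the renewal interval is 30 seconds. This 3:1 ratio between lease duration and renewal interval leaves room for three renewal attempts within one lease period, which tolerates transient problems such as network failures.
1) Initializing the Scheduled Task
During initialization the Eureka Client creates a heartbeat task that sends renew requests to the Eureka Server at a fixed interval. The implementation is as follows: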
public class DiscoveryClient implements EurekaClient {

    private void initScheduledTasks() {
        // ... registry cache refresh timer, same as in section I

        // Heartbeat executor towards the Eureka Server
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // ... InstanceInfo replicator and status change listener, same as in section I
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}
2) Sending the Renewal Request
public class DiscoveryClient implements EurekaClient {

    /**
     * Timestamp of the last successful heartbeat to the Eureka Server
     */
    private volatile long lastSuccessfulHeartbeatTimestamp = -1;

    private class HeartbeatThread implements Runnable {

        public void run() {
            // Renew the lease
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // Delegates to AbstractJerseyEurekaHttpClient.sendHeartBeat()
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            // 404 means the server does not know this instance: register again
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
}
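AbstractJerseyEurekaHttpClient.sendHeartBeat() sends a PUT request to the Eureka Server endpoint apps/${APP_NAME}/${INSTANCE_INFO_ID} with the query parameters status, lastDirtyTimestamp, and overriddenstatus, which renews the lease.
2. Eureka Server Receives the Renewal
The core flow of handling a renewal on the Eureka Server is shown in the figure below:
1) Receiving the Renewal Request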
@Produces({"application/xml", "application/json"})
public class InstanceResource {

    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // Renew the lease
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Renewal failed
        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Compare the lastDirtyTimestamp of the InstanceInfo
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
}
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        // Renew the lease
        if (super.renew(appName, id, isReplication)) {
            // Replicate to peer Eureka Servers
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
}
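PeerAwareInstanceRegistryImpl delegates to the renew(...) method of its parent class AbstractInstanceRegistry to renew the lease.
2) Renewing the Lease
The renew(...) method of AbstractInstanceRegistry renews the lease of the instance. The code is as follows: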
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    public boolean renew(String appName, String id, boolean isReplication) {
        // Record the renewal in the monitoring counters
        RENEW.increment(isReplication);
        // Look up the lease map of the application, then the lease of the instance
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        // The lease does not exist
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            // Count the renewal towards the renewals-per-minute metric
            renewsLastMin.increment();
            // Update the last-update timestamp of the lease
            leaseToRenew.renew();
            return true;
        }
    }
}
public class Lease<T> {

    public void renew() {
        // Set the last-update timestamp of the lease
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
}
Renewal only updates the lease's last-update timestamp, which determines when the lease expires. Even concurrent requests cannot leave the data in an inconsistent state, so renewal, unlike registration, does not take a lock.
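To make the timing concrete, the sketch below models a lease whose renew() is a single write to a volatile field, as in the code above. The isExpired check is not shown in this article; the version here reflects my understanding of how Eureka evaluates expiry and should be read as an assumption. LeaseSketch is a hypothetical class, and the current time is passed in as a parameter instead of calling System.currentTimeMillis(), so the arithmetic is reproducible.

```java
// Hypothetical model of the lease timing; not the actual Lease class.
class LeaseSketch {
    private final long durationMs;
    // Volatile so that a thread checking expiry sees the latest renewal.
    private volatile long lastUpdateTimestamp;
    private long evictionTimestamp;

    LeaseSketch(int durationInSecs, long nowMs) {
        this.durationMs = durationInSecs * 1000L;
        this.lastUpdateTimestamp = nowMs;
    }

    // Same shape as the renew() shown above: one write, no lock needed.
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // Assumed expiry rule: cancelled, or more than one duration past lastUpdateTimestamp.
    boolean isExpired(long nowMs) {
        return evictionTimestamp > 0 || nowMs > lastUpdateTimestamp + durationMs;
    }
}
```

Under this assumed rule, renew() already adds the duration and the expiry check adds it again, so a renewed 90-second lease would only be reported as expired after roughly 180 seconds without a further renewal.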
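III. Application Instance Cancellation
1. Eureka Client Initiates Cancellation
When an application instance shuts down, the Eureka Client asks the Eureka Server to cancel (deregister) the instance. The request is sent only if both of the following conditions hold:
- eureka.registration.enabled=true is configured, which turns on registration for the application instance. The default is true.
- eureka.shouldUnregisterOnShutdown=true is configured, which turns on deregistration at shutdown. The default is true.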
public class DiscoveryClient implements EurekaClient {

    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks();

            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka() // eureka.registration.enabled=true
                    && clientConfig.shouldUnregisterOnShutdown()) { // eureka.shouldUnregisterOnShutdown=true
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();
            }

            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }

            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            logger.info("Completed shut down of DiscoveryClient");
        }
    }

    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                // Delegates to AbstractJerseyEurekaHttpClient.cancel()
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }
}
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {

    @Override
    public EurekaHttpResponse<Void> cancel(String appName, String id) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder.delete(ClientResponse.class);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
}
AbstractJerseyEurekaHttpClient.cancel() sends a DELETE request to the Eureka Server endpoint apps/${APP_NAME}/${INSTANCE_INFO_ID}, which cancels the application instance.
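The three client calls covered in this article differ only in HTTP method and path, which the small helper below puts side by side. It is purely illustrative: EurekaRequestLineSketch is a hypothetical class that builds strings and performs no HTTP calls, the order of the renewal query parameters is an assumption, and the optional overriddenstatus parameter is left out for brevity.

```java
// Hypothetical helper that only formats "METHOD path" strings for comparison.
class EurekaRequestLineSketch {

    // Registration: POST apps/{appName}, with the InstanceInfo as the request body.
    static String register(String appName) {
        return "POST apps/" + appName;
    }

    // Renewal: PUT apps/{appName}/{id} with status and lastDirtyTimestamp as query parameters.
    static String renew(String appName, String id, String status, long lastDirtyTimestamp) {
        return "PUT apps/" + appName + "/" + id
                + "?status=" + status
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp;
    }

    // Cancellation: DELETE apps/{appName}/{id}, no body.
    static String cancel(String appName, String id) {
        return "DELETE apps/" + appName + "/" + id;
    }
}
```

For an application named ORDER-SERVICE with instance id host-1:8080 (both made-up values), cancel(...) yields "DELETE apps/ORDER-SERVICE/host-1:8080".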
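2. Eureka Server Receives the Cancellation
The core flow of handling a cancellation request on the Eureka Server is shown in the figure below:
1) Receiving the Cancellation Request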
@Produces({"application/xml", "application/json"})
public class InstanceResource {

    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            // Cancel the lease
            boolean isSuccess = registry.cancel(app.getName(), id,
                    "true".equals(isReplication));

            if (isSuccess) {
                logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                return Response.ok().build();
            } else {
                logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }
    }
}
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        // Cancel the lease
        if (super.cancel(appName, id, isReplication)) {
            // Replicate to peer Eureka Servers
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to cancel it, reduce the number of clients to send renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                    updateRenewsPerMinThreshold();
                }
            }
            return true;
        }
        return false;
    }
}
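PeerAwareInstanceRegistryImpl delegates to the cancel(...) method of its parent class AbstractInstanceRegistry to cancel the application instance.
2) Cancelling the Application Instance
The cancel(...) method of AbstractInstanceRegistry removes the instance from the registry. The code is as follows: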
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    @Override
    public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            // Acquire the read lock
            read.lock();
            // Record the cancellation in the monitoring counters
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            // Remove the lease from the map
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }
            // Add to the recently-cancelled queue (used for debugging)
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            // Remove the entry from the overridden instance status map
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            // The lease does not exist
            if (leaseToCancel == null) {
                // Record the not-found cancellation in the monitoring counters
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                // Set the cancellation timestamp of the lease
                leaseToCancel.cancel();
                // Add to the recently-changed queue
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                // Invalidate the response cache
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally {
            // Release the lock
            read.unlock();
        }
    }
}
public class Lease<T> {

    public void cancel() {
        if (evictionTimestamp <= 0) {
            // Set the cancellation timestamp
            evictionTimestamp = System.currentTimeMillis();
        }
    }
}
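The guard in Lease.cancel() means that only the first cancellation records a timestamp; later calls leave it untouched. The sketch below shows that behaviour in isolation. EvictionStampSketch is a hypothetical class, and the time is passed in as a parameter rather than read from the system clock so the effect is visible without waiting.

```java
// Hypothetical model of the "set once" cancellation timestamp.
class EvictionStampSketch {
    private long evictionTimestamp;

    // Records the cancellation time only if none has been recorded yet.
    void cancel(long nowMs) {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = nowMs;
        }
    }

    long getEvictionTimestamp() {
        return evictionTimestamp;
    }
}
```

Calling cancel(1000) followed by cancel(2000) leaves the recorded timestamp at 1000.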
References:
Eureka 源码解析 —— 应用实例注册发现(一)之注册
Eureka 源码解析 —— 应用实例注册发现(二)之续租
Eureka 源码解析 —— 应用实例注册发现(三)之下线