本文主要是介绍Eureka Server Replicate,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
为了方便说明,就把上篇博客的图再贴一遍了。
上篇说道Application Service向Eureka Server注册服务的过程,在完成注册之后,由于Eureka Server是对等集群,其他Server也需要同步这一注册信息。与zookeeper相比,Eureka并不追求很强的一致性,而是认为A(可用性)和P(分区容错性)更重要。
基于此,Eureka采用复制的方式进行注册信息的同步。
1、在完成注册方法之后,进行复制。
PeerAwareInstanceRegistryImpl类
public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {leaseDuration = info.getLeaseInfo().getDurationInSecs();}super.register(info, leaseDuration, isReplication);replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);}
2、具体的复制步骤
复制方法,复制eureka的所有action操作除了流量。private void replicateToPeers(Action action, String appName, String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */, boolean isReplication) {Stopwatch tracer = action.getTimer().start();try {if (isReplication) {numberOfReplicationsLastMin.increment();}// 如果已经复制过,就不再复制if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {return;}//遍历eureka集群中的所有节点,进行复制操作for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {// If the url represents this host, do not replicate to yourself.if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {continue;}replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);}} finally {tracer.stop();}}
Eureka Server的复制不会进行第二次,可以参考上面的代码。
判断isReplication的值,如果复制过,则不再复制。如果没有复制过,遍历集群中的node节点个数(即Eureka Server集群),依次进行复制操作。
具体的复制action,包括取消、注册、心跳、状态更新等。
private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus,PeerEurekaNode node) {try {InstanceInfo infoFromRegistry = null;CurrentRequestVersion.set(Version.V2);switch (action) {case Cancel:node.cancel(appName, id);break;case Heartbeat:InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);break;case Register:node.register(info);break;case StatusUpdate:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.statusUpdate(appName, id, newStatus, infoFromRegistry);break;case DeleteStatusOverride:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.deleteStatusOverride(appName, id, infoFromRegistry);break;}} catch (Throwable t) {logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);}}
peerEurekaNodes 如何获取集群中的所有 node 节点?
首先,在PeerAwareInstanceRegistryImpl类中的init方法中找到了peerEurekaNodes的赋值。
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {this.numberOfReplicationsLastMin.start();this.peerEurekaNodes = peerEurekaNodes;initializedResponseCache();scheduleRenewalThresholdUpdateTask();initRemoteRegionRegistry();try {Monitors.registerObject(this);} catch (Throwable e) {logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);}}
在其父类中AwsInstanceRegistry也有其实现。
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {super.init(peerEurekaNodes);this.awsAsgUtil = new AwsAsgUtil(serverConfig, clientConfig, this);// We first check if the instance is STARTING or DOWN, then we check explicit overrides,// then we see if our ASG is UP, then we check the status of a potentially existing lease.this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),new OverrideExistsRule(overriddenInstanceStatusMap), new AsgEnabledRule(this.awsAsgUtil),new LeaseExistsRule());}
继续往上找,DefaultEurekaServerContext类中,
@PostConstruct@Overridepublic void initialize() throws Exception {logger.info("Initializing ...");peerEurekaNodes.start();registry.init(peerEurekaNodes);logger.info("Initialized");}
最后,EurekaBootStrap中进行了调用。
protected void initEurekaServerContext() throws Exception {//省略配置信息logger.info("Initializing the eureka client...");//创建node组PeerEurekaNodes peerEurekaNodes = new PeerEurekaNodes(registry,eurekaServerConfig,eurekaClientConfig,serverCodecs,applicationInfoManager);serverContext = new DefaultEurekaServerContext(eurekaServerConfig,serverCodecs,registry,peerEurekaNodes,applicationInfoManager);EurekaServerContextHolder.initialize(serverContext);serverContext.initialize();logger.info("Initialized server context");// Copy registry from neighboring eureka nodeint registryCount = registry.syncUp();registry.openForTraffic(applicationInfoManager, registryCount);// Register all monitoring statistics.EurekaMonitors.registerAllStats();}
这篇关于Eureka Server Replicate的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!