ZooKeeper Leader Election: Source Code Analysis

2023-11-07 01:48


I. Leader election

Leader election happens in two situations: when the ensemble starts up, and when the current leader crashes.

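1. Leader election at server startup

Every node starts in the LOOKING state and then enters the leader election process. An election needs at least two machines (for the reasons discussed earlier); here we use a cluster of 3 servers as the example. During cluster initialization, when only Server1 has started, it cannot carry out or complete an election on its own. Once Server2 starts, the two machines can talk to each other, each of them tries to find a leader, and the election begins. The process is as follows.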

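(1) Each server casts a vote. Because this is the initial state, Server1 and Server2 each vote for themselves as leader. A vote carries the myid, ZXID and epoch of the proposed server. The examples here abbreviate a vote to (myid, ZXID), so Server1's vote is (1, 0) and Server2's vote is (2, 0). Each server then sends its vote to the other machines in the cluster.
(2) Receive votes from the other servers. On receiving a vote, each server first checks that it is valid, for example that it belongs to the current round (epoch) and that it comes from a server in the LOOKING state.
(3) Process the votes. Each server compares every vote it receives with its own. In the source code the epoch is compared first; when the epochs are equal, as they are here, the rules are:
i. Check the ZXID first. The server with the larger ZXID is preferred as leader.
ii. If the ZXIDs are equal, compare myid. The server with the larger myid becomes leader.
For Server1, its own vote is (1, 0) and it receives (2, 0) from Server2. The ZXIDs are both 0, so myid decides. Server2's myid is larger, so Server1 updates its vote to (2, 0) and sends it out again. Server2 does not need to change its vote; it simply resends its previous vote to all machines in the cluster.
(4) Count the votes. After each round, every server tallies the votes and checks whether more than half of the machines have accepted the same vote. Server1 and Server2 both find that two machines have accepted (2, 0), which is a majority of the three-server cluster, so the leader is considered elected.
(5) Change server state. Once the leader is determined, every server updates its state: a follower changes to FOLLOWING and the leader changes to LEADING.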
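The comparison in step (3) can be sketched as a single predicate. This is a simplified illustration, not ZooKeeper's code: the class name VoteOrder and the method beats are made up here, and the server weight check that the real totalOrderPredicate performs is left out.

```java
// Hypothetical sketch: VoteOrder and beats() are illustrative names, not ZooKeeper API.
final class VoteOrder {

    /** Returns true when the incoming vote should replace the current one. */
    static boolean beats(long newId, long newZxid, long newEpoch,
                         long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch; // the larger epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;   // then the larger ZXID
        }
        return newId > curId;           // finally the larger myid
    }

    public static void main(String[] args) {
        // Server1 holds (1, 0) and receives (2, 0): same ZXID, larger myid wins.
        System.out.println(beats(2, 0, 0, 1, 0, 0)); // true
        // Server2 holds (2, 0) and receives (1, 0): it keeps its own vote.
        System.out.println(beats(1, 0, 0, 2, 0, 0)); // false
    }
}
```

The argument order (incoming vote first, current vote second) mirrors how the predicate is called in lookForLeader.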

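2. Leader election while the cluster is running

When the leader crashes or becomes unavailable, the cluster cannot serve requests and enters a new round of leader election. The election at runtime follows essentially the same process as the election at startup.
(1) Change state. After the leader goes down, the remaining non-Observer servers change their state to LOOKING and enter the election.
(2) Each server casts a vote. At runtime the ZXID on each server may differ. Assume Server1's ZXID is 123 and Server3's ZXID is 122. In the first round both servers vote for themselves, producing the votes (1, 123) and (3, 122), and send them to all machines in the cluster. Receiving the votes from the other servers works the same as at startup.
(3) Process the votes. Same as at startup. Because Server1's ZXID (123) is larger than Server3's (122), Server1 becomes the leader.
(4) Count the votes. Same as at startup.
(5) Change server state. Same as at startup.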
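The runtime example can be worked through with a small majority check. This is a sketch under assumptions: the ensemble is taken to have three voting members (the crashed leader still counts toward the size), the class MajorityTally is made up for illustration, and ZooKeeper's own quorum verifier is not used.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: MajorityTally is an illustrative name, not a ZooKeeper class.
final class MajorityTally {

    /** True when strictly more than half of ensembleSize servers back the candidate. */
    static boolean hasMajority(Map<Long, Long> voteBySender, long candidate, int ensembleSize) {
        long count = voteBySender.values().stream()
                .filter(v -> v == candidate)
                .count();
        return count > ensembleSize / 2;
    }

    public static void main(String[] args) {
        int ensembleSize = 3; // assumption: three voters, one of them (the old leader) is down
        Map<Long, Long> votes = new HashMap<>();
        votes.put(1L, 1L); // Server1 votes for itself (ZXID 123)
        votes.put(3L, 3L); // Server3 votes for itself (ZXID 122)
        System.out.println(hasMajority(votes, 1L, ensembleSize)); // false

        // Server3 sees the larger ZXID 123 and switches its vote to Server1.
        votes.put(3L, 1L);
        System.out.println(hasMajority(votes, 1L, ensembleSize)); // true
    }
}
```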

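II. Leader election source code analysis

1. Start from the entry point, QuorumPeerMain.
2. QuorumPeer overrides Thread.start.
3. QuorumPeer.start initializes leader election by calling startLeaderElection.
4. startLeaderElection in QuorumPeer calls createElectionAlgorithm to select the election algorithm.
5. createElectionAlgorithm creates a FastLeaderElection object, whose constructor calls FastLeaderElection's starter method.
6. Next, fle.start() is called, that is, FastLeaderElection.start(). This method starts the sender and receiver threads, and it does so by delegating to messenger.start().
7. wsThread and wrThread are created when starter builds the Messenger. The Messenger constructor instantiates two inner classes, WorkerSender and WorkerReceiver, which send and receive vote messages respectively.
8. Back in QuorumPeer.java: once FastLeaderElection has been initialized, super.start() is called, which eventually runs QuorumPeer's run method.
9. The run method is the most important part. It calls setCurrentVote(makeLEStrategy().lookForLeader()); which, through the strategy, ends up running the election algorithm in FastLeaderElection.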
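Steps 2, 3 and 8 rely on a common pattern: a Thread subclass overrides start() to do its setup on the calling thread and then calls super.start() so that run() executes on the new thread. A minimal sketch of that pattern, where PeerSketch is a hypothetical stand-in for QuorumPeer and the recorded step names are only labels:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for QuorumPeer; only the start()/run() shape is illustrated.
final class PeerSketch extends Thread {
    final List<String> steps = new ArrayList<>();

    @Override
    public synchronized void start() {
        // Setup runs on the caller's thread, before the new thread exists.
        steps.add("loadDataBase");
        steps.add("startLeaderElection");
        // Only now is the new thread created; it will execute run().
        super.start();
    }

    @Override
    public void run() {
        steps.add("run");
    }

    public static void main(String[] args) throws InterruptedException {
        PeerSketch peer = new PeerSketch();
        peer.start();
        peer.join();
        System.out.println(peer.steps); // [loadDataBase, startLeaderElection, run]
    }
}
```

Because Thread.start() happens-before the first action of run(), the setup done in start() is visible to the new thread without extra synchronization.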

Starting from the entry point, QuorumPeerMain

/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
    LOG.info("Exiting normally");
    ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    purgeMgr.start();

    // Decide between standalone mode and cluster mode
    if (args.length == 1 && config.isDistributed()) {
        // Cluster mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // The server that handles client reads and writes, i.e. access through port 2181
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }

        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
        quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
        quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }

        // Start the main thread
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }
}
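The branch in initializeAndRun chooses between cluster mode and standalone mode. The exact conditions inside config.isDistributed() are not shown in this article, so the sketch below uses an assumed, simplified rule: a configuration counts as distributed when it lists more than one server.N entry. ModeSketch and looksDistributed are hypothetical names, and the host names are placeholders.

```java
import java.util.Properties;

// Hypothetical sketch: the real check is QuorumPeerConfig.isDistributed(),
// whose conditions are not reproduced here.
final class ModeSketch {

    /** Assumed rule: more than one "server.N" entry means cluster mode. */
    static boolean looksDistributed(Properties cfg) {
        long servers = cfg.stringPropertyNames().stream()
                .filter(key -> key.startsWith("server."))
                .count();
        return servers > 1;
    }

    public static void main(String[] args) {
        Properties single = new Properties();
        single.setProperty("clientPort", "2181");
        System.out.println(looksDistributed(single)); // false

        Properties cluster = new Properties();
        cluster.setProperty("clientPort", "2181");
        cluster.setProperty("server.1", "host1:2888:3888");
        cluster.setProperty("server.2", "host2:2888:3888");
        cluster.setProperty("server.3", "host3:2888:3888");
        System.out.println(looksDistributed(cluster)); // true
    }
}
```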

Calling QuorumPeer's start method

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Restore the database: loadDataBase recovers data from local files
    // and obtains the latest zxid
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Initialize leader election
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}

ResponderThread responder;

public synchronized void stopLeaderElection() {
    responder.running = false;
    responder.interrupt();
}

public synchronized void startLeaderElection() {
    try {
        // If this node is in the LOOKING state, vote for itself
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // Create the election algorithm from the configured type. The type can be
    // set in zoo.cfg; the default is fast leader election (in the switch
    // below, only case 3 is still supported).
    this.electionAlg = createElectionAlgorithm(electionType);
}

@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        // The class responsible for leader election I/O
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // Start the bound election listener thread and wait for the
            // other machines in the cluster to connect
            listener.start();
            // TCP-based election algorithm
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

@SuppressWarnings("deprecation")
protected Election makeLEStrategy() {
    LOG.debug("Initializing leader election protocol...");
    return electionAlg;
}

protected synchronized void setLeader(Leader newLeader) {
    leader = newLeader;
}

protected synchronized void setFollower(Follower newFollower) {
    follower = newFollower;
}

protected synchronized void setObserver(Observer newObserver) {
    observer = newObserver;
}

public synchronized ZooKeeperServer getActiveServer() {
    if (leader != null) {
        return leader.zk;
    } else if (follower != null) {
        return follower.zk;
    } else if (observer != null) {
        return observer.zk;
    }
    return null;
}

boolean shuttingDownLE = false;

@Override
public void run() {
    updateThreadName();

    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);
        for (QuorumServer s : getView().values()) {
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {
                RemotePeerBean rBean = new RemotePeerBean(this, s);
                try {
                    MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                    jmxRemotePeerBean.put(s.id, rBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }

    try {
        /*
         * Main loop
         */
        while (running) {
            // Check the current state of this node
            switch (getPeerState()) {
            case LOOKING:
                // LOOKING: enter the election process
                LOG.info("LOOKING");
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // The strategy pattern decides which election
                        // algorithm runs the leader election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // The strategy pattern decides which election
                        // algorithm runs the leader election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    updateServerState();

                    // Add delay jitter before we switch to LOOKING
                    // state to reduce the load of ObserverMaster
                    if (isRunning()) {
                        Observer.waitForObserverElectionDelay();
                    }
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        MBeanRegistry instance = MBeanRegistry.getInstance();
        instance.unregister(jmxQuorumBean);
        instance.unregister(jmxLocalPeerBean);

        for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
            instance.unregister(remotePeerBean);
        }

        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
        jmxRemotePeerBean = null;
    }
}
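The main loop in run() is a state machine: LOOKING runs an election and moves to a role, and when a role ends the peer falls back to LOOKING. The sketch below models only those transitions. It is a simplification under assumptions: PeerLoopSketch is a made-up class, observers are omitted, and each role is treated as ending immediately, whereas the real followLeader() and lead() calls block until something fails.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the state transitions only; not ZooKeeper code.
final class PeerLoopSketch {
    enum State { LOOKING, FOLLOWING, LEADING }

    /** Records the state seen at the top of each loop iteration. */
    static List<State> simulate(long myId, long electedLeader, int iterations) {
        List<State> trace = new ArrayList<>();
        State state = State.LOOKING;
        for (int i = 0; i < iterations; i++) {
            trace.add(state);
            switch (state) {
                case LOOKING:
                    // The election result decides the next role.
                    state = (electedLeader == myId) ? State.LEADING : State.FOLLOWING;
                    break;
                case FOLLOWING:
                case LEADING:
                    // Assumption: the role has ended, so look for a leader again.
                    state = State.LOOKING;
                    break;
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(simulate(1, 2, 3)); // [LOOKING, FOLLOWING, LOOKING]
        System.out.println(simulate(2, 2, 2)); // [LOOKING, LEADING]
    }
}
```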

The FastLeaderElection constructor calls the starter method

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

/**
 * This method is invoked by the constructor. Because it is a
 * part of the starting procedure of the object that must be on
 * any constructor of this class, it is probably best to keep as
 * a separate method. As we have a single constructor currently,
 * it is not strictly necessary to have it separate.
 *
 * @param self      QuorumPeer that created this object
 * @param manager   Connection manager
 */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    // Application-level send queue; holds ToSend objects
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // Application-level receive queue; holds Notification objects
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

/**
 * This method starts the sender and receiver threads.
 */
public void start() {
    this.messenger.start();
}

// The constructor and start() below belong to the Messenger inner class.

/**
 * Constructor of class Messenger.
 *
 * @param manager   Connection manager
 */
Messenger(QuorumCnxManager manager) {
    // Create the vote sender
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    // Create the vote receiver
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

/**
 * Starts instances of WorkerSender and WorkerReceiver
 */
void start() {
    // Start the application-level sender thread, which hands messages
    // to the I/O class QuorumCnxManager
    this.wsThread.start();
    // Start the application-level receiver thread, which takes messages
    // from the I/O class QuorumCnxManager
    this.wrThread.start();
}

/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 * lookForLeader: the election starts here.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        /*
         * The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
         * if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
         * of participants has voted for it.
         */
        // Votes received in this round
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        /*
         * The votes from previous leader elections, as well as the votes from the current leader election are
         * stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
         * Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
         * outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
         * the electionEpoch of the received notifications) in a leader election.
         */
        // Holds election results
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            // Advance the logical clock
            logicalclock.incrementAndGet();
            // Initialise the proposal with our own id, zxid and epoch
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info(
            "New election. My id = {}, proposed zxid=0x{}",
            self.getId(),
            Long.toHexString(proposedZxid));
        // Send the vote to every voter, including ourselves
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        // Keep looping until a leader is elected
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            // Take a vote delivered by the I/O threads; our own vote is handled here too
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            // Nothing received: if everything was delivered, send again,
            // and keep doing so until a leader is elected
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    // Messages are not delivered yet, perhaps because the other
                    // servers have not started; try to connect again
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                // Lengthen the timeout
                int tmpTimeOut = notTimeout * 2;
                notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                LOG.info("Notification time out: {}", notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                // A vote arrived; the check above confirms that it belongs to this cluster
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                // Check the state of the node that sent the message
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    // An election epoch larger than logicalclock means a new round has started
                    if (n.electionEpoch > logicalclock.get()) {
                        // Update the local logicalclock
                        logicalclock.set(n.electionEpoch);
                        // Clear the set of received votes
                        recvset.clear();
                        // Check whether the received vote wins, comparing epoch, zxid and myid in turn
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // It wins: adopt the sender's vote
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // Otherwise keep our own vote
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        // Broadcast again so that the other nodes know our current vote
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        // The message's epoch is smaller than ours: ignore it
                        LOG.debug(
                            "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                            Long.toHexString(n.electionEpoch),
                            Long.toHexString(logicalclock.get()));
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        // Same epoch: compare zxid and myid; if the received
                        // vote wins, adopt it and broadcast
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    LOG.debug(
                        "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                        n.sid,
                        n.leader,
                        Long.toHexString(n.zxid),
                        Long.toHexString(n.electionEpoch));

                    // don't care about the version if it's in LOOKING state
                    // Add to the local vote set, used to decide when the election ends
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    // Check whether the election is over; by default this
                    // requires agreement from more than half of the servers
                    if (voteSet.hasAllQuorums()) {

                        // Verify if there is any change in the proposed leader
                        // Keep reading new notifications until the wait times out
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        // The leader is confirmed
                        if (n == null) {
                            // Change our state
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                // OBSERVING servers do not take part in the vote
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                // Notifications from these two states take part in the election
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    // Check whether the epochs match
                    if (n.electionEpoch == logicalclock.get()) {
                        // If they match, add the vote to the local vote set
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        // Check whether the election is over and, if so, that the leader is valid
                        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                            // Change our state and return the result
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     *
                     * Note that the outofelection map also stores votes from the current leader election.
                     * See ZOOKEEPER-1732 for more information.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}

/**
 * Send notifications to all peers upon a change in our vote
 * (broadcasts the message).
 */
private void sendNotifications() {
    // Send to each voter in turn
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        // The message object
        ToSend notmsg = new ToSend(
            ToSend.mType.notification,
            proposedLeader,
            proposedZxid,
            logicalclock.get(),
            QuorumPeer.ServerState.LOOKING,
            sid,
            proposedEpoch,
            qv.toString().getBytes());

        LOG.debug(
            "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
                + " {} (myid), 0x{} (n.peerEpoch) ",
            proposedLeader,
            Long.toHexString(proposedZxid),
            Long.toHexString(logicalclock.get()),
            sid,
            self.getId(),
            Long.toHexString(proposedEpoch));

        // Add to the send queue, which is consumed by WorkerSender
        sendqueue.offer(notmsg);
    }
}

class WorkerSender extends ZooKeeperThread {

    volatile boolean stop;
    QuorumCnxManager manager;

    WorkerSender(QuorumCnxManager manager) {
        super("WorkerSender");
        this.stop = false;
        this.manager = manager;
    }

    public void run() {
        while (!stop) {
            try {
                // Take a message object from the send queue
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) {
                    continue;
                }

                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }

    /**
     * Called by run() once there is a new message to send.
     *
     * @param m     message to send
     */
    void process(ToSend m) {
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);

        manager.toSend(m.sid, requestBuffer);
    }
}
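In the LOOKING branch of lookForLeader, each notification is first sorted into one of three cases by comparing its election epoch with the local logical clock. The sketch below isolates that three-way decision. RoundSketch, Action and classify are hypothetical names; the follow-up work (clearing recvset, updating the proposal, rebroadcasting) is only noted in comments.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the election-epoch check; not ZooKeeper code.
final class RoundSketch {
    enum Action { ADOPT_NEWER_ROUND, IGNORE_STALE, COMPARE_SAME_ROUND }

    static Action classify(long notificationEpoch, AtomicLong logicalClock) {
        long local = logicalClock.get();
        if (notificationEpoch > local) {
            // The sender is in a newer round: catch up. The caller would also
            // clear its received votes and rebroadcast its proposal.
            logicalClock.set(notificationEpoch);
            return Action.ADOPT_NEWER_ROUND;
        } else if (notificationEpoch < local) {
            // The sender is behind: its vote is ignored.
            return Action.IGNORE_STALE;
        }
        // Same round: the votes are compared by epoch, zxid and myid.
        return Action.COMPARE_SAME_ROUND;
    }

    public static void main(String[] args) {
        AtomicLong clock = new AtomicLong(5);
        System.out.println(classify(7, clock)); // ADOPT_NEWER_ROUND
        System.out.println(clock.get());        // 7
        System.out.println(classify(6, clock)); // IGNORE_STALE
        System.out.println(classify(7, clock)); // COMPARE_SAME_ROUND
    }
}
```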

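How messages are broadcast: the sendNotifications method

  1. sendNotifications puts each message on the send queue with sendqueue.offer(notmsg).
  2. WorkerSender's run method takes messages off the queue and calls process to send them out.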
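The two steps form a producer and consumer pair around a blocking queue. The sketch below shows that hand-off in a single thread. BroadcastSketch, Msg and drain are hypothetical names, the message carries only two fields, and the consumer stops once the queue stays empty for the timeout, whereas the real WorkerSender keeps polling until it is stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the sendqueue hand-off; not ZooKeeper code.
final class BroadcastSketch {

    /** Simplified message: the proposed leader and the recipient's server id. */
    static final class Msg {
        final long leader;
        final long recipient;

        Msg(long leader, long recipient) {
            this.leader = leader;
            this.recipient = recipient;
        }
    }

    final LinkedBlockingQueue<Msg> sendqueue = new LinkedBlockingQueue<>();

    /** Producer side: one message per voter, as sendNotifications does. */
    void sendNotifications(long proposedLeader, List<Long> voters) {
        for (long sid : voters) {
            sendqueue.offer(new Msg(proposedLeader, sid));
        }
    }

    /** Consumer side: poll with a timeout and "send" each message. */
    List<Long> drain(long timeoutMs) throws InterruptedException {
        List<Long> delivered = new ArrayList<>();
        Msg m;
        while ((m = sendqueue.poll(timeoutMs, TimeUnit.MILLISECONDS)) != null) {
            delivered.add(m.recipient); // stand-in for manager.toSend(m.sid, buffer)
        }
        return delivered;
    }

    public static void main(String[] args) throws InterruptedException {
        BroadcastSketch sketch = new BroadcastSketch();
        sketch.sendNotifications(2L, List.of(1L, 2L, 3L));
        System.out.println(sketch.drain(50)); // [1, 2, 3]
    }
}
```

Polling with a timeout, rather than blocking indefinitely with take(), lets the worker re-check its stop flag at regular intervals.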

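The FastLeaderElection election process

The voting process involves the following classes:
FastLeaderElection: implements the Election interface and carries out the election between servers over TCP.
Notification: an inner class representing a vote message received from another server. It contains the id and zxid of the proposed leader, the election epoch, and related fields.
ToSend: a vote message to be sent to another server. It likewise contains the proposed leader's id, zxid, election epoch, and so on.
Messenger: contains the two inner classes WorkerReceiver and WorkerSender.
WorkerReceiver: a Runnable that receives votes. It continuously takes election messages from other servers out of QuorumCnxManager, converts each one into a vote, and stores it in recvqueue.
WorkerSender: also a Runnable (in the code above it extends ZooKeeperThread). It sends votes: it continuously takes pending votes from sendqueue and hands them to the underlying QuorumCnxManager.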

http://www.chinasem.cn/article/360426
