ZooKeeper Leader Election Source Code Analysis

2023-11-07 01:48


I. Leader Election

Leader election happens in two situations: when the servers start up, and when the leader crashes.

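1. Leader election at server startup

Every node starts in the LOOKING state, meaning it is waiting to find a leader, and then enters the leader election process. At least two machines are needed (for the 3-server ensemble used here, two is the majority; the reasoning was covered in an earlier article). During cluster initialization, when only Server1 has started, it cannot complete a leader election on its own. When Server2 starts, the two machines can communicate with each other, each tries to find a leader, and the election begins. It proceeds as follows: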

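(1) Each server casts a vote. Since this is the initial state, Server1 and Server2 both vote for themselves. A vote carries the proposed server's myid, ZXID and epoch; because the epochs are equal at startup, the example writes votes as (myid, ZXID). Server1's vote is therefore (1, 0) and Server2's is (2, 0), and each server sends its vote to the other machines in the cluster.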
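(2) Receive votes from the other servers. When a server receives a vote, it first checks whether the vote is valid, for example whether it belongs to the current round (election epoch) and which state the sender is in (votes from LOOKING servers are compared; notifications from FOLLOWING or LEADING servers are handled separately).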
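(3) Process the votes. For every vote received, the server compares ("PKs") it against its own vote using these rules:
i. Compare the epoch first; the higher epoch wins.
ii. If the epochs are equal, compare the ZXID; the server with the larger ZXID is preferred as leader.
iii. If the ZXIDs are also equal, compare the myid; the server with the larger myid becomes the leader.
Server1's vote is (1, 0) and it receives Server2's vote (2, 0). It first compares the ZXIDs, which are both 0, and then the myids. Server2's myid is larger, so Server1 updates its vote to (2, 0) and votes again. Server2 does not need to change its vote; it simply sends its previous vote to all machines in the cluster again.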
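(4) Count the votes. After each vote, a server tallies the votes it has received to see whether more than half of the machines have accepted the same vote. Server1 and Server2 both find that two machines have accepted (2, 0); since 2 is a majority of 3, Server2 is considered elected as leader.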
(5) Change the server state. Once the leader is determined, every server updates its own state: a follower switches to FOLLOWING and the leader switches to LEADING.
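The comparison in step (3) and the majority check in step (4) fit in a few lines of Java. This is a minimal sketch, not ZooKeeper's code: `Vote`, `beats`, `sameAs` and `hasQuorum` are hypothetical names. The order epoch, then ZXID, then myid mirrors what `FastLeaderElection.totalOrderPredicate` checks, and the quorum test assumes a plain majority of a fixed-size ensemble (ZooKeeper delegates that decision to its quorum verifier).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified vote: (proposed leader myid, its ZXID, its epoch).
final class Vote {
    final long leader;
    final long zxid;
    final long epoch;

    Vote(long leader, long zxid, long epoch) {
        this.leader = leader;
        this.zxid = zxid;
        this.epoch = epoch;
    }

    // PK rule: higher epoch wins, then higher ZXID, then higher myid.
    static boolean beats(Vote candidate, Vote current) {
        if (candidate.epoch != current.epoch) {
            return candidate.epoch > current.epoch;
        }
        if (candidate.zxid != current.zxid) {
            return candidate.zxid > current.zxid;
        }
        return candidate.leader > current.leader;
    }

    boolean sameAs(Vote other) {
        return leader == other.leader && zxid == other.zxid && epoch == other.epoch;
    }

    // Majority check: strictly more than half of the ensemble backs the proposal.
    static boolean hasQuorum(Map<Long, Vote> votesBySid, Vote proposal, int ensembleSize) {
        int agreeing = 0;
        for (Vote v : votesBySid.values()) {
            if (v.sameAs(proposal)) {
                agreeing++;
            }
        }
        return agreeing > ensembleSize / 2;
    }
}

class StartupElectionDemo {
    public static void main(String[] args) {
        Vote server1 = new Vote(1, 0, 0);
        Vote server2 = new Vote(2, 0, 0);

        // Server1 receives (2, 0): ZXIDs tie, Server2's myid is larger, so Server1 switches.
        if (Vote.beats(server2, server1)) {
            server1 = server2;
        }

        // Votes recorded by sender sid; 2 out of a 3-server ensemble is a majority.
        Map<Long, Vote> received = new HashMap<>();
        received.put(1L, server1);
        received.put(2L, server2);
        System.out.println(Vote.hasQuorum(received, server2, 3)); // true
    }
}
```

Running `StartupElectionDemo` replays the Server1/Server2 example: Server1 switches to (2, 0), and two matching votes out of three form a majority.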

2. Leader election while the cluster is running

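When the leader goes down or becomes unavailable, the cluster can no longer serve requests and enters a new round of leader election. Election at runtime follows essentially the same process as election at startup.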
(1) Change state. After the leader fails, the remaining non-Observer servers change their state to LOOKING and enter the leader election process.
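(2) Each server casts a vote. At runtime the servers' ZXIDs may differ; assume Server1's ZXID is 123 and Server3's is 122. In the first round, Server1 and Server3 both vote for themselves, producing the votes (1, 123) and (3, 122), and each sends its vote to all machines in the cluster. Receiving votes from the other servers works the same way as at startup.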
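(3) Process the votes. This is the same as at startup. With equal epochs, Server1's ZXID (123) is larger than Server3's (122), so Server1 becomes the leader.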
(4) Count the votes. Same as at startup.
(5) Change the server state. Same as at startup.
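The runtime example can be replayed the same way. The sketch below assumes Server2 was the leader that crashed and that all epochs are equal, so only ZXID and myid are compared. `RuntimeElectionSketch`, `beats` and `elect` are hypothetical names, and a single exchange round stands in for the repeated message passing of the real protocol.

```java
import java.util.HashMap;
import java.util.Map;

class RuntimeElectionSketch {
    // Hypothetical vote encoding: {proposed leader myid, proposed ZXID}; epochs assumed equal.
    static boolean beats(long[] candidate, long[] current) {
        if (candidate[1] != current[1]) {
            return candidate[1] > current[1]; // larger ZXID wins
        }
        return candidate[0] > current[0];     // tie: larger myid wins
    }

    // Returns the elected leader's myid, or -1 if the live servers cannot form a majority.
    static long elect(Map<Long, Long> zxidBySid, int ensembleSize) {
        // First round: every live server votes for itself.
        Map<Long, long[]> firstRound = new HashMap<>();
        for (Map.Entry<Long, Long> e : zxidBySid.entrySet()) {
            firstRound.put(e.getKey(), new long[] {e.getKey(), e.getValue()});
        }

        // Each server PKs every received vote against its own and keeps the winner.
        Map<Long, long[]> secondRound = new HashMap<>();
        for (Map.Entry<Long, long[]> self : firstRound.entrySet()) {
            long[] best = self.getValue();
            for (long[] received : firstRound.values()) {
                if (beats(received, best)) {
                    best = received;
                }
            }
            secondRound.put(self.getKey(), best);
        }

        // Count the votes per proposed leader and look for a strict majority.
        Map<Long, Integer> tally = new HashMap<>();
        for (long[] vote : secondRound.values()) {
            tally.merge(vote[0], 1, Integer::sum);
        }
        for (Map.Entry<Long, Integer> e : tally.entrySet()) {
            if (e.getValue() > ensembleSize / 2) {
                return e.getKey();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        Map<Long, Long> live = new HashMap<>();
        live.put(1L, 123L); // Server1
        live.put(3L, 122L); // Server3 (Server2, the old leader, is assumed down)
        System.out.println(elect(live, 3)); // 1
    }
}
```

Note that the ensemble size stays 3 even though only two servers are alive: the crashed leader still counts toward the majority threshold, which is why a single surviving server could not elect itself.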

II. Leader Election Source Code Analysis

1. Start from the entry point QuorumPeerMain.
2. QuorumPeer overrides Thread.start.
3. Leader election is initialized by calling startLeaderElection.
4. QuorumPeer.startLeaderElection calls createElectionAlgorithm to create the election algorithm.
5. createElectionAlgorithm creates a FastLeaderElection object, whose constructor calls FastLeaderElection.starter.
6. Next, fle.start() is called, i.e. FastLeaderElection.start(). Its job is to start the sender and receiver threads, which it does by calling messenger.start().
7. wsThread and wrThread are created during FastLeaderElection.starter, which constructs a Messenger. Messenger has two inner classes, WorkerSender and WorkerReceiver, which send and receive votes respectively.
8. Back in QuorumPeer.java: once FastLeaderElection is initialized, QuorumPeer.start() calls super.start(), which eventually runs QuorumPeer's run method.
9. The most important part is the run method. It calls setCurrentVote(makeLEStrategy().lookForLeader());, which, with the configured strategy, runs the election algorithm in FastLeaderElection.
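Steps 2, 3 and 8 rely on a pattern worth isolating: QuorumPeer overrides Thread.start() so that setup runs on the calling thread, and only then does super.start() launch run() on a new thread. The sketch below shows just that pattern; `MiniPeer` and the recorded step names are hypothetical stand-ins for QuorumPeer.loadDataBase(), startLeaderElection() and run().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class MiniPeer extends Thread {
    // Thread-safe log of what ran, so both threads can append to it.
    final List<String> steps = Collections.synchronizedList(new ArrayList<>());

    @Override
    public synchronized void start() {
        steps.add("loadDataBase");        // recover data and the latest ZXID (caller's thread)
        steps.add("startLeaderElection"); // vote for self, create the election algorithm
        super.start();                    // the JVM then calls run() on a new thread
    }

    @Override
    public void run() {
        steps.add("run"); // QuorumPeer's main loop would start here
    }

    public static void main(String[] args) throws InterruptedException {
        MiniPeer peer = new MiniPeer();
        peer.start();
        peer.join();
        System.out.println(peer.steps); // [loadDataBase, startLeaderElection, run]
    }
}
```

Doing the setup inside start() rather than the constructor means the election machinery is ready before the main loop can observe the LOOKING state.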

Starting from the entry point QuorumPeerMain

/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        ZKAuditProvider.addServerStartFailureAuditLog();
        ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
    LOG.info("Exiting normally");
    ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    purgeMgr.start();

    // Decide between standalone mode and cluster mode
    if (args.length == 1 && config.isDistributed()) {
        // cluster mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // Server that handles client reads and writes, i.e. the client port (2181 by default)
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }

        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
        quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
        quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }

        // start the QuorumPeer main thread
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }
}
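The branch in initializeAndRun picks cluster mode only when exactly one config-file argument is given and the config describes a distributed ensemble. The sketch below approximates that decision by counting `server.N` entries in a zoo.cfg-style text; that counting rule is an assumption for illustration, since the real check is QuorumPeerConfig.isDistributed(), which also takes settings such as standaloneEnabled into account. `StartupModeSketch` and its methods are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class StartupModeSketch {
    // Assumption: "distributed" approximated as "more than one server.N entry".
    static boolean isDistributed(String zooCfg) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(zooCfg)); // zoo.cfg uses key=value lines
        long servers = props.stringPropertyNames().stream()
                .filter(k -> k.startsWith("server."))
                .count();
        return servers > 1;
    }

    // Mirrors the shape of: if (args.length == 1 && config.isDistributed()) ... else standalone
    static String chooseMode(String[] args, String zooCfg) throws IOException {
        return (args.length == 1 && isDistributed(zooCfg)) ? "cluster" : "standalone";
    }

    public static void main(String[] args) throws IOException {
        String cfg = "tickTime=2000\n"
                + "dataDir=/tmp/zookeeper\n"
                + "clientPort=2181\n"
                + "server.1=host1:2888:3888\n"
                + "server.2=host2:2888:3888\n"
                + "server.3=host3:2888:3888\n";
        System.out.println(chooseMode(new String[] {"zoo.cfg"}, cfg)); // cluster
    }
}
```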

Calling QuorumPeer's start method

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Restore the database: loadDataBase() recovers data from local files and obtains the latest zxid
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // initialize leader election
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}

ResponderThread responder;

public synchronized void stopLeaderElection() {
    responder.running = false;
    responder.interrupt();
}

public synchronized void startLeaderElection() {
    try {
        // If this node is LOOKING, it votes for itself
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // Create the election algorithm selected by electionAlg in zoo.cfg.
    // Algorithms 1 and 2 are no longer supported (see below), so in practice
    // this is 3, fast leader election, which is also the default.
    this.electionAlg = createElectionAlgorithm(electionType);
}

@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        // QuorumCnxManager handles the network I/O for leader election
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // start the listener bound to the election port and wait for the other servers to connect
            listener.start();
            // TCP-based election algorithm
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

@SuppressWarnings("deprecation")
protected Election makeLEStrategy() {
    LOG.debug("Initializing leader election protocol...");
    return electionAlg;
}

protected synchronized void setLeader(Leader newLeader) {
    leader = newLeader;
}

protected synchronized void setFollower(Follower newFollower) {
    follower = newFollower;
}

protected synchronized void setObserver(Observer newObserver) {
    observer = newObserver;
}

public synchronized ZooKeeperServer getActiveServer() {
    if (leader != null) {
        return leader.zk;
    } else if (follower != null) {
        return follower.zk;
    } else if (observer != null) {
        return observer.zk;
    }
    return null;
}

boolean shuttingDownLE = false;

@Override
public void run() {
    updateThreadName();

    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);
        for (QuorumServer s : getView().values()) {
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {
                RemotePeerBean rBean = new RemotePeerBean(this, s);
                try {
                    MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                    jmxRemotePeerBean.put(s.id, rBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }

    try {
        /*
         * Main loop
         */
        while (running) {
            // dispatch on this node's current state
            switch (getPeerState()) {
            case LOOKING:
                // LOOKING: enter the election process
                LOG.info("LOOKING");
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // Strategy pattern: the configured election algorithm runs the election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // Strategy pattern: the configured election algorithm runs the election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    updateServerState();

                    // Add delay jitter before we switch to LOOKING
                    // state to reduce the load of ObserverMaster
                    if (isRunning()) {
                        Observer.waitForObserverElectionDelay();
                    }
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        MBeanRegistry instance = MBeanRegistry.getInstance();
        instance.unregister(jmxQuorumBean);
        instance.unregister(jmxLocalPeerBean);

        for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
            instance.unregister(remotePeerBean);
        }

        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
        jmxRemotePeerBean = null;
    }
}
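The main loop in run() is a state machine: while LOOKING it runs lookForLeader(), and once a winner is known the peer becomes LEADING if the winner is itself, otherwise FOLLOWING (or OBSERVING for a non-voting learner), after which the next iteration dispatches on the new state. The sketch below models only that decision; `PeerState`, `MainLoopSketch`, `stateAfterElection` and `dispatch` are hypothetical simplifications, not ZooKeeper's API.

```java
// Hypothetical mirror of QuorumPeer.ServerState.
enum PeerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

class MainLoopSketch {
    // State a peer moves to once the election has picked electedLeader.
    static PeerState stateAfterElection(long electedLeader, long myid, boolean participant) {
        if (electedLeader == myid) {
            return PeerState.LEADING;
        }
        return participant ? PeerState.FOLLOWING : PeerState.OBSERVING;
    }

    // One pass of the switch in run(), returning the method that case would call.
    static String dispatch(PeerState state) {
        switch (state) {
            case LOOKING:   return "lookForLeader";
            case FOLLOWING: return "followLeader";
            case LEADING:   return "lead";
            case OBSERVING: return "observeLeader";
            default:        throw new IllegalStateException("unknown state " + state);
        }
    }

    public static void main(String[] args) {
        // Runtime example: Server1 wins, Server3 becomes a follower.
        System.out.println(dispatch(stateAfterElection(1, 1, true))); // lead
        System.out.println(dispatch(stateAfterElection(1, 3, true))); // followLeader
    }
}
```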

The FastLeaderElection constructor calls the starter method

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

/**
 * This method is invoked by the constructor. Because it is a
 * part of the starting procedure of the object that must be on
 * any constructor of this class, it is probably best to keep as
 * a separate method. As we have a single constructor currently,
 * it is not strictly necessary to have it separate.
 *
 * @param self      QuorumPeer that created this object
 * @param manager   Connection manager
 */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    // application-level send queue, holding ToSend objects
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // application-level receive queue, holding Notification objects
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

/**
 * This method starts the sender and receiver threads.
 */
public void start() {
    this.messenger.start();
}

// (members of the inner class Messenger)

/**
 * Constructor of class Messenger.
 *
 * @param manager   Connection manager
 */
Messenger(QuorumCnxManager manager) {
    // create the vote sender
    this.ws = new WorkerSender(manager);

    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    // create the vote receiver
    this.wr = new WorkerReceiver(manager);

    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

/**
 * Starts instances of WorkerSender and WorkerReceiver
 */
void start() {
    // start the sender thread, which hands messages to the I/O layer (QuorumCnxManager)
    this.wsThread.start();
    // start the receiver thread, which takes messages from the I/O layer (QuorumCnxManager)
    this.wrThread.start();
}

// (back in FastLeaderElection)

/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
// lookForLeader: the election starts here
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        /*
         * The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
         * if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
         * of participants has voted for it.
         */
        // votes received in the current round
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        /*
         * The votes from previous leader elections, as well as the votes from the current leader election are
         * stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
         * Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
         * outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
         * the electionEpoch of the received notifications) in a leader election.
         */
        // votes from peers that already know the election result (FOLLOWING/LEADING)
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            // increment the logical clock (the election round)
            logicalclock.incrementAndGet();
            // propose ourselves with our own id, zxid and epoch
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info(
            "New election. My id = {}, proposed zxid=0x{}",
            self.getId(),
            Long.toHexString(proposedZxid));
        // send our vote to every voter, including ourselves
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        // keep looping until a leader is elected
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            // take the next vote delivered by the I/O layer; our own vote also arrives here
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                // nothing received: if everything was delivered, send our vote again
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    // messages not delivered yet; other servers may not be up, so try to reconnect
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                // double the wait, capped at maxNotificationInterval
                int tmpTimeOut = notTimeout * 2;
                notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                LOG.info("Notification time out: {}", notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                // a vote arrived: both the sender and its proposed leader belong to this ensemble
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                // dispatch on the sender's state
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        // the sender is in a newer round: catch up to it
                        logicalclock.set(n.electionEpoch);
                        // discard the votes collected in the old round
                        recvset.clear();
                        // does the received vote win? compare epoch, then zxid, then myid
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // it wins: adopt the other server's vote
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // otherwise keep voting for ourselves
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        // broadcast so the other servers learn our current vote
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        // the sender's round is older than ours: ignore the message
                        LOG.debug(
                            "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                            Long.toHexString(n.electionEpoch),
                            Long.toHexString(logicalclock.get()));
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        // same round: if the received vote wins (epoch, zxid, myid), adopt it and broadcast
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    LOG.debug(
                        "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                        n.sid,
                        n.leader,
                        Long.toHexString(n.zxid),
                        Long.toHexString(n.electionEpoch));

                    // don't care about the version if it's in LOOKING state
                    // record the vote; recvset is used to decide whether the election is over
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    // is the election over? by default, a majority of voters must agree with our proposal
                    if (voteSet.hasAllQuorums()) {

                        // Verify if there is any change in the proposed leader
                        // wait up to finalizeWait for a better vote; if one arrives, requeue it and keep electing
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        // the leader is confirmed
                        if (n == null) {
                            // switch this peer's state (LEADING, FOLLOWING or OBSERVING)
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    // observers do not take part in voting
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                // notifications from peers that are already following or leading are taken into account
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    // same election round?
                    if (n.electionEpoch == logicalclock.get()) {
                        // yes: add it to our vote set
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        // if a quorum agrees, also verify that the leader is valid
                        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                            // update our state and return the result
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     *
                     * Note that the outofelection map also stores votes from the current leader election.
                     * See ZOOKEEPER-1732 for more information.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}

/**
 * Send notifications to all peers upon a change in our vote
 */
// broadcast our current vote
private void sendNotifications() {
    // one message per voter in the current and next configuration
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        // the message object
        ToSend notmsg = new ToSend(
            ToSend.mType.notification,
            proposedLeader,
            proposedZxid,
            logicalclock.get(),
            QuorumPeer.ServerState.LOOKING,
            sid,
            proposedEpoch,
            qv.toString().getBytes());

        LOG.debug(
            "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
                + " {} (myid), 0x{} (n.peerEpoch) ",
            proposedLeader,
            Long.toHexString(proposedZxid),
            Long.toHexString(logicalclock.get()),
            sid,
            self.getId(),
            Long.toHexString(proposedEpoch));

        // enqueue the message; WorkerSender consumes this queue
        sendqueue.offer(notmsg);
    }
}

// (inner class of Messenger)
class WorkerSender extends ZooKeeperThread {

    volatile boolean stop;
    QuorumCnxManager manager;

    WorkerSender(QuorumCnxManager manager) {
        super("WorkerSender");
        this.stop = false;
        this.manager = manager;
    }

    public void run() {
        while (!stop) {
            try {
                // take the next message from the send queue
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) {
                    continue;
                }

                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }

    /**
     * Called by run() once there is a new message to send.
     *
     * @param m     message to send
     */
    void process(ToSend m) {
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);

        manager.toSend(m.sid, requestBuffer);
    }

}
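The LOOKING branch of lookForLeader() handles a notification according to its round: a newer electionEpoch makes the peer catch up, clear recvset, re-decide from its initial vote and re-broadcast; an older one is ignored; the same round only changes the proposal if the received vote wins. The sketch below isolates that logic. `RoundHandlingSketch` and its fields are hypothetical, peerEpoch is left out (epochs assumed equal), and the `broadcasts` counter stands in for sendNotifications().

```java
import java.util.HashMap;
import java.util.Map;

class RoundHandlingSketch {
    final long myid;
    final long myZxid;
    long logicalclock = 1; // current election round
    long proposedLeader;
    long proposedZxid;
    final Map<Long, long[]> recvset = new HashMap<>(); // sid -> {leader, zxid, electionEpoch}
    int broadcasts = 0; // counts what would be sendNotifications() calls

    RoundHandlingSketch(long myid, long myZxid) {
        this.myid = myid;
        this.myZxid = myZxid;
        this.proposedLeader = myid; // start by voting for ourselves
        this.proposedZxid = myZxid;
    }

    // Epochs assumed equal: larger ZXID wins, then larger myid.
    static boolean beats(long leader, long zxid, long curLeader, long curZxid) {
        return zxid > curZxid || (zxid == curZxid && leader > curLeader);
    }

    void onLookingNotification(long sid, long leader, long zxid, long electionEpoch) {
        if (electionEpoch > logicalclock) {
            // Newer round: catch up, drop old-round votes, re-decide from our initial vote.
            logicalclock = electionEpoch;
            recvset.clear();
            if (beats(leader, zxid, myid, myZxid)) {
                proposedLeader = leader;
                proposedZxid = zxid;
            } else {
                proposedLeader = myid;
                proposedZxid = myZxid;
            }
            broadcasts++;
        } else if (electionEpoch < logicalclock) {
            return; // stale round: ignored and not recorded
        } else if (beats(leader, zxid, proposedLeader, proposedZxid)) {
            // Same round and the received vote wins: adopt it and re-broadcast.
            proposedLeader = leader;
            proposedZxid = zxid;
            broadcasts++;
        }
        recvset.put(sid, new long[] {leader, zxid, electionEpoch});
    }

    public static void main(String[] args) {
        RoundHandlingSketch server1 = new RoundHandlingSketch(1, 0);
        server1.onLookingNotification(2, 2, 0, 1); // same round, Server2 wins on myid
        System.out.println(server1.proposedLeader); // 2
    }
}
```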

How votes are broadcast: see the sendNotifications method

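  1. sendNotifications builds a ToSend message for every voter and enqueues it with sendqueue.offer(notmsg);
  2. WorkerSender's run method takes each message off sendqueue and calls process, which serializes it with buildMsg and hands it to QuorumCnxManager via manager.toSend.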
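These two steps decouple producing votes from network I/O through a queue. A minimal sketch of that producer/consumer split, assuming a three-voter ensemble: `BroadcastSketch`, its trimmed `ToSend`, and the `delivered` list (a stand-in for QuorumCnxManager.toSend) are hypothetical. Like WorkerSender, the consumer polls with a timeout so it can notice the stop flag.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BroadcastSketch {
    static final class ToSend {
        final long leader, zxid, electionEpoch, sid;

        ToSend(long leader, long zxid, long electionEpoch, long sid) {
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.sid = sid;
        }
    }

    final LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    final List<Long> delivered = new CopyOnWriteArrayList<>(); // recipients, in send order
    volatile boolean stop = false;

    // Like sendNotifications(): enqueue one message per voter, ourselves included.
    void sendNotifications(long[] voters, long leader, long zxid, long round) {
        for (long sid : voters) {
            sendqueue.offer(new ToSend(leader, zxid, round, sid));
        }
    }

    // Like WorkerSender.run(): drain the queue on a daemon thread.
    Thread startSender() {
        Thread t = new Thread(() -> {
            while (!stop) {
                try {
                    ToSend m = sendqueue.poll(100, TimeUnit.MILLISECONDS);
                    if (m != null) {
                        delivered.add(m.sid); // stand-in for manager.toSend(m.sid, buffer)
                    }
                } catch (InterruptedException e) {
                    break;
                }
            }
        }, "WorkerSender-sketch");
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        BroadcastSketch b = new BroadcastSketch();
        b.startSender();
        b.sendNotifications(new long[] {1, 2, 3}, 2, 0, 1);
        Thread.sleep(500);
        b.stop = true;
        System.out.println(b.delivered);
    }
}
```

Because sendNotifications() only enqueues, the election loop never blocks on a slow or unreachable peer; that cost is paid by the sender thread and the connection layer.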

The FastLeaderElection election process

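Several classes take part in this voting process:
FastLeaderElection: implements the Election interface and carries out the election between servers over TCP.
Notification: an inner class representing a received election vote (one sent by another server); it contains the proposed leader's id, zxid, election epoch and related information.
ToSend: represents an election vote to be sent to other servers; it likewise contains the proposed leader's id, zxid, election epoch and related information.
Messenger: contains the two inner classes WorkerReceiver and WorkerSender.
WorkerReceiver: runs as a thread (it is a Runnable) and acts as the vote receiver. It continuously takes election messages sent by other servers from QuorumCnxManager, converts each one into a vote (a Notification), and stores it in recvqueue.
WorkerSender: extends ZooKeeperThread (so it is also a Runnable) and acts as the vote sender. It continuously takes pending votes from sendqueue and passes them down to QuorumCnxManager.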
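To make the ToSend/Notification roles concrete, the sketch below serializes a vote into bytes on the sending side and, on the receiving side, does what WorkerReceiver does conceptually: decodes the bytes into a Notification and puts it on recvqueue. The byte layout (state as an int, as in `m.state.ordinal()` passed to buildMsg, followed by leader, zxid, electionEpoch and peerEpoch as longs) is an assumption for illustration and is not ZooKeeper's exact wire format; `VoteCodecSketch`, `encode` and `receive` are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;

class VoteCodecSketch {
    static final class Notification {
        final int state;          // sender state as an ordinal
        final long leader;
        final long zxid;
        final long electionEpoch;
        final long peerEpoch;
        final long sid;           // sender id; passed separately from the payload here

        Notification(int state, long leader, long zxid, long electionEpoch, long peerEpoch, long sid) {
            this.state = state;
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
            this.sid = sid;
        }
    }

    // Sending side: a ToSend-like vote becomes a byte buffer (assumed layout).
    static ByteBuffer encode(int state, long leader, long zxid, long electionEpoch, long peerEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + 4 * Long.BYTES);
        buf.putInt(state).putLong(leader).putLong(zxid).putLong(electionEpoch).putLong(peerEpoch);
        buf.flip(); // prepare for reading
        return buf;
    }

    // Receiving side, like WorkerReceiver: decode and queue for lookForLeader().
    static void receive(long sid, ByteBuffer msg, LinkedBlockingQueue<Notification> recvqueue) {
        // Java evaluates arguments left to right, so the fields are read in order.
        Notification n = new Notification(msg.getInt(), msg.getLong(), msg.getLong(),
                msg.getLong(), msg.getLong(), sid);
        recvqueue.offer(n);
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<>();
        receive(2, encode(0, 2, 0, 1, 0), recvqueue);
        Notification n = recvqueue.take();
        System.out.println("from sid " + n.sid + " vote for " + n.leader); // from sid 2 vote for 2
    }
}
```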




http://www.chinasem.cn/article/360426
