MIT 6.824 2A Raft Election

2023-10-21 15:40
Tags: mit raft 6.824 2a election

This post walks through the MIT 6.824 Lab 2A Raft election; I hope it offers some useful reference for developers working on the same problem.

For this lab I took the lesson from last time to heart and kept the locks as fine-grained as possible.
I used the mainstream electionLoop + pingLoop structure, but still stepped into plenty of pitfalls, mostly around time.Timer and term updates.
The linked demo animation and Figure 2 of the Raft paper are both very useful; it's best to follow them from the very beginning.
When you hit a problem, don't grind on it endlessly; compare against other people's implementations to find what you missed, because a multithreaded election program is genuinely brutal to debug.

Table of Contents

    • Key Concepts
    • Raft 2A
      • Pitfalls
      • Complete Code

Key Concepts

  • How does Raft perform leader election?
    With randomized election timers: a follower that hears no heartbeat before its randomized timeout fires becomes a candidate and requests votes (sketched below).
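
    A minimal sketch of such a timer (illustrative only; randomElectionTimeout is a made-up name, and the [300ms, 400ms) range mirrors the getElectTime helper in the complete code below):

        import (
            "math/rand"
            "time"
        )

        // randomElectionTimeout returns a timeout in [300ms, 400ms).
        // Randomization makes it unlikely that two followers time out
        // at the same instant and split the vote round after round.
        func randomElectionTimeout() time.Duration {
            return time.Duration(300+rand.Intn(100)) * time.Millisecond
        }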

  • Why does Raft exist? What problem does it solve?
    Raft is a consensus algorithm.
    Replicated state machines are used in large systems with a single cluster leader for leader election and for storing configuration information. Replicated state machines are typically implemented with a replicated log, and the consensus algorithm's job is to keep the replicated log consistent.

  • What properties does a consensus algorithm provide?
    1. Safety (it never returns an incorrect result) under network delay, partitions, and packet loss, duplication, and reordering
    2. Full availability as long as a majority of the servers are operational and can communicate
    3. No reliance on timing to guarantee consistency
    4. A minority of slow servers does not hurt overall system performance

  • ※ How does Raft achieve consensus?
    It first elects a leader. The leader is responsible for managing the replicated log: it accepts log entries from clients and replicates them onto the other servers.
    This decomposes the consensus problem into three subproblems: leader election, log replication and consistency, and safety.

  • How is split brain resolved?
    In Raft, by majority (quorum) voting.
    If the system has 2*F+1 servers, it can tolerate at most F failed servers.
    Raft in fact leans on overlapping majorities: the majority that carries out any operation must share at least one server with the majority of the previous operation, so the new majority learns the old majority's term (see the worked sketch below).
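
    A worked sketch of that quorum arithmetic (the majority helper is hypothetical, not part of the lab skeleton):

        // majority returns the number of votes needed among n servers.
        // With n = 2*F+1, majority(n) = F+1, so any two majorities
        // overlap in at least one server, and that shared server
        // carries the newer term forward.
        func majority(n int) int {
            return n/2 + 1
        }

        // majority(5) == 3: a 5-server cluster tolerates 2 failures,
        // and any two groups of 3 share at least one member.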

  • What is Raft's role in a distributed database?
    A middle layer that maintains a multi-replica log. A database operation may proceed only after a majority of servers hold a copy of the operation.

  • What is the application-layer call flow for Raft?
    The client places a request into the log via the Start function; after the entry commits, Raft notifies the client of success through a channel (applyCh), including the log index, term number, and so on (sketched below).
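
    A hedged sketch of that service-side flow (submitAndApply and apply are hypothetical; only Start, ApplyMsg, and applyCh come from the lab skeleton):

        // submitAndApply submits one command and then drains applyCh,
        // applying each committed entry to the service's state machine.
        func submitAndApply(rf *Raft, applyCh chan ApplyMsg,
            cmd interface{}, apply func(interface{}, int)) {
            _, _, isLeader := rf.Start(cmd) // returns (index, term, isLeader)
            if !isLeader {
                return // only the leader accepts commands; retry elsewhere
            }
            // Raft sends an ApplyMsg on applyCh once the entry commits.
            for msg := range applyCh {
                if msg.CommandValid {
                    apply(msg.Command, msg.CommandIndex)
                }
            }
        }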

Raft 2A


Pitfalls

  1. I didn't handle stale terms during elections, so followers kept inexplicably turning into leaders.
  2. I forgot to update the term during elections.
  3. Don't use time.Timer; it's very easy to get wrong if you're not familiar with it.
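
To make these pitfalls concrete, here is a minimal sketch of the reply-handling and timeout-checking patterns (handleVoteReply and electionTimedOut are hypothetical names; the real logic lives inline in electionLoop in the complete code below):

    // handleVoteReply shows the term checks a candidate must perform
    // when a RequestVote reply arrives; rf.mu must not already be held.
    func (rf *Raft) handleVoteReply(reply *RequestVoteReply) {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        if reply.Term > rf.currentTerm {
            // Pitfall 2: a larger term means this candidate is stale;
            // adopt the term and step down to follower.
            rf.becomeFollower(reply.Term)
            return
        }
        if reply.Term < rf.currentTerm {
            // Pitfall 1: a stale reply from an earlier election; counting
            // it as a vote is what turns followers into bogus leaders.
            return
        }
        // reply.Term == rf.currentTerm: safe to count the vote here.
    }

    // Pitfall 3: rather than a time.Timer, record the instant of the
    // last heartbeat or vote grant and poll it with time.Since, as
    // electionLoop does.
    func (rf *Raft) electionTimedOut() bool {
        return time.Since(rf.lastElectionTimer).Milliseconds() > rf.electTime
    }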

Complete Code

package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, Term, isleader)
//   start agreement on a new log entry
// rf.GetState() (Term, isLeader)
//   ask a Raft for its current Term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import (
	"math/rand"
	//	"bytes"
	"sync"
	"sync/atomic"
	"time"

	//	"6.824/labgob"
	"6.824/labrpc"
)

// ApplyMsg
// as each Raft peer becomes aware that successive log Entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
//
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int

	// For 2D:
	SnapshotValid bool
	Snapshot      []byte
	SnapshotTerm  int
	SnapshotIndex int
}

const (
	FOLLOWER = iota
	CANDIDATE
	LEADER
)
type LogEntry struct {
	Command interface{}
	Term    int
}

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// persistent on all servers
	role        int
	currentTerm int
	votedFor    int
	log         []*LogEntry

	// volatile on all servers
	commitIndex int
	lastApplied int

	// volatile on leaders
	nextIndex  []int
	matchIndex []int

	electionTimer     time.Timer // unused; see the pitfalls above
	lastElectionTimer time.Time  // instant of the last heartbeat/vote grant
	electTime         int64      // randomized election timeout in ms
}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
	var term int
	var isleader bool
	// Your code here (2A).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	term = rf.currentTerm
	isleader = rf.role == LEADER
	return term, isleader
}
func (rf *Raft) getElectTime() int64 {
	// Reseeding with a fixed per-server seed makes the timeout
	// deterministic per server, but servers still differ from each
	// other, which is enough to break election ties in 2A.
	rand.Seed(int64(rf.me) * 1000)
	return rand.Int63n(100) + 300 // timeout in [300, 400) ms
}
func (rf *Raft) becomeCandidate() {
	_, err := DPrintf("%d change to candidate, its role is %d", rf.me, rf.role)
	if err != nil {
		return
	}
	rf.role = CANDIDATE
	rf.currentTerm++
	rf.votedFor = rf.me
	//rf.persist()
}

func (rf *Raft) becomeFollower(term int) {
	rf.role = FOLLOWER
	rf.currentTerm = term
	rf.votedFor = -1
	//rf.persist()
	//rf.lastElectionTimer = time.Now()
	_, err := DPrintf("%d change to follower with Term %d", rf.me, rf.currentTerm)
	if err != nil {
		return
	}
}

func (rf *Raft) becomeLeader() {
	rf.role = LEADER
	//rf.persist()
	/*
		for i := 0; i < rf.PeersNum; i++ {
			rf.matchIndex[i] = 0
			rf.nextIndex[i] = len(rf.log)
		}
	*/
	go rf.pingLoop()
	_, err := DPrintf("%d change to leader", rf.me)
	if err != nil {
		return
	}
}

func (rf *Raft) electionLoop() {
	for !rf.killed() {
		// wait for the election timeout
		rf.mu.Lock()
		// leaders skip elections
		if rf.role == LEADER {
			_, err := DPrintf("%d role is leader, will skip election...", rf.me)
			if err != nil {
				return
			}
			rf.mu.Unlock()
		} else if time.Since(rf.lastElectionTimer).Milliseconds() > rf.electTime {
			_, err := DPrintf("%d time out! lastElectionTimer is %d, "+
				"electTime is %d, will get lock...", rf.me,
				time.Since(rf.lastElectionTimer).Milliseconds(), rf.electTime)
			if err != nil {
				return
			}
			// start an election
			rf.becomeCandidate()
			// request a vote from each peer except itself
			args := RequestVoteArgs{rf.currentTerm, rf.me, rf.lastApplied, rf.lastApplied}
			var conMutex sync.Mutex
			count := 1
			for server := range rf.peers {
				if server == rf.me {
					continue
				}
				go func(server int) {
					reply := RequestVoteReply{}
					ok := rf.SendRequestVote(server, &args, &reply)
					rf.mu.Lock()
					defer rf.mu.Unlock()
					_, err := DPrintf("%d send Request to %d", rf.me, server)
					if err != nil {
						return
					}
					// NOTE: drop stale replies (pitfall 1)
					if !ok || reply.Term < rf.currentTerm {
						_, err := DPrintf("%d return be", rf.me)
						if err != nil {
							return
						}
						return
					}
					// a larger term: adopt it and become follower (pitfall 2)
					if reply.Term > rf.currentTerm {
						rf.currentTerm = reply.Term
						rf.becomeFollower(reply.Term)
						return
					}
					if reply.VoteGranted {
						conMutex.Lock()
						defer conMutex.Unlock()
						// already leader, skip
						if rf.role == LEADER {
							return
						}
						count = count + 1
						if count > len(rf.peers)/2 {
							rf.becomeLeader()
							_, err := DPrintf("%d get %d ticket, become leader",
								rf.me, count)
							if err != nil {
								return
							}
						}
					}
				}(server)
			}
			rf.lastElectionTimer = time.Now()
			rf.electTime = rf.getElectTime()
			rf.mu.Unlock()
		} else {
			rf.mu.Unlock()
		}
		time.Sleep(80 * time.Millisecond)
	}
}

func (rf *Raft) pingLoop() {
	for {
		rf.mu.Lock()
		// exit the loop once no longer leader
		if rf.role != LEADER {
			_, err := DPrintf("%d not leader!", rf.me)
			if err != nil {
				return
			}
			rf.mu.Unlock()
			return
		}
		args := AppendEntriesArgs{}
		args.Term = rf.currentTerm
		args.LeaderId = rf.me
		for server := range rf.peers {
			if server == rf.me {
				continue
			}
			rf.mu.Unlock()
			go func(server int) {
				reply := AppendEntriesReply{}
				ok := rf.SendAppendEntries(server, &args, &reply)
				rf.mu.Lock()
				defer rf.mu.Unlock()
				if !ok {
					return
				}
				// NOTE: check the reply's Term
				if reply.Term > rf.currentTerm {
					rf.becomeFollower(reply.Term)
				}
				_, err := DPrintf("leader %d send heart pack to %d", rf.me, server)
				if err != nil {
					return
				}
			}(server)
			rf.mu.Lock()
		}
		rf.mu.Unlock()
		time.Sleep(100 * time.Millisecond)
	}
}

//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
func (rf *Raft) persist() {
	// Your code here (2C).
	// Example:
	// w := new(bytes.Buffer)
	// e := labgob.NewEncoder(w)
	// e.Encode(rf.xxx)
	// e.Encode(rf.yyy)
	// data := w.Bytes()
	// rf.persister.SaveRaftState(data)
}

//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	// Example:
	// r := bytes.NewBuffer(data)
	// d := labgob.NewDecoder(r)
	// var xxx
	// var yyy
	// if d.Decode(&xxx) != nil ||
	//    d.Decode(&yyy) != nil {
	//   error...
	// } else {
	//   rf.xxx = xxx
	//   rf.yyy = yyy
	// }
}

//
// A service wants to switch to snapshot.  Only do so if Raft hasn't
// have more recent info since it communicate the snapshot on applyCh.
//
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
	// Your code here (2D).
	return true
}

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (2D).
}

//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int
	VoteGranted bool
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	_, err := DPrintf("%d RequestVote get from %d", rf.me, args.CandidateId)
	if err != nil {
		return
	}
	// refuse to vote for an equal or smaller term
	if args.Term <= rf.currentTerm {
		_, err := DPrintf("refuse vote to %d", args.CandidateId)
		if err != nil {
			return
		}
		reply.VoteGranted = false
		reply.Term = rf.currentTerm
		return
	}
	// always report our term
	reply.Term = rf.currentTerm
	// a larger term: convert to follower
	if args.Term > rf.currentTerm {
		_, err := DPrintf("%d become follower because of bigger term", rf.me)
		if err != nil {
			return
		}
		rf.becomeFollower(args.Term)
	}
	// at most one vote per term
	if rf.votedFor == -1 {
		_, err := DPrintf("%d vote to %d", rf.me, args.CandidateId)
		if err != nil {
			return
		}
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		// granting a vote also resets the election timer
		rf.lastElectionTimer = time.Now()
		// update the reply's Term again after voting!
		reply.Term = rf.currentTerm
	}
	rf.persist()
}

func (rf *Raft) SendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	return ok
}

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int         // Figure 2 gives this as a single term (was []int)
	Entries      []*LogEntry // log entries to store (was []int)
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// a smaller term than ours: reject
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}
	// a larger term: become a follower of that term
	if args.Term > rf.currentTerm {
		rf.becomeFollower(args.Term)
	}
	// a valid heartbeat
	reply.Success = true
	rf.lastElectionTimer = time.Now()
	// log handling would go here (2B)
	rf.persist()
}
func (rf *Raft) SendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	return ok
}
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true
	// Your code here (2B).

	return index, term, isLeader
}

//
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
//
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
}

func (rf *Raft) killed() bool {
	z := atomic.LoadInt32(&rf.dead)
	return z == 1
}

// The ticker go routine starts a new election if this peer hasn't received
// heartbeats recently.
func (rf *Raft) ticker() {
	for rf.killed() == false {
		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using
		// time.Sleep().
	}
}

// Make
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (2A, 2B, 2C).
	// every server starts as a FOLLOWER
	rf.role = FOLLOWER
	rf.votedFor = -1
	rf.commitIndex = 0
	rf.lastApplied = 0
	rf.currentTerm = 0

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// start ticker goroutine to start elections
	//go rf.ticker()

	// set the election timeout
	rf.lastElectionTimer = time.Now()
	rf.electTime = rf.getElectTime()

	// start the election loop
	go rf.electionLoop()
	return rf
}
