This article covers MIT 6.824 Lab 2A: Raft leader election.
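Having learned from the previous lab, I tried to keep lock granularity as small as possible in this one.
I used the mainstream electionLoop + pingLoop implementation, but still hit plenty of pitfalls, mostly around time.Timer and term updates.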
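This link (the Raft demo visualization) and Figure 2 of the Raft paper are both very useful; it is best to follow the two of them from the very beginning.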
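When you get stuck, don't grind on it alone. Look at other people's implementations to see what you missed, because debugging a multithreaded election program is extremely hard.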
Contents
- Key concepts
- Raft 2A
- Pitfalls
- Full code
Key concepts
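- How does Raft elect a leader?
  With randomized election timers.
- Why does Raft exist? What problem does it solve?
  Raft is a consensus algorithm. Replicated state machines are used in large systems with a single cluster leader, for leader election and for storing configuration information. A replicated state machine is usually implemented with a replicated log, and the job of the consensus algorithm is to keep that replicated log consistent.
- What properties does a consensus algorithm have?
  1. It ensures safety (never returning an incorrect result) under network delays, partitions, packet loss, duplication, and reordering.
  2. It is fully available as long as a majority of the servers are operational and can communicate.
  3. It does not depend on timing to ensure consistency.
  4. A minority of slow servers does not affect overall system performance.
- ※ How does Raft achieve consensus?
  It first elects a leader. The leader is responsible for managing the replicated log: it accepts log entries from clients and replicates them on the other servers.
  This splits the consensus problem into three subproblems: leader election, log replication, and safety.
- How is split brain solved?
  Raft uses majority voting.
  With 2*F+1 servers in the system, at most F server failures can be tolerated.
  Raft actually relies on the overlap between majorities: the majority behind any operation must contain at least one server from the majority behind the previous operation, so the new majority learns the term of the old one.
- What role does Raft play in a distributed database?
  It is the middle layer that builds a multi-replica log. The database can proceed with an operation only after a majority of the servers have copied it.
- What is the call flow between the application layer and Raft?
  The client submits a request through the Start function, which stores it in the log. Once the entry is committed, Raft reports success back through a channel (applyCh).
  The message includes the log index, the term number, and other information.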
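The majority arithmetic behind the split-brain answer above can be checked with a few lines of Go. This is only a sketch for illustration: the helper names majority, maxFailures, and minOverlap are made up here and are not part of the lab code.

```go
package main

import "fmt"

// majority returns the smallest number of servers that forms a quorum
// in a cluster of n servers. (Hypothetical helper, not lab code.)
func majority(n int) int {
	return n/2 + 1
}

// maxFailures returns how many servers may fail while the remaining
// ones can still form a majority.
func maxFailures(n int) int {
	return n - majority(n)
}

// minOverlap returns the minimum number of servers shared by any two
// smallest-size majorities of the same n-server cluster.
func minOverlap(n int) int {
	return 2*majority(n) - n
}

func main() {
	for _, n := range []int{3, 5, 7} {
		fmt.Printf("servers=%d majority=%d tolerated=%d overlap>=%d\n",
			n, majority(n), maxFailures(n), minOverlap(n))
	}
}
```

For n = 2*F+1 the tolerated failure count comes out as F, and the overlap is always at least one server, which is what lets a new majority learn the term used by the previous one.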
Raft 2A
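Pitfalls
- Not handling stale terms during an election, which made followers inexplicably turn into leaders
- Forgetting to update the term during an election
- Avoid time.Timer; it is easy to get wrong if you are not familiar with it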
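The first two pitfalls both come down to how a candidate treats a RequestVote reply. One way to reason about it is as a pure decision function, sketched below. The function decideOnVoteReply and its types are hypothetical and are not taken from the lab skeleton or from the full code in this post; the extra check that the term has not changed since the request was sent is a common safeguard and an assumption of this sketch.

```go
package main

import "fmt"

type role int

const (
	follower role = iota
	candidate
	leader
)

type voteAction int

const (
	ignoreReply voteAction = iota // stale or irrelevant reply: do nothing
	stepDown                      // peer has a larger term: become follower
	countVote                     // valid granted vote for this election
)

// decideOnVoteReply decides what to do with a RequestVote reply.
// currentTerm is the peer's term when the reply is processed, sentTerm
// is the term the request was sent with, replyTerm comes from the reply.
func decideOnVoteReply(r role, currentTerm, sentTerm, replyTerm int, granted bool) voteAction {
	// A larger term always wins: update the term and step down.
	if replyTerm > currentTerm {
		return stepDown
	}
	// The reply belongs to an older election, or this peer is no
	// longer a candidate: counting it could elect a bogus leader.
	if r != candidate || currentTerm != sentTerm {
		return ignoreReply
	}
	if granted {
		return countVote
	}
	return ignoreReply
}

func main() {
	// Each line prints true.
	fmt.Println(decideOnVoteReply(candidate, 3, 3, 5, false) == stepDown)
	fmt.Println(decideOnVoteReply(candidate, 4, 3, 3, true) == ignoreReply)
	fmt.Println(decideOnVoteReply(follower, 3, 3, 3, true) == ignoreReply)
	fmt.Println(decideOnVoteReply(candidate, 3, 3, 3, true) == countVote)
}
```

Keeping the decision separate from the locking and RPC plumbing makes the stale-term cases easy to unit test, which helps given how hard the concurrent version is to debug.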
Full code
package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
// create a new Raft server.
// rf.Start(command interface{}) (index, Term, isleader)
// start agreement on a new log entry
// rf.GetState() (Term, isLeader)
// ask a Raft for its current Term, and whether it thinks it is leader
// ApplyMsg
// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester)
// in the same server.
//

import (
	"math/rand"
	//	"bytes"
	"sync"
	"sync/atomic"
	"time"

	//	"6.824/labgob"
	"6.824/labrpc"
)

// ApplyMsg
// as each Raft peer becomes aware that successive log Entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
//
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int

	// For 2D:
	SnapshotValid bool
	Snapshot      []byte
	SnapshotTerm  int
	SnapshotIndex int
}

const (
	FOLLOWER = iota
	CANDIDATE
	LEADER
)

type LogEntry struct {
	Command interface{}
	Term    int
}

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	//persistent for all
	role        int
	currentTerm int
	votedFor    int
	log         []*LogEntry

	//volatile for all
	commitIndex int
	lastApplied int

	//volatile for leaders
	nextIndex  []int
	matchindex []int

	electionTimer     time.Timer
	lastElectionTimer time.Time
	electTime         int64
}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
	var term int
	var isleader bool
	// Your code here (2A).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	term = rf.currentTerm
	isleader = rf.role == LEADER
	return term, isleader
}
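
// getElectTime returns a randomized election timeout in milliseconds,
// in the range [300, 400). It deliberately does not reseed the generator:
// reseeding with the same fixed value on every call would make the peer
// draw the same timeout each time, defeating the randomization.
func (rf *Raft) getElectTime() int64 {
	return rand.Int63n(100) + 300
}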

// becomeCandidate must be called with rf.mu held.
func (rf *Raft) becomeCandidate() {
	DPrintf("%d change to candidate, it role is %d", rf.me, rf.role)
	rf.role = CANDIDATE
	rf.currentTerm++
	rf.votedFor = rf.me
	//rf.persist()
}

// becomeFollower must be called with rf.mu held.
func (rf *Raft) becomeFollower(term int) {
	rf.role = FOLLOWER
	rf.currentTerm = term
	rf.votedFor = -1
	//rf.persist()
	//rf.lastElectionTimer = time.Now()
	DPrintf("%d change to follower with Term %d", rf.me, rf.currentTerm)
}

// becomeLeader must be called with rf.mu held.
func (rf *Raft) becomeLeader() {
	rf.role = LEADER
	//rf.persist()
	/*
		for i := 0; i < rf.PeersNum; i++ {
			rf.matchIndex[i] = 0
			rf.nextIndex[i] = len(rf.log)
		}
	*/
	go rf.pingLoop()
	DPrintf("%d change to leader", rf.me)
}

func (rf *Raft) electionLoop() {
	for !rf.killed() {
		// Wait for the election timeout.
		rf.mu.Lock()
		if rf.role == LEADER {
			// Leaders skip the election.
			DPrintf("%d role is leader,will skip election...", rf.me)
			rf.mu.Unlock()
		} else if time.Since(rf.lastElectionTimer).Milliseconds() > rf.electTime {
			DPrintf("%d time out!lastElectionTimer is %d,"+
				"electTime is %d will get lock...", rf.me,
				time.Since(rf.lastElectionTimer).Milliseconds(), rf.electTime)
			// Start an election.
			rf.becomeCandidate()
			// request vote from each Peer except itself
			args := RequestVoteArgs{rf.currentTerm, rf.me, rf.lastApplied, rf.lastApplied}
			var conMutex sync.Mutex
			count := 1
			for server := range rf.peers {
				if server == rf.me {
					continue
				}
				go func(server int) {
					reply := RequestVoteReply{}
					ok := rf.SendRequestVote(server, &args, &reply)
					rf.mu.Lock()
					defer rf.mu.Unlock()
					DPrintf("%d send Request to %d", rf.me, server)
					// Note: drop failed RPCs and replies from a stale term.
					if !ok || reply.Term < rf.currentTerm {
						DPrintf("%d return be", rf.me)
						return
					}
					// Larger term: adopt it and become a follower.
					if reply.Term > rf.currentTerm {
						rf.currentTerm = reply.Term
						rf.becomeFollower(reply.Term)
						return
					}
					if reply.VoteGranted {
						conMutex.Lock()
						defer conMutex.Unlock()
						// Already the leader, nothing more to do.
						if rf.role == LEADER {
							return
						}
						count = count + 1
						if count > len(rf.peers)/2 {
							rf.becomeLeader()
							DPrintf("%d get %d ticket,become leader", rf.me, count)
						}
					}
				}(server)
			}
			rf.lastElectionTimer = time.Now()
			rf.electTime = rf.getElectTime()
			rf.mu.Unlock()
		} else {
			rf.mu.Unlock()
		}
		time.Sleep(80 * time.Millisecond)
	}
}

func (rf *Raft) pingLoop() {
	for !rf.killed() {
		rf.mu.Lock()
		// Exit the loop once this peer is no longer the leader.
		if rf.role != LEADER {
			DPrintf("%d not leader!", rf.me)
			rf.mu.Unlock()
			return
		}
		args := AppendEntriesArgs{}
		args.Term = rf.currentTerm
		args.LeaderId = rf.me
		for server := range rf.peers {
			if server == rf.me {
				continue
			}
			rf.mu.Unlock()
			go func(server int) {
				reply := AppendEntriesReply{}
				ok := rf.SendAppendEntries(server, &args, &reply)
				rf.mu.Lock()
				defer rf.mu.Unlock()
				if !ok {
					return
				}
				// Note: check the term in the reply.
				if reply.Term > rf.currentTerm {
					rf.becomeFollower(reply.Term)
				}
				DPrintf("leader %d send heart pack to %d", rf.me, server)
			}(server)
			rf.mu.Lock()
		}
		rf.mu.Unlock()
		time.Sleep(100 * time.Millisecond)
	}
}

//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
func (rf *Raft) persist() {
	// Your code here (2C).
	// Example:
	// w := new(bytes.Buffer)
	// e := labgob.NewEncoder(w)
	// e.Encode(rf.xxx)
	// e.Encode(rf.yyy)
	// data := w.Bytes()
	// rf.persister.SaveRaftState(data)
}

//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	// Example:
	// r := bytes.NewBuffer(data)
	// d := labgob.NewDecoder(r)
	// var xxx
	// var yyy
	// if d.Decode(&xxx) != nil ||
	//    d.Decode(&yyy) != nil {
	//   error...
	// } else {
	//   rf.xxx = xxx
	//   rf.yyy = yyy
	// }
}

//
// A service wants to switch to snapshot. Only do so if Raft hasn't
// have more recent info since it communicate the snapshot on applyCh.
//
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
	// Your code here (2D).
	return true
}

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (2D).
}

//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int
	VoteGranted bool
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	DPrintf("%d RequestVote get from %d", rf.me, args.CandidateId)
	// Refuse the vote.
	if args.Term <= rf.currentTerm {
		DPrintf("refuse vote to %d", args.CandidateId)
		reply.VoteGranted = false
		reply.Term = rf.currentTerm
		return
	}
	// Always report the current term.
	reply.Term = rf.currentTerm
	// A larger term converts this peer to a follower.
	if args.Term > rf.currentTerm {
		DPrintf("%d become to follower because of bigger term", rf.me)
		rf.becomeFollower(args.Term)
	}
	// Each peer may vote for only one candidate per term.
	if rf.votedFor == -1 {
		DPrintf("%d vote to %d", rf.me, args.CandidateId)
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		// Granting a vote also resets the election timer.
		rf.lastElectionTimer = time.Now()
		// Refresh the term in the reply after voting!
		reply.Term = rf.currentTerm
	}
	rf.persist()
}

func (rf *Raft) SendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	return ok
}

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int         // a single term, per Figure 2 of the Raft paper
	Entries      []*LogEntry // log entries to store; empty for heartbeats
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// The sender's term is older than ours: reject.
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}
	// A larger term was seen: become a follower in that term.
	if args.Term > rf.currentTerm {
		rf.becomeFollower(args.Term)
	}
	// Valid heartbeat.
	reply.Success = true
	rf.lastElectionTimer = time.Now()
	// Log handling goes here.
	rf.persist()
}

func (rf *Raft) SendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	return ok
}

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true

	// Your code here (2B).

	return index, term, isLeader
}

//
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
//
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
}

func (rf *Raft) killed() bool {
	z := atomic.LoadInt32(&rf.dead)
	return z == 1
}

// The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func (rf *Raft) ticker() {
	for rf.killed() == false {

		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using
		// time.Sleep().

	}
}

// Make
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (2A, 2B, 2C).
	// Every peer starts out as a FOLLOWER.
	rf.role = FOLLOWER
	rf.votedFor = -1
	rf.commitIndex = 0
	rf.lastApplied = 0
	rf.currentTerm = 0

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// start ticker goroutine to start elections
	//go rf.ticker()

	// Set the election timeout.
	rf.lastElectionTimer = time.Now()
	rf.electTime = rf.getElectTime()
	// Start the election loop.
	go rf.electionLoop()

	return rf
}