MIT 6.824 2A Raft Election

2023-10-21 15:40
Tags: mit raft 6.824 2a election


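For this lab I applied the lesson from last time: use fine-grained locks wherever possible.
I used the common electionLoop + pingLoop structure, but still fell into many traps, mostly around time.Timer and updating the term.
The demo visualization link and Figure 2 of the Raft paper are both very useful; ideally, follow them from the very beginning.
When you get stuck, don't keep grinding on it alone. Look at other people's implementations to see what you missed, because debugging a multithreaded election program is extremely hard.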

Contents

    • Summary of key concepts
    • Raft 2A
      • Some pitfalls
      • Full code

Summary of key concepts

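  • How does Raft elect a leader?
    With randomized election timers. Each server waits a random timeout; if it hears nothing from a leader within that time, it starts an election. Randomization makes it unlikely that several servers start elections at the same moment and split the vote.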

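  • Why do we need Raft? What problem does it solve?
    Raft is a consensus algorithm.
    Large systems that have a single cluster leader typically use a replicated state machine to manage leader election and store configuration information. Replicated state machines are usually implemented with a replicated log, and the consensus algorithm's job is to keep that replicated log consistent.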

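  • What properties does a consensus algorithm have?
    1. Safety (it never returns an incorrect result) under network delays, partitions, packet loss, duplication, and reordering
    2. Full availability as long as a majority of the servers are operational and can communicate
    3. No dependence on timing to ensure consistency
    4. A minority of slow servers does not degrade overall system performance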

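  • ※ How does Raft achieve consistency?
    It first elects a leader, which takes full responsibility for managing the replicated log: it accepts log entries from clients and replicates them on the other servers.
    This splits the consensus problem into three subproblems: leader election, log replication, and safety.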

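  • How is split brain prevented?
    Raft uses majority voting.
    With 2F+1 servers, the system tolerates at most F failed servers.
    Raft relies heavily on majorities: the majority behind any operation must share at least one server with the majority behind the previous operation, so the new majority always learns the term used by the old one.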

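  • What is Raft's role in a distributed database?
    It is a middle layer that builds a replicated log. A database operation may proceed only after a majority of servers have stored a copy of it.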

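  • What is the call flow between the application layer and Raft?
    The service puts a client request into the log by calling Start. Once the entry is committed, Raft reports it to the service through a channel (applyCh).
    The information exchanged includes the log index, the term number, and so on.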
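To make the replicated-state-machine answer above concrete, here is a small sketch with hypothetical names (`Command` and `apply` are not lab APIs): a deterministic key/value state machine replays a log in order, so any two replicas holding the same log end up in the same state. Keeping those logs identical is exactly the consensus algorithm's job.

```go
package main

import "fmt"

// Command is a hypothetical state-machine command: set Key to Value.
type Command struct {
	Key, Value string
}

// apply replays a log, in order, against an empty key/value map. Because it
// is deterministic, every replica holding the same log reaches the same state.
func apply(entries []Command) map[string]string {
	state := make(map[string]string)
	for _, c := range entries {
		state[c.Key] = c.Value
	}
	return state
}

func main() {
	entries := []Command{{"x", "1"}, {"y", "2"}, {"x", "3"}}
	replicaA := apply(entries)
	replicaB := apply(entries)
	fmt.Println(replicaA["x"], replicaB["x"]) // 3 3
}
```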
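The majority arithmetic in the split-brain answer above can be checked by brute force for small clusters. The sketch below uses hypothetical helper names: it computes the majority size and the number of tolerated failures for n = 2F+1 servers, and verifies that every pair of majorities shares at least one server, which is why a new majority always contains someone who knows the previous term.

```go
package main

import (
	"fmt"
	"math/bits"
)

// majority returns the smallest number of servers that is a majority of n.
func majority(n int) int { return n/2 + 1 }

// maxFailures returns how many servers may fail while a majority survives;
// for n = 2F+1 this is F.
func maxFailures(n int) int { return (n - 1) / 2 }

// majoritiesOverlap enumerates every subset of n servers as a bitmask and
// reports whether every pair of majority subsets shares at least one server.
func majoritiesOverlap(n int) bool {
	var quorums []uint
	for mask := uint(0); mask < 1<<uint(n); mask++ {
		if bits.OnesCount(mask) >= majority(n) {
			quorums = append(quorums, mask)
		}
	}
	for _, a := range quorums {
		for _, b := range quorums {
			if a&b == 0 {
				return false
			}
		}
	}
	return true
}

func main() {
	for _, n := range []int{3, 5, 7} {
		fmt.Printf("n=%d majority=%d tolerates=%d overlap=%v\n",
			n, majority(n), maxFailures(n), majoritiesOverlap(n))
	}
}
```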
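On the leader, the rule that "an operation proceeds only after a majority has a copy" amounts to finding the highest log index stored on a majority of servers. A minimal sketch, assuming a `matchIndex` slice in the style of Figure 2 that also includes the leader's own last index; `majorityReplicatedIndex` is a hypothetical name, and the sketch deliberately omits Figure 2's extra condition that the entry at that index must come from the leader's current term.

```go
package main

import (
	"fmt"
	"sort"
)

// majorityReplicatedIndex returns the highest log index stored on a majority
// of servers. matchIndex holds one entry per server, including the leader's
// own last log index. Simplification: Figure 2 additionally requires that the
// entry at that index is from the leader's current term before committing it.
func majorityReplicatedIndex(matchIndex []int) int {
	sorted := append([]int(nil), matchIndex...) // do not reorder the caller's slice
	sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
	// In descending order, the value at position n/2 (= majority-1) is held by
	// at least n/2+1 servers, and no larger value is.
	return sorted[len(sorted)/2]
}

func main() {
	// 5 servers; indices up to 5 are stored on at least 3 of them.
	fmt.Println(majorityReplicatedIndex([]int{7, 5, 5, 3, 2})) // 5
}
```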
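A toy version of the Start/applyCh round trip described above, for illustration only: `toyRaft` is a hypothetical single-server stand-in that treats every entry as committed immediately and sends on applyCh synchronously, whereas real Raft returns from Start at once and sends the ApplyMsg later, only after a majority has stored the entry. The `ApplyMsg` fields mirror those in the lab skeleton, and the first command gets index 1.

```go
package main

import "fmt"

// ApplyMsg keeps only the ApplyMsg fields from the lab skeleton used here.
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
}

// toyRaft is a hypothetical single-server stand-in. It treats each entry as
// committed immediately; real Raft waits until a majority has stored it.
type toyRaft struct {
	term    int
	log     []interface{}
	applyCh chan ApplyMsg
}

// Start appends the command, reports the "committed" entry on applyCh, and
// returns (index, term, isLeader) like the lab API.
func (rf *toyRaft) Start(command interface{}) (int, int, bool) {
	rf.log = append(rf.log, command)
	index := len(rf.log) // the first command gets index 1
	rf.applyCh <- ApplyMsg{CommandValid: true, Command: command, CommandIndex: index}
	return index, rf.term, true
}

func main() {
	applyCh := make(chan ApplyMsg, 10) // buffered so Start does not block here
	rf := &toyRaft{term: 1, applyCh: applyCh}
	index, term, isLeader := rf.Start("put x=1")
	msg := <-applyCh
	fmt.Println(index, term, isLeader, msg.CommandIndex, msg.Command)
}
```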

Raft 2A


Some pitfalls

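  1. During elections I did not handle stale terms, so a follower would mysteriously become leader.
  2. During elections I forgot to update the term.
  3. Avoid time.Timer; it is easy to misuse if you are not familiar with it.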

Full code

package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, Term, isleader)
//   start agreement on a new log entry
// rf.GetState() (Term, isLeader)
//   ask a Raft for its current Term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import (
	"math/rand"
	//	"bytes"
	"sync"
	"sync/atomic"
	"time"

	//	"6.824/labgob"
	"6.824/labrpc"
)

// ApplyMsg
// as each Raft peer becomes aware that successive log Entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
//
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int

	// For 2D:
	SnapshotValid bool
	Snapshot      []byte
	SnapshotTerm  int
	SnapshotIndex int
}

// Raft server roles.
const (
	FOLLOWER = iota
	CANDIDATE
	LEADER
)

type LogEntry struct {
	Command interface{}
	Term    int
}

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	role int // FOLLOWER, CANDIDATE or LEADER

	// persistent state on all servers
	currentTerm int
	votedFor    int // -1 means no vote cast in currentTerm
	log         []*LogEntry

	// volatile state on all servers
	commitIndex int
	lastApplied int

	// volatile state on leaders
	nextIndex  []int
	matchIndex []int

	// election timeout bookkeeping: instead of a time.Timer, electionLoop
	// polls time.Since(lastElectionTimer) against electTime (milliseconds)
	lastElectionTimer time.Time
	electTime         int64
}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
	var term int
	var isleader bool
	// Your code here (2A).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	term = rf.currentTerm
	isleader = rf.role == LEADER
	return term, isleader
}
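// getElectTime returns a randomized election timeout of 300 to 399 ms.
// Do not reseed the generator here: reseeding with a fixed per-server seed on
// every call would give a server the same timeout in every election.
func (rf *Raft) getElectTime() int64 {
	return rand.Int63n(100) + 300
}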
func (rf *Raft) becomeCandidate() {
	DPrintf("%d change to candidate, its role is %d", rf.me, rf.role)
	rf.role = CANDIDATE
	rf.currentTerm++
	rf.votedFor = rf.me
	//rf.persist()
}

func (rf *Raft) becomeFollower(term int) {
	rf.role = FOLLOWER
	rf.currentTerm = term
	rf.votedFor = -1
	//rf.persist()
	DPrintf("%d change to follower with Term %d", rf.me, rf.currentTerm)
}

func (rf *Raft) becomeLeader() {
	rf.role = LEADER
	//rf.persist()
	/*for i := 0; i < len(rf.peers); i++ {
		rf.matchIndex[i] = 0
		rf.nextIndex[i] = len(rf.log)
	}*/
	// tie the heartbeat loop to this term so an old loop cannot keep running
	go rf.pingLoop(rf.currentTerm)
	DPrintf("%d change to leader", rf.me)
}

func (rf *Raft) electionLoop() {
	for !rf.killed() {
		// check whether the election timeout has expired
		rf.mu.Lock()
		if rf.role == LEADER {
			// leaders do not run elections
			DPrintf("%d role is leader, will skip election...", rf.me)
		} else if time.Since(rf.lastElectionTimer).Milliseconds() > rf.electTime {
			DPrintf("%d timed out! %d ms since last reset, electTime is %d",
				rf.me, time.Since(rf.lastElectionTimer).Milliseconds(), rf.electTime)
			// start an election
			rf.becomeCandidate()
			// request votes from every peer except itself;
			// LastLogIndex/LastLogTerm are placeholders until 2B
			args := RequestVoteArgs{rf.currentTerm, rf.me, rf.lastApplied, rf.lastApplied}
			count := 1 // our own vote; protected by rf.mu
			for server := range rf.peers {
				if server == rf.me {
					continue
				}
				go func(server int) {
					reply := RequestVoteReply{}
					ok := rf.SendRequestVote(server, &args, &reply)
					rf.mu.Lock()
					defer rf.mu.Unlock()
					DPrintf("%d sent RequestVote to %d", rf.me, server)
					if !ok {
						return
					}
					// a larger term: adopt it and become a follower
					if reply.Term > rf.currentTerm {
						rf.becomeFollower(reply.Term)
						return
					}
					// careful! ignore stale replies: this election is over if we
					// are no longer a candidate or have moved to a newer term
					if rf.role != CANDIDATE || rf.currentTerm != args.Term {
						return
					}
					if reply.VoteGranted {
						count++
						if count > len(rf.peers)/2 {
							rf.becomeLeader()
							DPrintf("%d got %d votes, became leader", rf.me, count)
						}
					}
				}(server)
			}
			rf.lastElectionTimer = time.Now()
			rf.electTime = rf.getElectTime()
		}
		rf.mu.Unlock()
		time.Sleep(80 * time.Millisecond)
	}
}

func (rf *Raft) pingLoop(term int) {
	for !rf.killed() {
		rf.mu.Lock()
		// stop once we are no longer the leader of this term
		if rf.role != LEADER || rf.currentTerm != term {
			DPrintf("%d not leader of term %d any more!", rf.me, term)
			rf.mu.Unlock()
			return
		}
		args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId: rf.me}
		rf.mu.Unlock()
		for server := range rf.peers {
			if server == rf.me {
				continue
			}
			go func(server int) {
				reply := AppendEntriesReply{}
				ok := rf.SendAppendEntries(server, &args, &reply)
				rf.mu.Lock()
				defer rf.mu.Unlock()
				if !ok {
					return
				}
				// careful! check the term
				if reply.Term > rf.currentTerm {
					rf.becomeFollower(reply.Term)
				}
				DPrintf("leader %d sent heartbeat to %d", rf.me, server)
			}(server)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
func (rf *Raft) persist() {
	// Your code here (2C).
	// Example:
	// w := new(bytes.Buffer)
	// e := labgob.NewEncoder(w)
	// e.Encode(rf.xxx)
	// e.Encode(rf.yyy)
	// data := w.Bytes()
	// rf.persister.SaveRaftState(data)
}

//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	// Example:
	// r := bytes.NewBuffer(data)
	// d := labgob.NewDecoder(r)
	// var xxx
	// var yyy
	// if d.Decode(&xxx) != nil ||
	//    d.Decode(&yyy) != nil {
	//   error...
	// } else {
	//   rf.xxx = xxx
	//   rf.yyy = yyy
	// }
}

//
// A service wants to switch to snapshot.  Only do so if Raft hasn't
// received more recent info since it communicated the snapshot on applyCh.
//
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
	// Your code here (2D).
	return true
}

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (2D).
}

//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int
	VoteGranted bool
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	DPrintf("%d got RequestVote from %d", rf.me, args.CandidateId)
	// refuse candidates from an older term
	if args.Term < rf.currentTerm {
		DPrintf("%d refuses vote to %d: stale term", rf.me, args.CandidateId)
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		return
	}
	// a newer term: convert to follower (this also clears votedFor)
	if args.Term > rf.currentTerm {
		DPrintf("%d becomes follower because of bigger term", rf.me)
		rf.becomeFollower(args.Term)
	}
	// always reply with the (possibly updated) term
	reply.Term = rf.currentTerm
	reply.VoteGranted = false
	// at most one vote per term; a repeated request from the same candidate is
	// granted again (2B adds the "candidate's log is up to date" check here)
	if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
		DPrintf("%d votes for %d", rf.me, args.CandidateId)
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		// granting a vote also resets the election timer
		rf.lastElectionTimer = time.Now()
	}
	rf.persist()
}

func (rf *Raft) SendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	return ok
}

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []*LogEntry
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// the sender's term is older than ours: reject
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}
	if args.Term > rf.currentTerm {
		// a newer term: become a follower in that term
		rf.becomeFollower(args.Term)
	} else if rf.role == CANDIDATE {
		// a leader already won this term: step down, but keep votedFor
		// so this server cannot vote twice in the same term
		rf.role = FOLLOWER
	}
	// a valid heartbeat
	reply.Term = rf.currentTerm
	reply.Success = true
	rf.lastElectionTimer = time.Now()
	// log handling goes here (2B)
	rf.persist()
}
func (rf *Raft) SendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	return ok
}

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true
	// Your code here (2B).
	return index, term, isLeader
}

//
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
//
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
}

func (rf *Raft) killed() bool {
	z := atomic.LoadInt32(&rf.dead)
	return z == 1
}

// The ticker go routine starts a new election if this peer hasn't received
// heartbeats recently.
// (Not started in this implementation: electionLoop plays this role.)
func (rf *Raft) ticker() {
	for rf.killed() == false {
		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using
		// time.Sleep().
	}
}

// Make
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (2A, 2B, 2C).
	// every server starts as a FOLLOWER
	rf.role = FOLLOWER
	rf.votedFor = -1
	rf.commitIndex = 0
	rf.lastApplied = 0
	rf.currentTerm = 0

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// start ticker goroutine to start elections
	//go rf.ticker()

	// set the first election timeout
	rf.lastElectionTimer = time.Now()
	rf.electTime = rf.getElectTime()
	// start the election loop
	go rf.electionLoop()

	return rf
}




http://www.chinasem.cn/article/255348
