本文主要是介绍MIT6.5840-2023-Lab2B: Raft-Log,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前置知识
见上一篇 Lab2A。
实验内容
实现 RAFT,分为四个 part:leader election、log、persistence、log compaction。
实验环境
OS:WSL-Ubuntu-18.04
golang:go1.17.6 linux/amd64
Part 2B: log
这部分主要实现添加新的 log。
日志匹配特性(Log Matching Property):
- 如果在不同的日志中的两个条目拥有相同的索引和任期号,那么他们存储了相同的指令。
- 如果在不同的日志中的两个条目拥有相同的索引和任期号,那么他们之前的所有日志条目也全部相同。
第一个特性来自这样的一个事实,领导人最多在一个任期里在指定的一个日志索引位置创建一条日志条目,同时日志条目在日志中的位置也从来不会改变。第二个特性由附加日志 RPC 的一个简单的一致性检查所保证。在发送附加日志 RPC 的时候,领导人会把新的日志条目前紧挨着的条目的索引位置和任期号包含在日志内。如果跟随者在它的日志中找不到包含相同索引位置和任期号的条目,那么他就会拒绝接收新的日志条目。一致性检查就像一个归纳步骤:一开始空的日志状态肯定是满足日志匹配特性的,然后一致性检查在日志扩展的时候保护了日志匹配特性。因此,每当附加日志 RPC 返回成功时,领导人就知道跟随者的日志一定是和自己相同的了。
Start:
向 leader 添加log;
客户端的每一个请求都包含一条被复制状态机执行的指令。领导人把这条指令作为一条新的日志条目附加到日志中去(此时请求返回指令在日志中的 index,但不保证指令被提交;Start 函数返回了,随着时间的推移,对应于这个客户端请求的 ApplyMsg 从applyCh channel 中出现在了 key-value 层。只有在那个时候,key-value 层才会执行这个请求,并返回响应给客户端),然后并行地发起 AppendEntries RPCs 给其他的服务器,让他们复制这条日志条目。当这条日志条目被安全地复制,领导人会应用这条日志条目到它的状态机中然后把执行的结果返回给客户端。如果跟随者崩溃或者运行缓慢,再或者网络丢包,领导人会不断的重复尝试 AppendEntries RPCs (尽管已经回复了客户端)直到所有的跟随者都最终存储了所有的日志条目。
func (rf *Raft) Start(command interface{}) (int, int, bool) {index := -1term := -1isLeader := true// Your code here (2B).rf.mu.Lock()term = rf.currentTermisLeader = rf.state == Leaderif !isLeader {rf.mu.Unlock()return index, term, isLeader}log.Println(rf.me, "Start", "rf.currentTerm", rf.currentTerm, "command", command, "len(rf.log)", len(rf.log))rf.log = append(rf.log, Entry{Command: command,Term: term,})rf.nextIndex[rf.me] = len(rf.log)rf.matchIndex[rf.me] = len(rf.log) - 1rf.persist()index = len(rf.log) - 1rf.mu.Unlock()go rf.replicateLog() // replicate log to followerreturn index, term, isLeader
}
commit:
更新 commitIndex。
领导人知道一条当前任期内的日志记录是可以被提交的,只要它被存储到了大多数的服务器上。Raft 永远不会通过计算副本数目的方式去提交一个之前任期内的日志条目。只有领导人当前任期里的日志条目通过计算副本数目可以被提交;一旦当前任期的日志条目以这种方式被提交,那么由于日志匹配特性,之前的日志条目也都会被间接的提交。
func (rf *Raft) commit() {rf.mu.Lock()defer rf.mu.Unlock()if rf.state != Leader {return}for N := len(rf.log) - 1; N > rf.commitIndex; N-- {if rf.log[N].Term != rf.currentTerm {return}num := 0for i := 0; i < len(rf.matchIndex); i++ {if rf.matchIndex[i] >= N {num++}}if num > len(rf.peers)/2 {rf.commitIndex = Nlog.Println(rf.me, "commit", "rf.commitIndex", rf.commitIndex, "len(rf.log)", len(rf.log))return}}
}
apply:
当 commitIndex 落后 lastApplied,向 applyCh 传输 command。
func (rf *Raft) apply() {for rf.killed() == false {rf.mu.Lock()if rf.commitIndex > rf.lastApplied {savedLastApplied := rf.lastAppliedlogEntries := rf.log[rf.lastApplied+1 : rf.commitIndex+1]rf.lastApplied = rf.commitIndexfor i, entry := range logEntries {log.Println(rf.me, "apply", "rf.commitIndex", rf.commitIndex, "rf.lastApplied", rf.lastApplied, "command", entry.Command)rf.applyCh <- ApplyMsg{CommandValid: true,Command: entry.Command,CommandIndex: savedLastApplied + i + 1,}}}// if rf.commitIndex > rf.lastApplied {// log.Println(rf.me, "apply", "rf.commitIndex", rf.commitIndex, "rf.lastApplied", rf.lastApplied, "command", rf.log[rf.lastApplied+1].Command)// rf.lastApplied++// rf.applyCh <- ApplyMsg{// CommandValid: true,// Command: rf.log[rf.lastApplied].Command,// CommandIndex: rf.lastApplied,// }// }rf.mu.Unlock()time.Sleep(time.Duration(10) * time.Millisecond)}
}
这篇关于MIT6.5840-2023-Lab2B: Raft-Log的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!