本文主要是介绍利用Go语言模拟实现Raft协议,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
近来学习到区块链,想要模拟实现 Raft 协议。但是发现网上教程很杂,或者说很多教程并不适合于新手从零开始进行实现。
本文将从头开始复现个人模拟实现 Raft 的过程,完成后整个模拟后,读者应该学会 Go 语言的基本语法、Rpc 编程的基本概念与用法、简易 Raft 协议的过程。
系统实现:本地 Raft 节点注册,Raft 节点的投票和选举,心跳监听,超时选举,Http监听,日志复制。
Tips:本文是按照作者实现过程进行复现,因此如果想要学习可以通篇慢慢读下去,全篇大概1.2W字左右。如果需要直接的代码可以参考以下链接,每个部分都放在不同的文件夹中,理论上可以直接运行。不拒绝白嫖代码,但如果有用请点一个星。
Gitee 地址:https://gitee.com/moisten-white/go-raft
GitHub 地址:https://github.com/huang-juanjuan/Go-Raft
文章目录
- 一、Raft 关键算法
- 二、创建 Raft 节点组
- 2.1 Raft 节点的创建
- 2.2 Rpc 服务注册
- 2.3 Peers 的构建
- 2.4 测试主函数
- 三、投票功能的实现
- 3.1 阻塞以及心跳检测功能
- 3.2 判断能否成为 Candidate
- 3.3 Candidate 进行选举
- 3.4 节点回应投票
- 3.5 Candidate 收集投票
- 3.6 选举定时器
- 3.7 测试主函数
- 四、心跳检测
- 4.1 Leader 发送心跳消息
- 4.2 Follower 回应心跳消息
- 4.3 心跳定时器
- 4.4 测试结果
- 五、创建日志
- 5.1 监听 Http
- 5.2 取消监听
- 5.3 测试结果
- 六、日志复制
- 6.1 Leader 广播新日志
- 6.2 Leader 广播日志复制命令
- 6.3 Follower 回应日志复制
- 6.4 Leader 更改日志复制命令
- 6.5 Follower 复制日志
- 6.6 测试结果
- 七、日志提交
- 八、小结
一、Raft 关键算法
本文按照主要 9 个算法模拟实现 Raft 协议。
这一步很关键,对于第一次尝试的初学者,最开始往往不知道从何处下手,这也是网上很多 Raft 协议模拟的文章的问题。许多文章按照整个 Raft 实现的思路,只有完成所有代码才能知道项目是否完成,这对初学者来说是非常不友好的。
本文将按照本人实现的过程,分成若干个可以验证的部分进行讲解,读者可以同时完成每个一级标题后进行验证代码的正确性。
二、创建 Raft 节点组
2.1 Raft 节点的创建
初学者针对 Raft 实现时,应该针对当前需要一步步进行舔砖加瓦。对于 Raft 结构体的元素,一开始包含基本的 ID、Peers、currentRole 等即可,而 log 可以先用 []string 进行代替。log 具体的结构体等内容,后续再进行逐步添加。以下是个人最开始使用的 Raft 结构体,以及如何进行初始化,供参考:
// Raft 结构 type Raft struct { mu sync.RWMutex // 互斥锁 id string // 端口号peers []string // 所有节点的端口号 currentTerm int // 当前 Term 号 currentRole Role // 当前角色 currentLeader string // 当前 Leader 的端口号 Logs []string // 日志votedFor string // 当前 Term 中给谁投了票 voteReceived []string // 收到的同意投票的端口号sentLength map[string]int // 每个节点日志复制的插入点 ackedLength map[string]int // 每个节点已接收的日志长度electionTimer *time.Timer // 选举定时器 heartBeatTimer *time.Ticker // 心跳计时器 lastHeartBeatTime int64 // 上次收到心跳的时间 timeout int // 心跳超时时间 }// NewRaft 创建并初始化一个新的 Raft 实例 func NewRaft(id string) *Raft { rf := &Raft{id: id,peers: []string{},currentTerm: 0,currentRole: Follower,currentLeader: "null",Logs: []string{},votedFor: "null",voteReceived: []string{},sentLength: make(map[string]int),ackedLength: make(map[string]int),electionTimer: time.NewTimer(time.Duration(rand.Intn(5000)+5000) * time.Millisecond),heartBeatTimer: time.NewTicker(1000 * time.Millisecond),lastHeartBeatTime: time.Now().UnixMilli(),timeout: 5, } return rf }
先有了 Raft 的初始化,我们先进行的尝试是创建 Raft 节点,并使若干个节点之间能够相互更新,形成一个群组。在学习过程中,本人了解到的是 Raft 节点有一个上位节点作为中心节点,用于 Term 号的广播和维护 Peers 记录所有节点端口号。
本人在此的思路是“使用命令行参数,在创建节点的时候指定节点的端口号,并指定中心节点的端口号,新节点通过和指定的中心节点通信创建自身 Peers,中心节点广播通知已有节点添加新节点”。这样便能在中心节点挂掉时,能够继续维护节点组的运行。同时这种方法对于初学者比较友好,省略了构建中心节点的不同属性、方法和接口等。
2.2 Rpc 服务注册
为了能实现广播等功能,我们需要使用 Rpc 编程,对于初学者来说,这部分也是最难入门的一点。
首先是如何理解 Rpc 编程,Rpc 全程 Remote Procedure Call,即“远程过程调用”。本人的理解是,把 Raft 节点想象成若干个物理上分散的节点,每个节点就像一个实体类,Rpc 函数就是类的方法,Rpc 就是连接到实体类,通过连接调用实体类的方法。
下列代码中 client 就相当于实体类的一个引用,使用 client.call 调用 Raft 节点的 ReplyVote 函数。
client, err := rpc.DialHTTP("tcp", peer) if err != nil {log.Println("Dialing error: ", err)rf.mu.Lock()rf.peers = append(rf.peers[:index], rf.peers[index+1:]...)rf.mu.Unlock()return } var reply RequestVoteReply err = client.Call("Raft.ReplyVote", args, &reply) if err != nil {log.Println("RPC error: ", err)return }
本人在注册 Rpc 时也经历了非常长的时间去理解,此处借鉴了另一个人的用法,同时注册了 Http 请求的监听,便于之后的日志的创建。
借鉴地址:https://github.com/corgi-kx/blockchain_consensus_algorithm/tree/master/raft
// startRaftServer 启动一个基于 HTTP 的 RPC 服务器,用于处理 Raft 节点之间的 RPC 请求。
func startRaftServer(rf *Raft, port string) {// 将 Raft 实例 rf 注册为一个 RPC 服务,以便其他节点可以通过 RPC 调用 rf 的方法。err := rpc.Register(rf)if err != nil {log.Panic(err)}// 将 RPC 服务绑定到 HTTP 协议上,允许通过 HTTP 协议处理 RPC 调用。// 这会为 RPC 服务创建一个 "/rpc" 端点,并监听来自 HTTP 请求的 RPC 调用。rpc.HandleHTTP()// 在指定的端口上启动 TCP 监听器,以便接收来自其他 Raft 节点的连接请求。listener, err := net.Listen("tcp", ":"+port)if err != nil {log.Fatalf("Error starting server: %s", err)}// 使用 HTTP 服务器处理通过监听器接收到的连接。// nil 代表使用默认的 HTTP 请求多路复用器 (http.DefaultServeMux) 来分发请求。// 这个操作在一个新的 Go routine 中执行,以便服务器可以并行处理多个请求。go http.Serve(listener, nil)
}
此处在讲解一下 go function() 的用法,本人的理解就是将 go routine 当做一个线程,go function() 就是启动了一个 function() 函数作为线程运行,此时原本调用 go function() 的函数即使退出也不影响 function() 的运行,除非整个程序的终止或满足退出条件。还可以通过信号机制进行管理,但是此项目并未使用。
2.3 Peers 的构建
这一小节是这部分最关键的内容,即对于一个已有的节点组,新节点如何加入这个群组。按照上文提到的方法,关键在于设置一个中心节点,让新节点通过中心节点得到已有节点,再通过中心节点让所有已有节点能更新新节点。
为此我们需要两个函数,一个用于已有节点进行更新,一个用于新节点的注册。
首先是已有节点进行更新,即添加一个新节点的端口号,在本人的 Raft 结构体中可以看到 Peers 是一个 string 数组用于记录端口号,添加新节点的过程其实类似于数组的添加,但是需要注意的是避免重复添加的过程,因此实际上可以利用 map 进行记录。
以下为添加节点的 Rpc 函数,对于 Rpc 函数我们需要注意的是,函数名前面的 (rf *Raft),表示这是一个 Raft 的函数。Rpc 函数的返回值也必须为 error,其参数也必须为两个指针类型,一个用于传参,一个可以用于记录返回值,因此通常将两个参数用结构体进行包装。
// 定义请求和响应结构 type AddPeerArgs struct { // 新节点的端口号NewPeer string }type AddPeerReply struct { // 更新后的新群组 peersPeers []string }// Rpc 函数用于广播通知已有节点添加新节点 func (rf *Raft) AddPeer(args *AddPeerArgs, reply *AddPeerReply) error { // 先对 rf 上锁,避免出现广播中 peers 改变导致出现重复添加的情况rf.mu.Lock() // defer 表示函数退出时再执行解锁操作defer rf.mu.Unlock()// 遍历 peers 检查是否已经存在该 peerfor _, peer := range rf.peers {if peer == args.NewPeer {fmt.Printf("Node %s: Peer %s already exists\n", rf.id, args.NewPeer)reply.Peers = rf.peersreturn nil}}// 如果不存在则添加rf.peers = append(rf.peers, args.NewPeer)reply.Peers = rf.peers// 打印更新后的 peers 列表,此处可以作为验证性打印,检查是否正确更新节点fmt.Printf("Node %s updated peers: %v\n", rf.id, rf.peers)return nil }
以下为新节点的注册函数,新节点将端口号包装后,调用中心节点的该函数进行注册,通过 reply 将已有的所有节点告知新节点进行注册,再将添加了新节点的端口号广播给已有节点进行更新。
// 该方法会将新节点添加到当前节点的 peers 列表中,并将新节点信息广播给其他节点。 func (rf *Raft) RegisterNode(args *AddPeerArgs, reply *AddPeerReply) error {rf.mu.Lock() defer rf.mu.Unlock() // 检查是否已经存在该 peerfor _, peer := range rf.peers {if peer == args.NewPeer {fmt.Printf("Node %s: Peer %s already exists\n", rf.id, args.NewPeer)reply.Peers = rf.peersreturn nil}}// 如果不存在则添加rf.peers = append(rf.peers, args.NewPeer)// 打印当前节点的 peers 列表,用作调试fmt.Printf("Node %s updated peers: %v\n", rf.id, rf.peers)// 将新节点信息广播给当前节点的其他 peersfor _, peer := range rf.peers {// 跳过新节点本身和当前节点if peer == args.NewPeer || peer == "localhost:"+rf.id {continue}// 尝试通过 RPC 连接到其他节点client, err := rpc.DialHTTP("tcp", peer)if err != nil {log.Println("Dialing:", err) // 如果连接失败,记录错误并继续下一个节点continue}// 准备将新节点添加到其他节点的 peers 列表中addPeerArgs := &AddPeerArgs{NewPeer: args.NewPeer}var addPeerReply AddPeerReply// 调用其他节点的 AddPeer 方法,将新节点信息发送过去err = client.Call("Raft.AddPeer", addPeerArgs, &addPeerReply)if err != nil {log.Println("RPC error:", err)}}return nil // 方法执行成功,返回 nil 表示没有错误 }
2.4 测试主函数
在完成上面三个小节之后,我们的系统应该能基本做到构建一个 Raft 节点组,能实现新节点的添加,这部分可以通过如下的主函数进行测试。需要注意的是,在广播新节点端口号时,在端口号前添加了 “localhost:”,使本地节点上可以相互进行连接通信。
func main() {// 定义命令行参数:端口号和中心节点的地址 // flag 使用 import 导入portPtr := flag.String("port", "0", "端口号")centralNodePtr := flag.String("central", "localhost:8040", "中心节点(默认 localhost:8040 )")flag.Parse() // 解析命令行参数// 检查是否提供了有效的端口号,如果端口号为 "0",程序退出if *portPtr == "0" {fmt.Println("请确认端口号")os.Exit(1)}// 创建一个新的 Raft 实例,使用提供的端口号rf := NewRaft(*portPtr)// 启动 Raft 服务器,监听 RPC 请求go startRaftServer(rf, *portPtr)// 尝试通过 RPC 连接到中心节点client, err := rpc.DialHTTP("tcp", *centralNodePtr)if err != nil {log.Fatalf("Error connecting to central node: %s", err) // 如果连接失败,打印错误信息并退出程序}// 创建请求参数,将当前节点的信息传递给中心节点args := &AddPeerArgs{NewPeer: "localhost:" + *portPtr}var reply AddPeerReply// 调用中心节点的 RegisterNode 方法,将当前节点注册到集群中err = client.Call("Raft.RegisterNode", args, &reply)if err != nil {log.Fatalf("RPC error: %s", err) // 如果调用失败,打印错误信息并退出程序}// 更新当前节点的 peers 列表,使用从中心节点获取的 peers 列表rf.mu.Lock()rf.peers = reply.Peersfmt.Printf("Node %s initial peers: %v\n", rf.id, rf.peers)rf.mu.Unlock()// 阻塞 main 函数,防止程序退出select {} }
在文件目录下使用 go build -o test.exe 后生成可执行文件,调用验证 Peers 能否正确更新,命令行参数的使用,以及是否会出现重复添加等情况。
三、投票功能的实现
在这部分,我们将实现基本的投票功能。当然,由于心跳检测功能还不够完善,所以还不能算是一次完整的选举过程,但为了尽量每一部分都可以验证,本节还是非常有必要的。
3.1 阻塞以及心跳检测功能
在投票功能中,一个 Follower 首先需要通过变成 Candidate 进行收集投票才能进一步变为 Leader,因此我们需要考虑什么情况下变成 Candidate。
因此我们需要修改主函数,让 Raft 节点在创立后循环判断能否成为 Candidate;如果不能,是否收到心跳消息,这部分的伪代码如下:
// 循环的标签 Circle:for {if rf 能否变为 Candidate {// 成为候选人节点后,向其他节点请求选票进行选举rf 开始进行选举rf.mu.Lock()if rf 选举成功 {fmt.Printf("Node %s has become the leader\n", rf.id)rf.mu.Unlock()break} else {// 选举过程中失败,此时可能是已有更新的 Term 或是选举超时// 如果出现更新的 Term,则下一次判断能否成为 Candidate 时退出循环// 如果选举超时或群组只有当前节点一个节点,则重复进行选举rf.mu.Unlock()}} else {// 变为 Candidate 失败break}}// 进行心跳检测for {// 5秒检测一次time.Sleep(5 second)rf.mu.Lock()if rf 不是领导者 && rf 收到过心跳消息 && 上一次心跳消息距离现在已超时 {fmt.Printf("心跳检测超时,已超过%d秒\n", rf.timeout)fmt.Println("即将重新开启选举")// 重新初始化 Raft 节点信息,准备变为 Candidategoto Circle} else {rf.mu.Unlock()}}
3.2 判断能否成为 Candidate
在上一小节中我们提到如何判断能否成为 Candidate,为此我们需要一个专门的函数进行处理。在这一步,我们需要使用随机退避原则处理一种会发生活锁现象的情况。
假设现有两个节点A、B,A 启动后变为 Candidate,随后 B 启动后变为 Candidate。由于开始收集投票时,节点首先会为自己投票,因此 A.VoteFor = A,B.VoteFor = B。对于A 发起的选举,B 已给自己投过票,因此拒绝投票,同理 A 也因此拒绝给 B 投票。于是 A、B 在成为 Candidate 的过程中都失败了,相同时间的心跳检测后,上述过程循环进行,也就产生了活锁。
为解决矛盾,应该设置选举前,随机休眠一段时间然后重新开始选举,避免term号不停更新。此处需要注意的是休眠随机时间的位置,不可放在上锁之后。原因在于之后收集投票,为了保证安全也是要对数据上锁保护的,如果此时在上锁后进行休眠,则之后收集投票会被阻塞,导致收集投票时,节点已经成为 Candidate 了,休眠因此是无意义的。
// 修改节点为候选人状态,返回值不是 error,说明函数可以类比为 Raft 类的成员函数 // 可以使用rf.becomCandidate() 进行调用,可以访问 rf 结构体的所有成员 func (rf *Raft) becomeCandidate() bool {//休眠随机时间后,再开始成为候选人r := rand.Int63n(3000) + 1000time.Sleep(time.Duration(r) * time.Millisecond)rf.mu.Lock()defer rf.mu.Unlock()// 只能从 Follwer 变为 Candidate,并且当前 Term 中没有 Leader 且节点未投过票if rf.currentRole == Follower && rf.currentLeader == "null" && rf.votedFor == "null" {rf.currentRole = Candidaterf.votedFor = rf.idrf.currentTerm += 1rf.voteReceived = append(rf.voteReceived, "localhost:"+rf.id)fmt.Println("本节点已变更为候选人状态")return true} else {return false} }
3.3 Candidate 进行选举
当节点从 Follower 变为 Candidate 后,即将开始进行选举。这个过程要对当前 Peers 中的所有节点收集投票,每收集一个节点的结果,就判断是否需要进行退出选举过程。可以看到
在这里我们需要使用一个锁的组进行控制,确保先完成选举投票过程,再进行下一步的过程。
// 收集投票信息结构体 type RequestVoteArgs struct {NodeId string // 候选人的IDTerm int // 发起请求节点的 TermLastLogIndex int // 最新的日志索引LastLogTerm int // 最新日志的 Term }type RequestVoteReply struct {NodeId string // Follower 的 IDTerm int // 响应节点的 TermVoteGranted bool // 是否同意投票 }func (rf *Raft) startElection(args *RequestVoteArgs) {// 开始选举时,同时开始选举定时器rf.resetElectionTimer()fmt.Print("开始选举Leader\n")var wg sync.WaitGroupvar mu sync.Mutex// 决定选举是否中断cancelled := falsefor i := 0; i < len(rf.peers); i++ {peer := rf.peers[i]if peer == "localhost:"+rf.id {continue}// 添加一把锁wg.Add(1)go func(peer string, index int) {// 退出时解开一把锁defer wg.Done()// 遇到更新的 Term 时,节点会从 Candidate 退出,此时结束选举if rf.currentRole != Candidate {return}// 调试信息打印fmt.Printf("节点 %s 向 %s 发起投票请求\n", rf.id, peer)// 连接到远程 Raft 节点client, err := rpc.DialHTTP("tcp", peer)if err != nil {log.Println("Dialing error: ", err)rf.mu.Lock()// 如果连接失败则从 Peers 中删除对应 peerrf.peers = append(rf.peers[:index], rf.peers[index+1:]...)rf.mu.Unlock()return}var reply RequestVoteReply// 远程调用节点回应是否投票err = client.Call("Raft.ReplyVote", args, &reply)if err != nil {log.Println("RPC error: ", err)return}// 收到回应后进行收集投票的判断if rf.CollectVotes(&reply) {mu.Lock()// 当收集到过半票数时返回为 true,此时修改 cancelled 为 true 进行退出cancelled = truemu.Unlock()return} else {rf.mu.Lock()// 如果节点变为 Follower 说明已有更新的 Term,此时退出选举if rf.currentRole == Follower {fmt.Println("结束收集投票")rf.mu.Unlock()return} else {// 继续收集投票,此时不一定收集完了投票,所以不需要退出rf.mu.Unlock()}}}(peer, i)}// 等待所有的锁都解开,即收集了所有的投票 // 收集所有投票也是为了避免收集了部分投票,但是余下的节点中存在更新的 Termwg.Wait()rf.mu.Lock()defer rf.mu.Unlock()// 无论是否选举成功都结束选举定时器rf.electionTimer.Stop()if cancelled {fmt.Println("选举成功")for _, follower := range rf.peers {if follower == "localhost:"+rf.id {continue}rf.sentLength[follower] = len(rf.Logs)rf.ackedLength[follower] = 0fmt.Println("replicatedLogs", follower)}} else {fmt.Println("选举失败")rf.currentRole = Follower// 如果选举中发现更新的 Term,则会立刻修改 currentLeader 和 votedFor// 这种情况下是不需要重新初始化当前节点的,否则会有更新的 Term,但是当前节点拒绝投票// 而后当前节点在下一轮又重新进入选举,重复上述过程,导致始终无法出现 Leader,出现活锁if rf.currentLeader == rf.id {rf.currentLeader = "null"}if rf.votedFor == rf.id {rf.votedFor = "null"}rf.voteReceived = []string{}} }
3.4 节点回应投票
在上一小节我们远程调用了 Peers 中节点的 ReplyVote 函数,相当于 Raft 算法中的 Raft-2,从这里我们开始将用到 log 结构体。
// log 结构体 type LogEntry struct {Term int // 日志生成时的 Term号Index int // 日志的序号Command string // 日志的内容 }// Raf-2 func (rf *Raft) ReplyVote(args *RequestVoteArgs, reply *RequestVoteReply) error {rf.mu.Lock() defer rf.mu.Unlock() // 打印消息,表示收到来自其他节点的投票请求。fmt.Printf("节点 %s 收到来自 %s 的投票请求\n", rf.id, args.NodeId)// 确定当前节点最后一条日志的任期号。myLogTerm := 0if len(rf.Logs) > 0 {myLogTerm = rf.Logs[len(rf.Logs)-1].Term}// 检查候选节点的日志是否比当前节点的日志更新。// 这是为了确保候选节点的日志至少与当前节点的日志一样完整。logOK := args.LastLogTerm > myLogTerm || (args.LastLogTerm == myLogTerm && args.LastLogIndex >= len(rf.Logs))// 检查候选节点的任期是否大于当前节点的任期 // 或者如果它们相等,但当前节点没有投票给其他候选人或已经投票给该候选节点。termOK := args.Term > rf.currentTerm || (args.Term == rf.currentTerm && (rf.votedFor == "null" || rf.votedFor == args.NodeId))// 如果日志和任期条件都满足,则授予投票。if termOK && logOK {rf.currentTerm = args.Term // 将当前节点的任期更新为候选节点的任期。rf.currentRole = Follower // 将当前节点的角色更改为跟随者。rf.votedFor = args.NodeId // 记录候选节点的 ID 作为当前节点投票的对象。fmt.Print("接受投票\n")reply.VoteGranted = true // 在回复中设置投票授予标志为 true。} else {fmt.Print("拒绝投票\n")reply.VoteGranted = false // 在回复中设置投票授予标志为 false。}// 将回复的 NodeId 和 Term 设置为当前节点的 ID 和任期。reply.NodeId = rf.idreply.Term = rf.currentTermreturn nil }
3.5 Candidate 收集投票
这部分和 3.3 节不同,3.3 节管理的是选举的过程,本小节则关注于收到 3.4 节的回应后,Candidate 进行统计处理的过程。在 Raft 关键算法中则对应于 Raft-3,需要注意的是在下列函数中,Candidate 有可能会变为 Follower,并修改了 votedFor。这也就是 3.3 节中的 startElection() 函数中,为什么要在选举失败后添加两个 if 语句的原因。
// Raft-3 func (rf *Raft) CollectVotes(reply *RequestVoteReply) bool {rf.mu.Lock() defer rf.mu.Unlock() // 检查当前节点是否仍是候选者,并且收到的投票回复的任期与当前节点的任期一致且授予了投票。if rf.currentRole == Candidate && reply.Term == rf.currentTerm && reply.VoteGranted {// 将投票给当前节点的节点ID添加到已收到的投票列表中。rf.voteReceived = append(rf.voteReceived, reply.NodeId)// 如果收到的投票数量超过总节点数的一半,当前节点将成为领导者。if len(rf.voteReceived) > len(rf.peers)/2 {rf.currentRole = Leader // 将当前节点的角色设置为领导者。rf.currentLeader = rf.id // 设置当前节点为领导者。fmt.Println("已获得超过二分之一票数")return true // 返回 true 表示当前节点已成为领导者。}} else if reply.Term > rf.currentTerm {// 如果收到的投票回复包含比当前节点更高的任期,则当前节点放弃竞选,变为跟随者。fmt.Println("存在更新的 Term")rf.currentTerm = reply.Term // 更新当前节点的任期。rf.currentRole = Follower // 将当前节点的角色更改为跟随者。rf.votedFor = "null" // 清除当前节点的投票记录。return false // 这里的返回是因为选举失败导致} // 这里的返回是因为选举的票数未达到 // 或是在另一个 goroutine 中变为了 Followerreturn false }
3.6 选举定时器
首先说明,在本项目中,这部分难以验证,原因在于无法模拟网络拥塞等情况,即连接上一个 Raft 节点后,选举时间非常快,也无法阻塞一个节点,如果取消一个节点的运行,那么 Candidate 在无法连接上远程节点后,就会从本地 Peers 中删掉对应的 peer。因此本小节仅供参考 Raft-1 算法。
// Raft-1 func (rf *Raft) initRaft() {rf.mu.Lock() defer rf.mu.Unlock()// 增加当前的任期号,表示进入新的选举周期。rf.currentTerm += 1// 将当前节点的角色设置为候选者,以便发起选举。rf.currentRole = Candidate// 将当前节点的 ID 设置为已投票的对象,表示为自己投票。rf.votedFor = rf.id// 重置收到的选票列表,仅包含当前节点自己的一票。rf.voteReceived = []string{rf.id} }// 启动选举超时计时器,并在选举超时的情况下重新初始化 Raft 状态。 func (rf *Raft) electionTimerStart() {// 监听选举计时器的触发信号,每当计时器触发时进入循环。for range rf.electionTimer.C {rf.mu.Lock()// 检查当前节点的角色是否不是候选者,如果不是,解锁互斥锁并跳过此次循环。if rf.currentRole != Candidate {rf.mu.Unlock() continue // 跳过此次循环,等待下一次计时器触发。}// 如果当前节点是候选者且计时器触发,表示选举超时。fmt.Println("选举超时")rf.mu.Unlock() // 重新初始化 Raft 状态,以准备再次发起选举。rf.initRaft()} }// 重置选举超时计时器,使其在随机时间后再次触发。 func (rf *Raft) resetElectionTimer() {rf.mu.Lock() // 锁定互斥锁,以确保对共享状态的线程安全访问。defer rf.mu.Unlock() // 在函数结束时自动解锁互斥锁。// 生成一个随机的超时时间,范围在 5 到 10 秒之间。timeout := time.Duration(rand.Intn(5000)+5000) * time.Millisecond// 停止当前的选举计时器。rf.electionTimer.Stop()// 使用新的超时时间重置选举计时器。rf.electionTimer.Reset(timeout) }
3.7 测试主函数
完成上面的内容后,我们将 3.1 的伪代码进行填充,得到如下主函数,此时启动程序,我们可以基本实现投票选举的过程,对于心跳检测我们将在下一节中进行实现。
func main() {portPtr := flag.String("port", "0", "端口号")centralNodePtr := flag.String("central", "localhost:8040", "中心节点(默认 localhost:8040 )")flag.Parse()if *portPtr == "0" {fmt.Println("请确认端口号")os.Exit(1)}rf := NewRaft(*portPtr)go startRaftServer(rf, *portPtr)client, err := rpc.DialHTTP("tcp", *centralNodePtr)if err != nil {log.Fatalf("Error connecting to central node: %s", err)}args := &AddPeerArgs{NewPeer: "localhost:" + *portPtr}var reply AddPeerReplyerr = client.Call("Raft.RegisterNode", args, &reply)if err != nil {log.Fatalf("RPC error: %s", err)}rf.mu.Lock()rf.peers = reply.Peersfmt.Printf("Node %s initial peers: %v\n", rf.id, rf.peers)rf.mu.Unlock()go rf.electionTimerStart()rf.electionTimer.Stop()Circle:for {if rf.becomeCandidate() {// 成为候选人节点后,向其他节点请求选票进行选举rf.mu.Lock()args := &RequestVoteArgs{Term: rf.currentTerm,NodeId: rf.id,LastLogIndex: len(rf.Logs),LastLogTerm: 0,}if len(rf.Logs) > 0 {args.LastLogTerm = rf.Logs[len(rf.Logs)-1].Term}rf.mu.Unlock()rf.startElection(args)rf.mu.Lock()if rf.currentRole == Leader {fmt.Printf("Node %s has become the leader\n", rf.id)rf.mu.Unlock()break} else {rf.mu.Unlock()}} else {break}}// 进行心跳检测for {// 5秒检测一次time.Sleep(time.Millisecond * 5000)rf.mu.Lock()if rf.currentRole != Leader && rf.lastHeartBeatTime != 0 && (time.Now().UnixMilli()-rf.lastHeartBeatTime) > int64(rf.timeout*1000) {fmt.Printf("心跳检测超时,已超过%d秒\n", rf.timeout)fmt.Println("即将重新开启选举")rf.currentRole = Followerrf.currentLeader = "null"rf.votedFor = "null"rf.voteReceived = []string{}rf.lastHeartBeatTime = 0rf.mu.Unlock()goto Circle} else {rf.mu.Unlock()}} }
四、心跳检测
在第三节最后的测试中我们很明显发现,即时一个节点选举成功了,其他节点由于接收不到心跳消息,所以过一段时间后也会开始重新进行选举。在本节,我们将晚上心跳信息的发送和检测,实现完整的选举成功过程。
4.1 Leader 发送心跳消息
心跳消息的目的在于 Leader 告知 Follower 自身的存在,并更新 Term,同时监听是否出现了更新 Term 的 Leader,保证系统中只存在一个 Leader。显然,告知其他节点并收到回应是需要 Rpc 调用的,故设计函数如下。
// Leader 发送的心跳消息结构体 type HeartBeatArgs struct {Term int // 领导者的任期LeaderId string // 领导者的ID }// Follower 回应心跳消息的结构体 type HeartBeatReply struct {Term int // 跟随者的当前任期Success bool // 心跳回应 }func (rf *Raft) SendHeartBeat() {rf.mu.Lock()fmt.Println(rf.id + " start heartBeat")// 如果当前节点不是领导者,则退出心跳发送// 例如收到了 A1 节点的消息,发现有更新的 Term,于是在给 A2 节点发送消息前变为了 Followerif rf.currentRole != Leader {// 设置心跳时间标志不为0,如果节点经历过主函数中的心跳监听超时,则 lastHeartBeatTime = 0// 而当前 Leader 变为 Follower 之后,可能在未收到心跳消息前,其他的 Leader 就挂掉了// 主函数中限制了 lastHeartBeatTime != 0 才能重新选举,导致本节点始终处于监听心跳状态rf.lastHeartBeatTime = 1 rf.mu.Unlock() // 解锁return}// 由于 Leader 在无法连接到指定 peer 时,会将其从本地 Peers 删除// 因此如果当前节点是集群中唯一的节点,降级为跟随者if len(rf.peers) == 1 {rf.lastHeartBeatTime = 1 // 更新心跳时间标志rf.currentLeader = "null" // 清空当前领导者rf.votedFor = "null" // 清空投票记录rf.currentRole = Follower // 降级为跟随者角色rf.mu.Unlock() // 解锁return}rf.mu.Unlock() // 解锁,准备并行处理心跳发送// 构造心跳请求参数args := &HeartBeatArgs{Term: rf.currentTerm, // 当前任期LeaderId: rf.id, // 领导者ID}// 遍历所有的节点,向它们发送心跳for i := 0; i < len(rf.peers); i++ {peer := rf.peers[i]if peer == "localhost:"+rf.id {// 跳过当前节点本身continue}// 验证 Leader 给谁发送了心跳消息fmt.Printf("%s send heartbeat to %s\n", rf.id, peer)// 使用并行的goroutine来发送心跳,以提高效率go func(peer string, index int) {client, err := rpc.DialHTTP("tcp", peer) // 建立与目标节点的RPC连接if err != nil {log.Println("Dialing error:", err) // 连接错误处理rf.mu.Lock()// 如果连接失败,从节点列表中移除该节点rf.peers = append(rf.peers[:index], rf.peers[index+1:]...)rf.mu.Unlock()return}var reply HeartBeatReply// 发送心跳消息,并等待回复err = client.Call("Raft.ReceiveHeartBeat", args, &reply)if err != nil {log.Println("RPC error:", err) return}rf.mu.Lock()defer rf.mu.Unlock()// 如果收到的回复任期大于当前节点的任期,降级为跟随者if reply.Term > rf.currentTerm {rf.currentTerm = reply.Term // 更新当前任期rf.currentRole = Follower // 降级为跟随者角色rf.votedFor = "null" // 清空投票记录rf.currentLeader = "null" // 清空当前领导者}}(peer, i)} }
4.2 Follower 回应心跳消息
Follower 的回应则相对简单,只需要更新相应信息并把相关信息回应即可。
func (rf *Raft) ReceiveHeartBeat(args *HeartBeatArgs, reply *HeartBeatReply) error {rf.mu.Lock()defer rf.mu.Unlock()// 如果接收到的心跳消息中的任期小于当前节点的任期,拒绝心跳if args.Term < rf.currentTerm {reply.Term = rf.currentTerm // 返回当前节点的任期reply.Success = false // 标记心跳处理失败return nil // 直接返回,不进行进一步处理}// 打印接收到心跳消息的日志信息fmt.Printf("节点 %s 收到来自 %s 的心跳消息\n", rf.id, args.LeaderId)// 更新节点的状态为跟随者,并同步心跳消息中的任期和领导者信息rf.currentTerm = args.Term // 更新当前节点的任期为心跳消息中的任期rf.currentLeader = args.LeaderId // 更新当前领导者为心跳消息中的领导者IDrf.currentRole = Follower // 将当前节点角色设置为跟随者rf.lastHeartBeatTime = time.Now().UnixMilli() // 记录接收到心跳的时间戳// 设置回复消息的内容,表示心跳消息处理成功reply.Term = rf.currentTerm // 返回当前节点的任期reply.Success = true // 标记心跳处理成功return nil }
4.3 心跳定时器
对于 Leader 而言,需要定时发送心跳消息,为此设置一个定时器,每间隔一段时间,检测该节点是否为 Leader,如果是则开始发送心跳消息,需要注意的是此处的判断间隔需要小于之前在主函数中设置的时间,完成函数后,在主函数创建节点后,将心跳定时器启动为 goroutine 即可。
// 心跳定时器 func (rf *Raft) heartBeatTimerStart() {// 通过循环不断地监听心跳定时器for range rf.heartBeatTimer.C {rf.mu.Lock() // 如果当前节点的角色不是领导者,则跳过本次心跳发送if rf.currentRole != Leader {rf.mu.Unlock() // 解锁continue // 跳过当前循环,继续监听下一次定时器触发}rf.mu.Unlock() // 如果是领导者,解锁并继续进行心跳发送rf.SendHeartBeat() // 调用发送心跳消息的函数} }
4.4 测试结果
在主函数中 go rf.heartBeatTimerStart() 放在 go rf.electionTimerStart() 之上即可,此外主函数无其他改动,在此给出测试结果图。
五、创建日志
5.1 监听 Http
在 2.2 节中,我们提到在注册服务时,我们还注册了 Http 的监听。因此我们在 Raft 结构体中再添加一个 http.serve 元素,于是对于日志消息,我们可以通过 Http 发送一个字符串模拟日志的命令。同样需要注意的是,Http 的监听只能注册一次,因此设置一个标记符号用于标记,避免重复注册。
// getRequest 处理 HTTP 请求的回调函数 func (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {// 解析请求的表单数据request.ParseForm()// 检查请求的 URL 是否包含 "message" 参数,并且当前有领导者节点// 例如,http://localhost:8080/req?message=ohmygodif len(request.Form["message"]) > 0 && rf.currentLeader != "null" {message := request.Form["message"][0] // 提取消息内容m := LogEntry{Command: message, // 创建一个日志条目}// 添加到本地日志rf.Logs = append(rf.Logs, m)fmt.Println("http监听到了消息")writer.Write([]byte("ok!!!")) // 向客户端返回确认消息} }// HandleFuncFlag 确保 http.HandleFunc 只被调用一次的标志变量 var HandleFuncFlag = true// httpListen 启动 HTTP 服务器监听指定端口 func (rf *Raft) httpListen() {// 创建一个 http.Server 实例,监听端口 8080rf.server = &http.Server{Addr: ":8080", // 服务器监听的地址和端口Handler: nil, // 使用默认的 http.DefaultServeMux 处理请求}// 如果 HandleFuncFlag 为 true,设置处理函数if HandleFuncFlag {HandleFuncFlag = false // 设置标志,确保此代码块只执行一次http.HandleFunc("/req", rf.getRequest) // 注册路径 "/req" 的处理函数}// 启动 HTTP 服务器,使用 goroutine 以非阻塞方式运行go func() {fmt.Println("监听8080")if err := rf.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {fmt.Println("Server error:", err) // 如果服务器出错且不是正常关闭,记录错误}}() }
5.2 取消监听
我们知道日志是由 Leader 进行广播复制的,因此日志只能由 Leader 产生,所以我们针对上面的 http 监听,要选在节点成为 Leader 后开始,并且当节点从 Leader 退出后,取消监听。需要注意的是,取消监听后服务依旧存在,因此上一小节中需要设置一个标志符号避免重复注册服务。
// stopListening 停止 HTTP 服务器的监听 func (rf *Raft) stopListening() {// 创建一个带有 1 秒超时的上下文,用于控制服务器的关闭操作,避免被阻塞ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)defer cancel() // 在函数退出时自动取消上下文// 尝试关闭服务器,传入上下文控制关闭的超时时间if err := rf.server.Shutdown(ctx); err != nil {// 如果关闭过程中发生错误,输出错误信息fmt.Println("Shutdown error:", err)} else {// 如果关闭成功,输出提示信息fmt.Println("监听已停止")} }
5.3 测试结果
我们提到 http 监听,要选在节点成为 Leader 后开始,并且当节点从 Leader 退出后,取消监听。所以我们启动监听应该设置在 starElection() 函数中选举成功后。而一个节点如果要从 Leader 状态退出,那么只能在收到心跳消息的回应时发现存在更新的 Term 才会退出,所以我们要在此设置取消监听。
完成上述内容后,启动程序,当我们在浏览器输入“http://localhost:8080/req?message=第一条日志”时,可以看到当前 Leader 的日志多出一条 “第一条日志”,但是 Follower 并未进行监听。结束进程后,新的 Leader 将重新开始监听,结果如下。
图6
六、日志复制
这部分内容,我们将实现完整的日志复制,包括 Leader 收到新消息后进行广播,新节点加入后如何复制日志等,这部分的难点在于 Raft-5、Raft-6、Raft-8 之间的循环调用。
6.1 Leader 广播新日志
在上一大节中,我们已经完成了 Leader 监听 http 获取新日志,但是只保存到了本地,因此我们需要使用 Rpc 调用其他 Follower 也能及时更新本地日志,这里需要将原本 getRequest() 函数中本地日志添加的代码换成以 goroutine 形式调用下列函数。
// Raft-4 func (rf *Raft) Boradcast(newLog LogEntry) {rf.mu.Lock()// 如果当前节点是领导者if rf.currentRole == Leader {// 设置日志条目的任期和索引,并将其添加到日志中newLog.Term = rf.currentTermnewLog.Index = len(rf.Logs) + 1rf.Logs = append(rf.Logs, newLog)rf.mu.Unlock()// 异步地将日志复制给其他所有节点for i := 0; i < len(rf.peers); i++ {peer := rf.peers[i]// 跳过自己节点的复制if peer == "localhost:"+rf.id {continue}// 启动协程复制日志到其他节点go rf.Replicating(peer)}} else {rf.mu.Unlock()} }
6.2 Leader 广播日志复制命令
这一小节为 Raft-5,是由 Leader 广播日志日志的命令,通过询问 Follower,逐步确定每个 Follower 需要复制的日志,这部分内容关键点在于修改 sentLength 来确定日志复制的节点。同时为了后续日志复制完成后的提交,在 Raft 结构体中再添加一个 CommitLength 元素。
// Leader 发起日志复制命令 type LogRequestArgs struct {LeaderId string // Leader 的 IdCommitLength int // Leader 已经提交的日志Term int // Leader 当前 Term 号LogLength int // 日志长度LogTerm int // 日志复制点的 TermEntries []LogEntry // 日志列表 }// Follower 回应命令复制 type LogReplyArgs struct {NodeId string // Follower 的 IdCureentTerm int // Foller 当前的 TermAck int // 接收复制后的日志长度Flag bool // 是否接收复制 }// Raft-5 func (rf *Raft) Replicating(peer string) error {// 获取已发送日志的索引i := rf.sentLength[peer]// 获取当前日志的末尾索引ei := len(rf.Logs) - 1prevLogTerm := 0// 选择需要复制的日志条目var entries []LogEntryif ei >= i {entries = rf.Logs[i : ei+1]} else {entries = []LogEntry{}}// 如果不是第一条日志,则获取前一条日志的任期if i > 0 {prevLogTerm = rf.Logs[i-1].Term}// 与指定节点建立 RPC 连接client, err := rpc.DialHTTP("tcp", peer)if err != nil {log.Println("Dialing error: ", err)return nil}// 准备 RPC 请求的参数var reply LogReplyArgsargs := &LogRequestArgs{LeaderId: "localhost:" + rf.id,CommitLength: rf.CommitLength,Term: rf.currentTerm,LogLength: i,LogTerm: prevLogTerm,Entries: entries,}// 通过 RPC 发送日志条目给指定节点err = client.Call("Raft.Replying", args, &reply)if err != nil {log.Println("RPC error: ", err)return nil}return nil }
6.3 Follower 回应日志复制
在日志复制中,关键点在于如何找到日志复制的切入点,通过在 Follower 从后向前逐个检测中,找到最合适的日志切入点开始复制。如果找到了切入点,则同时开启 Follower 的日志复制,即 AppendEntries() 函数。
// Raft-6 func (rf *Raft) Replying(args *LogRequestArgs, reply *LogReplyArgs) error {// 锁定 Raft 实例以防止并发修改rf.mu.Lock()defer rf.mu.Unlock()// 如果请求中的任期比当前节点的任期更高,更新任期并将当前角色设为 Followerif args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.votedFor = "null"rf.currentRole = Followerrf.currentLeader = args.LeaderId}// 如果任期相同且当前节点角色为 Candidate,更新角色为 Followerif args.Term == rf.currentTerm && rf.currentRole == Candidate {rf.currentRole = Followerrf.currentLeader = args.LeaderId}// 验证日志条目logOk := (len(rf.Logs) >= args.LogLength) && (args.LogLength == 0 || args.LogTerm == rf.Logs[args.LogLength-1].Term)// 准备响应reply.NodeId = "localhost:" + rf.idreply.CureentTerm = rf.currentTerm // 注意:此处 `CureentTerm` 可能有拼写错误,建议修正为 `CurrentTerm`reply.Ack = 0reply.Flag = false// 如果任期相同且日志条目有效,追加日志并设置确认号和标志if args.Term == rf.currentTerm && logOk {rf.AppendEntries(args.LogLength, args.CommitLength, &args.Entries)ack := args.LogLength + len(args.Entries)reply.Ack = ackreply.Flag = true}// 远程调用 Leader 节点的 Raft-8 对回应进行处理client, err := rpc.DialHTTP("tcp", args.LeaderId)if err != nil {log.Println("Dialing error: ", err)return nil}flag := false // 并无实际作用,仅作为参数占位err = client.Call("Raft.ReceivingAck", &reply, &flag)if err != nil {log.Println("RPC error: ", err)return nil}return nil }
6.4 Leader 更改日志复制命令
在 Leader 收到 Follower 的回应后,要根据回应对日志复制的命令做出一定修改,逐步找到每个 Follower 日志复制的切入点。在这一步中,如果 Follower 没有开始日志复制,Leader 则将日志复制的切入点提前一位,然后再次调用 Replicating 尝试进行复制;反之,则将 Follower 节点的复制切入点记录,并开始提交日志进程。
// Raft 8 func (rf *Raft) ReceivingAck(reply *LogReplyArgs, flag *bool) error {rf.mu.Lock()defer rf.mu.Unlock()// 检查响应的任期是否与当前节点的任期匹配,并且节点角色是 Leaderif reply.CureentTerm == rf.currentTerm && rf.currentRole == Leader {// 如果响应有效且确认号大于或等于当前确认长度,则更新发送和确认长度if reply.Flag && reply.Ack >= rf.ackedLength[reply.NodeId] {rf.sentLength[reply.NodeId] = reply.Ackrf.ackedLength[reply.NodeId] = reply.Ackfmt.Println("开始提交日志")// go rf.CommitEntries() // 提交日志的操作可以在这里被调用} else if rf.sentLength[reply.NodeId] > 0 {// 如果响应无效或确认号不足,减少发送长度并重新尝试复制日志rf.sentLength[reply.NodeId] = rf.sentLength[reply.NodeId] - 1go rf.Replicating(reply.NodeId)}} else if reply.CureentTerm > rf.currentTerm {// 如果响应中的任期大于当前任期,更新节点状态为 Followerrf.currentTerm = reply.CureentTermrf.currentRole = Followerrf.votedFor = "null"}return nil }
6.5 Follower 复制日志
在 6.3 节中,如果节点接受了日志复制命令,则将会从切入点后,丢弃之前的无效日志,并复制当前 Leader 的日志条目,这一步也对应于 Raft-7 算法。
// Raft-7 func (rf *Raft) AppendEntries(logLength int, leaderCommit int, entries *[]LogEntry) {// 检查是否需要截断现有日志条目if len(*entries) > 0 && len(rf.Logs) > logLength {// 如果现有日志的条目与新条目的条目不匹配,则截断现有日志if rf.Logs[logLength].Term != (*entries)[0].Term {rf.Logs = rf.Logs[:logLength]}}// 将新的日志条目追加到现有日志中if logLength+len(*entries) > len(rf.Logs) {startIndex := logLengthrf.Logs = append(rf.Logs, (*entries)[startIndex:]...)}// 更新提交日志的长度if leaderCommit > rf.CommitLength {// 打印正在提交的日志条目索引for i := rf.CommitLength; i < leaderCommit; i++ {fmt.Println("Follower Commit Log ", i)}// 更新提交长度rf.CommitLength = leaderCommit} }
6.6 测试结果
在这里我们已经完成了通常情况下的日志复制,但是对于新加入群组的节点我们还没有进行处理,因此新节点还无法获取到日志。为此我们需要在 2.3 节的 RegisterNode 添加判断,如果新节点选择的中心节点为 Leader,则在 RegisterNode 函数中对新节点进行日志复制;反之,则由中心节点广播到已有节点中进行查询,即在 AddPeer 函数中检测进行添加 Peer 操作的节点是否是 Leader,若是则对新节点进行日志复制。
为此我们需要在 2.3 节的 RegisterNode 添加判断,如果新节点选择的中心节点为 Leader,则在 RegisterNode 函数中对新节点进行日志复制。反之,则由中心节点广播到已有节点中进行查询,即在 AddPeer 函数中检测进行添加 Peer 操作的节点是否是 Leader,若是,则对新节点进行日志复制。日志复制的过程则是直接调用 Replicating 函数。
在这个过程中我们可以发现,之前 RegisterNode 中,检测到新节点存在就直接返回的操作是不可行的。举例来说,假设现在有 A、B、C 三个节点,其中 A 是 Leader,由 A 向其他两个节点发送心跳消息。此时 A 挂掉,B 和 C 监听到没有心跳消息之后开始重新选举,并且 B 成为了新的 Leader,此时 B 发现无法连接上 A 节点,于是将其从本地 peers 中删除,但是 C 节点并对 A 节点连接,因此 C 的 peers 中还保留了 A 的端口号。之后 A 节点选择 C 节点作为中心节点,C 节点发现 A 节点存在后,在 RegisterNode 函数直接返回,因此 B 节点无法得知 A 节点的信息,A 节点也无法加入群组。为了解决这个问题,在 RegisterNode 函数中,应该修改为本地 peers 中如果存在新节点的端口号,则本地不添加,但是仍然进行广播告知其他节点。
此外,假设现有 A、B 节点正在进行选举阶段,并且在此之前两个节点中有日志消息存在。此时 C 节点尝试加入群组,并且成功了,但是由于选举过程中没有 Leader,导致 C 节点并未收到日志复制。所以为了解决此问题,需要在每个节点成功当选 Leader 之后,对 peers 中的每个节点进行日志复制,一方面可以更新日志,丢弃无效日志,另一方面可以避免选举过程中节点加入未进行日志复制的情况。
完成上述内容后,系统已经基本完整,对于投票和日志复制功能基本完善,部分测试结果如下。
七、日志提交
日志的提交,本项目通过打印参数来进行代替,仅供参考。
在 6.4 节的 ReceivingAck 函数中,当 Leader 收到的回应是接受日志复制时,我们将开始进行日志的提交判定。当一个日志被超过半数 Follower 复制的时候,即可提交。
值得一提的是,在我根据所学内容探究时,发现 Follower 是在 Leader 监听到新日志时,检测 Leader 的日志提交数量,但是 Leader 是在 Follower 接受日志复制后才进行提交,这意味着 Leader 的 CommitLength 会比 Follower 的大一个。但是当有新节点加入时,由于此时对新节点的日志复制不是在监听到新日志后,所以 Leader 的 CommitLength 不会发生改变,此时新节点和 Leader 的 CommitLength 就是相同的。
// Acks 计算并返回已经确认了指定日志长度 (lengths) 的节点数量。 // 如果一个节点的日志长度等于或超过指定的 lengths,或者它已经确认了指定的日志长度,则计入返回值。 func (rf *Raft) Acks(lengths int) int {rf.mu.Lock() // 加锁以确保并发安全defer rf.mu.Unlock() // 函数退出时自动解锁ret := 0 // 记录满足条件的节点数量for _, peer := range rf.peers { // 遍历所有节点// 如果是当前节点且日志长度大于或等于指定长度,或其他节点已经确认了该长度if (peer == "localhost:"+rf.id && len(rf.Logs) >= lengths) || rf.ackedLength[peer] >= lengths {ret += 1 // 满足条件的节点数量加1}}return ret // 返回满足条件的节点数量 }// Raft-9 func (rf *Raft) CommitEntries() {minAcks := len(rf.peers)/2 + 1 // 多数节点数量,即需要多少节点确认日志才能提交ready := []int{} // 用于记录可以提交的日志索引// 检查从第1条到当前最后一条日志,判断哪些日志已经被大多数节点确认for i := 1; i <= len(rf.Logs); i++ {if rf.Acks(i) >= minAcks { // 如果确认的节点数量大于或等于多数节点ready = append(ready, i) // 将该日志索引添加到 ready 列表中}}// 找到最大可提交的日志索引maxReady := max(ready)// 如果有可提交的日志,并且它的索引大于当前的提交索引,并且该日志是在当前任期产生的if len(ready) > 0 && maxReady > rf.CommitLength && rf.Logs[maxReady-1].Term == rf.currentTerm {for i := rf.CommitLength; i < maxReady; i++ { // 提交所有可提交的日志项fmt.Println("Leader Commit Log ", i) // 打印日志提交的索引}rf.CommitLength = maxReady // 更新提交的日志索引fmt.Println(rf.id, rf.CommitLength) // 打印当前节点 ID 和新的提交索引} }// max 返回一个整数数组中的最大值。 // 如果数组为空,则返回 0。 func max(arr []int) int {if len(arr) == 0 {return 0 // 如果数组为空,返回 0}maxVal := arr[0] // 假定第一个元素为最大值for _, val := range arr { // 遍历数组if val > maxVal { // 如果发现更大的值maxVal = val // 更新最大值}}return maxVal // 返回数组中的最大值 }
部分测试结果如下。
八、小结
在本次实现 Go-Raft 协议过程中,本人其实收获了很多东西。
首先是不能盲目在网上找资料,资料都是参差不齐的,特别是学习到现在一些协议的实现,能真正有效借鉴的文章已经比较难找到了。个人在前期从 GitHub 上到处翻找有小半个月有余,但是最后自己从头实现起来也就一周不到的时间。个人认为,如果已经知道了想完成的代码的原理,不妨一点点尝试钻研,遇到不会的去搜索如何实现某个功能,这样反而能更快的完成项目。
此外,对于项目的复盘和测试是非常重要的,在写这篇博客前,本人已经完成了一遍实现,当时粗略测试下来没有什么问题,但是实际写博客过程中,为了截图我重头实现了一遍,这下就发现了很多问题。特别是对于一些类似于选举期间无 Leader 的情况,或是 Leader 短时间挂掉再恢复的情况,或多或少都出现了一些 Bug,甚至有一些功能在第一、二、三、四部分都完成得不错,但是到了第五部分才忽然发现问题。但是为了避免整体逻辑的问题,并未做出修改,不过在 GitHub 和 Gitee 上对每个大节的测试代码均提交到了不同的文件夹,可以自行按照需求提取。
如果在拉取代码后发现了部分运行错误,也许可以尝试一下用下个部分的代码,也欢迎各位提问,如若看到必定尽快回答。
这篇关于利用Go语言模拟实现Raft协议的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!