本文主要是介绍【go-libp2p源码剖析】DHT Kademlia 迭代查询,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 简介
- 总体流程
- 主要结构体
- 主要函数解析说明
- runLookupWithFollowup
- runQuery
- run
- recordPeerIsValuable
- constructLookupResult
- spawnQuery
- queryPeer
- dialPeer
- updateState
- isReadyToTerminate
- isLookupTermination
- isStarvationTermination
- GetClosestNInStates
简介
query是整个dht的核心,这里我们称之为迭代查询。dht routing中几乎所有方法都有调用它,如FindPeer、FindProviders、GetValue、PutValue、Provide,因此理解query是理解整个dht的关键。
总体流程
- 首先根据key值,从本地路由表中获取最近的k(默认20)个节点作为种子peer。
- 再从种子peer截取alpha(默认10)个peer,这alpha个peer我们称之为查询peer。
- 每个查询peer启动一个task,发起rpc查询请求,发起rpc查询请求前会先拨号。每个task执行有慢有些快,最后会等待所有task都执行完。就算把这些查询peer都查完可能也不满足循环退出条件(举个栗子,这10个peer可能有8个离线,1个rpc查询失败,只有1个查询成功,那么这个时候需要根据这个查询成功的peer去迭代查询)。
- 查询peer在某些类型(如GET_VALUE, GET_PROVIDERS, FIND_NODE)rpc请求中会把离它最近的peer作为响应消息发回(也可以通过GetClosestPeers获取最近的节点),这些离查询peer最近的peer这我们称之为新peer。
- 一个peer可能有多个地址,新peer可能已经在本地peerstore中,但是本地peerstore的peer地址和上一步查询到的新peer的地址可能不一样(本地可能不是最新的),合并它们将这些已知的地址都加入本地peerstore(下次迭代查询发起rpc查询前会先先拨号,拨号需要地址)
- 将这些新的peer发回到chan,这时新peer转换成查询peer,准备第二次查询。最外面的循环里首先会对查询peer的状态进行更新,再对状态为PeerHeard的新peer继续启动的task,发起rpc查询请求。依次反复直到满足退出条件,整个迭代查询结束。
- 将排序后的结果返回
迭代查询退出条件:
stopFn 调用者通过闭包传入,如GetClosestPeers始终返回false
isLookupTermination 查到了beta(默认3)个peer
isStarvationTermination 没有peer可查
时序图如下:
主要结构体
- query代表一个DHT查询
- QueryPeerset维护Kademlia异步查找的状态。查找状态是一组peer,每个peer都标记有一个peer状态(queryPeerState)。
type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
type stopFn func() booltype query struct {// 每个查询的唯一标识id uuid.UUID// 要查找的目标keykey string// 查询上下文ctx context.Contextdht *IpfsDHT// 查询设定种子peerseedPeers []peer.ID//查询耗费的时间(成功的查询)peerTimes map[peer.ID]time.Duration// 查询已知的一组peer及其各自的状态。queryPeers *qpeerset.QueryPeerset// 如果查询终止了,会将terminated置为trueterminated bool// waitGroup确保在所有查询goroutine完成之前查找不会结束。waitGroup sync.WaitGroup// 将用于查询单个peer的函数queryFn queryFn// 用于确定是否应停止查询stopFn stopFn
}type lookupWithFollowupResult struct {peers []peer.ID // the top K not unreachable peers at the end of the querystate []qpeerset.PeerState // the peer states at the end of the query// indicates that neither the lookup nor the followup has been prematurely terminated by an external condition such// as context cancellation or the stop function being called.completed bool
}type QueryPeerset struct {// 正在搜索的keykey ks.Key// 所有已知的peersall []queryPeerState// 如果所有peer已排序,则sorted为true sorted bool
}type queryPeerState struct {id peer.ID//距referredBy的距离,用于排序distance *big.Intstate PeerStatereferredBy peer.ID
}
//查询结果(传递到chan),每个查询都有一个结果,会调用updateState更新
type queryUpdate struct {cause peer.IDqueried []peer.IDheard []peer.IDunreachable []peer.IDqueryDuration time.Duration
}
主要函数解析说明
runLookupWithFollowup
runLookupWithFollowup是整个迭代查询的入口。
- 调用runQuery启动迭代查询任务
- 从runQuery返回结果中将状态为PeerHeard、PeerWaiting的peer筛选出来。可能经过了几轮迭代查询后迭代退出条件已经满足,但已经收到了新的peer,还没来得及调用spawnQuery发起任务此时就会存在PeerHeard状态的peer。
这里的过滤PeerWaiting的peer应该是多余的。可能已经启动了spawnQuery还没查询完,但退出迭代条件已经满足。举个栗子:10个query,有3个query已经查询成功,但其他的7个query还没查询完,这时调用了terminate取消了context,那么这7个PeerWaiting的peer状态就为变为PeerUnreachable或PeerQueried。执行runQuery后不会再存在PeerWaiting状态的peer,因为run中有执行waitGroup.Wait方法会等待所有查询结果,查询要么成功要么失败,就算取消context整个queryPeer方法也会照样执行(始终会将queryUpdate消息发回chan)。
- 如果没有状态为PeerHeard、PeerWaiting的peer,则说明查询已经结束。
- 如果ctx出错或stopFn条件满足也说明查询结束
- 对状态为PeerHeard、PeerWaiting的peer再做一次查询(启动协程,调用queryFn),收尾工作,前面做到一半的工作不能不做完。
- 从chan doneCh查询结果,有几个peer接收几次
- 如果stopFn满足条件或ctx完成则退出doneCh循环
- 如果completed仍为false,则将chan doneCh的消息取完(阻塞等待查询完成)
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {// run the querylookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn)if err != nil {return nil, err}queryPeers := make([]peer.ID, 0, len(lookupRes.peers))for i, p := range lookupRes.peers {if state := lookupRes.state[i]; state == qpeerset.PeerHeard || state == qpeerset.PeerWaiting {queryPeers = append(queryPeers, p)}}if len(queryPeers) == 0 {return lookupRes, nil}// return if the lookup has been externally stoppedif ctx.Err() != nil || stopFn() {lookupRes.completed = falsereturn lookupRes, nil}doneCh := make(chan struct{}, len(queryPeers))followUpCtx, cancelFollowUp := context.WithCancel(ctx)defer cancelFollowUp()for _, p := range queryPeers {qp := pgo func() {_, _ = queryFn(followUpCtx, qp)doneCh <- struct{}{}}()}// wait for all queries to complete before returning, aborting ongoing queries if we've been externally stoppedfollowupsCompleted := 0
processFollowUp:for i := 0; i < len(queryPeers); i++ {select {case <-doneCh:followupsCompleted++if stopFn() {cancelFollowUp()if i < len(queryPeers)-1 {lookupRes.completed = false}break processFollowUp}case <-ctx.Done():lookupRes.completed = falsecancelFollowUp()break processFollowUp}}if !lookupRes.completed {for i := followupsCompleted; i < len(queryPeers); i++ {<-doneCh}}return lookupRes, nil
}
runQuery
- 调用dht.routingTable.NearestPeers获取key最近的20个peer作为种子peer(也就是从本地路由表获取最近的peer)
- 根据key和20个seedpeer构建query
- 调用query.run 等待结果 (用waitGroup等待所有查询完成)
- 更新最有价值的peer
- 构造查询结果并返回
func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {// pick the K closest peers to the key in our Routing table.targetKadID := kb.ConvertKey(target)seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)if len(seedPeers) == 0 {......return nil, kb.ErrLookupFailure}q := &query{id: uuid.New(),key: target,ctx: ctx,dht: dht,queryPeers: qpeerset.NewQueryPeerset(target),seedPeers: seedPeers,peerTimes: make(map[peer.ID]time.Duration),terminated: false,queryFn: queryFn,stopFn: stopFn,}// run the queryq.run()if ctx.Err() == nil {q.recordValuablePeers()}res := q.constructLookupResult(targetKadID)return res, nil
}
run
- 启动loop循环前,将20个seedpeer放进了queryUpdate的heard集合中 。
- 启动loop,监听queryUpdate,发现有更新消息,调用updateState更新peer状态
- 进入loop后,首先进入case update分支,将seedpeer加入queryPeers集合(QueryPeerset)中 ,此时seedpeer的状态还是heard。
- 紧接着计算启动的query任务数量:maxNumQueriesToSpawn=alpha - q.queryPeers.NumWaiting(),alpha默认为10,第一次循环进来waiting数量为0,maxNumQueriesToSpawn的值此时为10。
- 调用isReadyToTerminate检查查询是否需要终止、生成新的peer集合。依次判断stopFn/isStarvationTermination/isLookupTermination条件是否满足,如果满足则直接退出isReadyToTerminate,如果不满足退出条件则根据传入的maxNumQueriesToSpawn值,从queryPeers集合中取出状态为PeerHeard的节点 (第一次循环进来queryPeers里有20个seedpeer,那么这里只截取了前10个)。
- 根据isReadyToTerminate的返回结果决定是否需要需要调用terminate方法终止迭代查询。如果ready为true,则并退出run方法(唯一的退出run出口),如果没有终止,则循环qPeers调用spawnQuery发起查询(qPeers是上一步从queryPeers集合中截取的若干条记录) 。
- waitGroup等待所有spawnQuery任务完成 。
func (q *query) run() {pathCtx, cancelPath := context.WithCancel(q.ctx)defer cancelPath()alpha := q.dht.alphach := make(chan *queryUpdate, alpha)ch <- &queryUpdate{cause: q.dht.self, heard: q.seedPeers}// return only once all outstanding queries have completed.defer q.waitGroup.Wait()for {var cause peer.IDselect {case update := <-ch:q.updateState(pathCtx, update)cause = update.causecase <-pathCtx.Done():q.terminate(pathCtx, cancelPath, LookupCancelled)}// calculate the maximum number of queries we could be spawning.// Note: NumWaiting will be updated in spawnQuerymaxNumQueriesToSpawn := alpha - q.queryPeers.NumWaiting()// termination is triggered on end-of-lookup conditions or starvation of unused peers// it also returns the peers we should query next for a maximum of `maxNumQueriesToSpawn` peers.ready, reason, qPeers := q.isReadyToTerminate(pathCtx, maxNumQueriesToSpawn)if ready {q.terminate(pathCtx, cancelPath, reason)}if q.terminated {return}// try spawning the queries, if there are no available peers to query then we won't spawn themfor _, p := range qPeers {q.spawnQuery(pathCtx, cause, p, ch)}}
}
recordPeerIsValuable
如果没出错,则调用recordValuablePeers记录最有价值的peer :
- 对种子节点peerTimes做一个排序,获取到最小的查询花费时间,将这个设置为MVP时间
- 如果所有seedpeer的peerTimes时间<MVP时间*2,则认为这个节点标记为有价值的(即更新路由表中该节点的LastUsefulAt字段)
虽然只能计算peer之间的逻辑距离,但这个机制也能优化节点之间的查询性能。k桶中查询延迟小的peer,LastUsefulAt时间较新。
func (q *query) recordPeerIsValuable(p peer.ID) {if !q.dht.routingTable.UpdateLastUsefulAt(p, time.Now()) {// not in routing tablereturn}
}func (q *query) recordValuablePeers() {mvpDuration := time.Duration(math.MaxInt64)for _, p := range q.seedPeers {if queryTime, ok := q.peerTimes[p]; ok && queryTime < mvpDuration {mvpDuration = queryTime}}for _, p := range q.seedPeers {if queryTime, ok := q.peerTimes[p]; ok && queryTime < mvpDuration*2 {q.recordPeerIsValuable(p)}}
}
constructLookupResult
- 设置completed为true,如果isLookupTermination、isStarvationTermination都返回false,则置completed为false
- 通过queryPeers.GetClosestNInStates获取20个peer,它们的状态可能是PeerHeard、PeerWaiting、PeerQueried
- 调用kb.SortClosestPeers排序。这里貌似是多余的上面一步不是已经排序了?
- 返回lookupWithFollowupResult,里面的peers、state字段是个数组,各个peer的状态根据数组下标从state里获取
func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult {// determine if the query terminated earlycompleted := trueif !(q.isLookupTermination() || q.isStarvationTermination()) {completed = false}// extract the top K not unreachable peersvar peers []peer.IDpeerState := make(map[peer.ID]qpeerset.PeerState)qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)for _, p := range qp {state := q.queryPeers.GetState(p)peerState[p] = statepeers = append(peers, p)}// 下面四行代码感觉是多余。qp总数就是20,再截取20。GetClosestNInStates已经对peer排序了下面又排序。sortedPeers := kb.SortClosestPeers(peers, target)if len(sortedPeers) > q.dht.bucketSize {sortedPeers = sortedPeers[:q.dht.bucketSize]}res := &lookupWithFollowupResult{peers: sortedPeers,state: make([]qpeerset.PeerState, len(sortedPeers)),completed: completed,}for i, p := range sortedPeers {res.state[i] = peerState[p]}return res
}
spawnQuery
- 将被查询的peer状态设置为PeerWaiting
- waitGroup计数加1
- 启动一个协程调用queryPeer
func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID, ch chan<- *queryUpdate) {......q.queryPeers.SetState(queryPeer, qpeerset.PeerWaiting)q.waitGroup.Add(1)go q.queryPeer(ctx, ch, queryPeer)
}
queryPeer
-
记录一个查询开始时间startQuery 。
-
调用dht.dialPeer对该peer拨号,如果拨号失败则将该peer从路由表移除并发送一个queryUpdate消息,将该peerid填入queryUpdate的unreachable集合,这个peer的状态将由PeerWaiting变为PeerUnreachable 。
-
如果拨号成功,再调用queryFn(如果是GetClosestPeers,则实际调用的是dht.findPeerSingle),发送rpc查询请求到该peer,如果查询失败则和上一步一样首先将peer从路由表移除再将该peer状态改为PeerUnreachable 。
-
如果执行queryFn成功,则计算该查询花费的时间queryDuration(queryDuration计算mvp有用到),再调用dht.peerFound将该peer加入到路由表。
-
queryFn成功后,会返回newPeers,通过查询本地peerstore获取这些新的peer 当前addr信息,本地的peerstore存储的addr信息可能不是最新的,地址可能变化(如新增了地址),需要将获取的最新addrs信息重新加入到peerstore。最后调用dht.queryPeerFilter对这些新peer做一次过滤(默认queryPeerFilter为空总返回true)。如果新peer尚未连接到本节点,则将它们的addrs加入到AddrBook。下次迭代拨号时会使用这些地址 (这里地址不会重复,peerstore.AddAddrs有去重机制)。
-
将符合过滤条件的新的peer加入到saw集合中,构造一个新的queryUpdate消息,将saw添加到queryUpdate的heard集合中,同时带上queryDuration。这时会重新进入query.run的loop循环 。
-
调用waitGroup.Done(),run结束时会等待,也就是spawnQuery都执行完后,run才会退出。
func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID) {defer q.waitGroup.Done()dialCtx, queryCtx := ctx, ctxstartQuery := time.Now()// dial the peerif err := q.dht.dialPeer(dialCtx, p); err != nil {// remove the peer if there was a dial failure..but not because of a context cancellationif dialCtx.Err() == nil {q.dht.peerStoppedDHT(q.dht.ctx, p)}ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}}return}// send query RPC to the remote peernewPeers, err := q.queryFn(queryCtx, p)if err != nil {if queryCtx.Err() == nil {q.dht.peerStoppedDHT(q.dht.ctx, p)}ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}}return}queryDuration := time.Since(startQuery)// query successful, try to add to RTq.dht.peerFound(q.dht.ctx, p, true)// process new peerssaw := []peer.ID{}for _, next := range newPeers {if next.ID == q.dht.self { // don't add self.logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)continue}// add any other know addresses for the candidate peer.curInfo := q.dht.peerstore.PeerInfo(next.ID)next.Addrs = append(next.Addrs, curInfo.Addrs...)// add their addresses to the dialer's peerstoreif q.dht.queryPeerFilter(q.dht, *next) {q.dht.maybeAddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)saw = append(saw, next.ID)}}ch <- &queryUpdate{cause: p, heard: saw, queried: []peer.ID{p}, queryDuration: queryDuration}
}
dialPeer
- 如果peer已经连接到本节点直接退出
- 否则调用host.Connect发起连接(拨号)请求
func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {// short-circuit if we're already connected.if dht.host.Network().Connectedness(p) == network.Connected {return nil}......pi := peer.AddrInfo{ID: p}if err := dht.host.Connect(ctx, pi); err != nil {......return err}logger.Debugf("connected. dial success.")return nil
}
updateState
任务初始化时状态为PeerHeard,启动协程查询时设置为PeerWaiting,再根据每个协程执行结果将peer状态设置为PeerUnreachable或PeerQueried。可能的状态转化:PeerHeard->PeerWaiting->PeerUnreachable|PeerQueried。
- 如果queryUpdate状态为heard,则调用query.queryPeers.TryAdd方法尝试将peer加入query的queryPeers集合中,peer此时的初始状态为PeerHeard。 TryAdd不会将重复的值加入。
- 如果如果queryUpdate状态为queried,只有当peer的状态为PeerWaiting才更新为PeerQueried,并更新peer的peerTimes为queryDuration;
- 如果queryUpdate状态为unreachable,只有当peer的状态为PeerWaiting才更新为PeerUnreachable
func (q *query) updateState(ctx context.Context, up *queryUpdate) {if q.terminated {panic("update should not be invoked after the logical lookup termination")}......for _, p := range up.heard {if p == q.dht.self { // don't add self.continue}q.queryPeers.TryAdd(p, up.cause)}for _, p := range up.queried {if p == q.dht.self { // don't add self.continue}if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting {q.queryPeers.SetState(p, qpeerset.PeerQueried)q.peerTimes[p] = up.queryDuration} else {panic(fmt.Errorf("kademlia protocol error: tried to transition to the queried state from state %v", st))}}for _, p := range up.unreachable {if p == q.dht.self { // don't add self.continue}if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting {q.queryPeers.SetState(p, qpeerset.PeerUnreachable)} else {panic(fmt.Errorf("kademlia protocol error: tried to transition to the unreachable state from state %v", st))}}
}
isReadyToTerminate
- 如果依次满足stopFn、isStarvationTermination、isLookupTermination则终止查询
- 通过调用queryPeers.GetClosestInStates(qpeerset.PeerHeard),获取queryPeers集合中状态为PeerHeard的peer,只获取maxNumQueriesToSpawn个peer(即alpha - q.queryPeers.NumWaiting())
func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) {// give the application logic a chance to terminateif q.stopFn() {return true, LookupStopped, nil}if q.isStarvationTermination() {return true, LookupStarvation, nil}if q.isLookupTermination() {return true, LookupCompleted, nil}// The peers we query next should be ones that we have only Heard about.var peersToQuery []peer.IDpeers := q.queryPeers.GetClosestInStates(qpeerset.PeerHeard)count := 0for _, p := range peers {peersToQuery = append(peersToQuery, p)count++if count == nPeersToQuery {break}}return false, -1, peersToQuery
}//没有节点可查,很饥饿!
func (q *query) isStarvationTermination() bool {return q.queryPeers.NumHeard() == 0 && q.queryPeers.NumWaiting() == 0
}
isLookupTermination
- 调用queryPeers.GetClosestNInStates中获取beta个节点,这些节点状态可能是PeerHeard、PeerWaiting、PeerQueried
- 遍历获取到的beta(默认为3)个节点,如果最近的beta个节点状态不是PeerQueried则说明查询尚未完成。假设要查询的个数为3,返回的PeerHeard、PeerWaiting、PeerQueried的peer各一个,则必须等到PeerHeard、PeerWaiting状态的peer状态转为PeerQueried查询才算终止
func (q *query) isLookupTermination() bool {peers := q.queryPeers.GetClosestNInStates(q.dht.beta, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)for _, p := range peers {if q.queryPeers.GetState(p) != qpeerset.PeerQueried {return false}}return true
}
isStarvationTermination
很饥饿,没有peer可以迭代了。
func (q *query) isStarvationTermination() bool {return q.queryPeers.NumHeard() == 0 && q.queryPeers.NumWaiting() == 0
}
GetClosestNInStates
1.首先对queryPeers做一个排序,最近的排前面
2.遍历queryPeers将状态一致的peer加入result
func (qp *QueryPeerset) GetClosestNInStates(n int, states ...PeerState) (result []peer.ID) {qp.sort()m := make(map[PeerState]struct{}, len(states))for i := range states {m[states[i]] = struct{}{}}for _, p := range qp.all {if _, ok := m[p.state]; ok {result = append(result, p.id)}}if len(result) >= n {return result[:n]}return result
}
这篇关于【go-libp2p源码剖析】DHT Kademlia 迭代查询的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!