本文主要是介绍etcd系列-----raft网络层实现之rafthttp模块,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
raftexample示例中, raftNode.serveRaft()方法中有一段使用的HTTP库的代码,实现如下:
func (rc *raftNode) serveRaft() {url, err := url.Parse(rc.peers[rc.id-1])if err != nil {log.Fatalf("raftexample: Failed parsing URL (%v)", err)}
//创建stoppableListener实例, stoppableListener内嵌了net.TCPListener接口,它会与http. Server配合实现对当前节点的URL地址进行监听ln, err := newStoppableListener(url.Host, rc.httpstopc)if err != nil {log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)}err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)//创建http.Server实例select {case <-rc.httpstopc:default:log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)}close(rc.httpdonec)
}
这里创建了一个http.Server实例,它会通过传入的TCPListener实例监昕指定地址上的连接请求,当TCPListener实例通过Accept()方法监昕到新连接到来时,会创建对应的net.Conn实例(可以将其理解为一个单独的网络连接), http.Server 会为每个连接启动一个独立的后台goroutine,处理该连接上的请求。每个请求的处理逻辑封装在http.Server.Handler 中(创建http.Server实例时传入),流程如下图:
处理消息使用的Handler实例是由rafthttp.Transporter.Handler()方法创建的, 其中为多个路径绑定了对应的Handler实例:
func (t *Transport) Handler() http.Handler {
//创建pipelineHandler、 streamHandler和snapshotHandler三个实例,这三个实例都实现了http.Server.Handler接口,具体实现在本章后面介绍rafthttp.Transport时详细分析pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID):= http.NewServeMux()mux.Handle(RaftPrefix, pipelineHandler)mux.Handle(RaftStreamPrefix+"/", streamHandler)mux.Handle(RaftSnapshotPrefix, snapHandler)mux.Handle(ProbingPrefix, probing.NewHandler())return mux
}
这些handle是如何被调用的呢?首先是http.Server.Serve()方法,其中通过net.Listener接收新连接,并为每个连接启动一个单独的后台goroutine来处理该连接上的请求.
在http.Server.Serve()中首先调用ηet.Listener的Accept()函数将将net.Conn封装成http.conn,然后单独起一个goroutine处理新连接。进入http.conn.Serve()方法,该方法会读取客户端的请求创建serverHandler实例,并调用其ServeHTTP()方法处理请求。处理单个请求的核心就是serverHandler.ServeHTTP()方法,该方法会将请求交给绑定的http.Server.Handler实例进行处理,需要注意的是这里serverHandler.srv字段指向的就是前面创建的http.Server实例
func (sh serverHandler) ServeHTTP(rw ResponseWr工ter, req *Request) { //http.Server.Handler字段指向的就是前面通过Transport.Handler()方法创建的http.ServeMux实例handler := sh.srv.Handler if handler == nil {//如采http.Server未初始化Handler字段,则使用默认的ServeMux实例handler = DefaultServeMux}handler.ServeHTTP(rw, req) //调用http.ServeMux.ServeHTTP()方法进行处理
}
http.ServeMux.ServHttp()方法会根据请求的具体路径,选择合适的Handler 实例处理请求,该方法的具体实现如下:
func (mux *ServeMux) ServeHTTP(w ResponseWr工ter, r *Request) { h, := mux.Handler(r) // 根据请求路径选择对应的Handlerh.ServeHTTP(w, r) // 将请求交给Handler进行处理
}
//下面是ServeMux. Handler()方法的具体实现
func (mux *ServeMux) Handler (r *Request) (h Handler, pattern string) { //对CONNECT请求方法的处理(略)host : = stripHostPort(r.Host) //从请求中获取host信息path := cleanPath(r.URL.Path) // 从请求中获取请求路径return mux.handler(host, r .URL.Path) // 根据host信息和请求路径查找对应的Handler实例
}
//下面是ServeMux.handler()方法的具体实现
func (mux *ServeMux) handler (host, path string) (h Handler, pattern string) { if mux.hosts { //首先根据host和path的组合查找h, pattern= mux.match(host +path) }if h == nil { //只根据path查找h, pattern = mux.match(path) }return
}
//下面是ServeMux. match()方法的具体实现
func (mux *ServeMux) match (path string) (h Handler, pattern string) { v, ok := mux.m[path] //如采存在精确匹配的Handler,则直接返回if ok { return v.h, v.pattern } for k, v := range mux.m { //查找与path匹配程度最高的Handler实例//具体查找过程(略)}return
}
rafthttp模块详解
通过前面介绍对raft模块的介绍我们知道,raft模块井未提供网络层的相关实现,而是将待发送的消息封装进Ready实例返回给上层模块,然后由上层模块决定如何将这些消息发送到集群中的其他节点。etcd 将网络层相关的实现独立成一个单独的模块,也就是要详细介绍的rafthttp模块。之所以独立出该模块,是为了降低raft模块与网络层实现之间的耦合,降低raft模块和rafthttp模块实现的成本,提高整个程序的可扩展性。在etcd中有多种消息类型,不同类型的消息所能携带的数据大小也不尽相同,其中快照相关消息的数据量就比较大,小至几k,大至几GB都是有可能的,而Leader节点到Follower,大至几GB都是有可能的,而Leader节点到Follower节点之间的心跳消息一般只有几十到几百个字节。因此,etcd的网络层会创建两个消息传输通道。
上面说的两个消息传输通道:Stream消息通道和Pipeline消息通道。这两种消息通道的主要区别在于: Stream消息通道维护的HTTP长连接,主要负责传输数据量较小、发送比较频繁的消息, 例如,前面介绍的MsgApp消息、MsgHeartbeat消息、MsgVote消息等;而Pipeline 消息通道在传输数据完成后会立即关闭连接,主要负责传输数据量较大、发送频率较低的消息,例如,MsgProp、MsgSnap消息等。
Stream消息通道是节点启动后,主动与集群中的其他节点建立的。每个Stream消息通道有2个关联的后台goroutine, 其中一个用于建立关联的HTTP连接,并从连接上读取数据,然后将这些读取到的数据反序列化成Message实例,传递到raft模块中进行处理。另外一个后台goroutine会读取raft模块返回的消息数据并将其序列化,最后写入Stream消息通道。前面对raftNode.serveRaft()方法的介绍,在启动http.Server 时会通过rafthttp.Transporter.Handler()方法为指定的URL路径添加相应的Handler实例,其中“/raft/stream/” 路径对应的Handler为此eamHandler类型,它负责处理Stream消息通道上的请求。相同的,pipelineHandler、snapshotHandler类型都有相应的路径。
1、rafthttp.Transporter接口
rafthttp.Transporter 接口,它是rafthttp包的核心接口之一,它定义了etcd网络层的核心功能,其具体定义如下:
type Transporter interface {//初始化操作Start() error//创建Handler实例,并关联到指定的URL上Handler() http.Handler//发送消息Send(m []raftpb.Message)//发送快照数据SendSnapshot(m snap.Message)//在集群中添加一个节点时,其他节点会通过该方法添加该新加入节点的信息AddRemote(id types.ID, urls []string)Peer接口是当前节点对集群中其他节点的抽象表示,而结构体peer则是Peer接口的一个具体实现下面几个方法就是对Peer的操作,通过名称即可了解其含义AddPeer(id types.ID, urls []string)RemovePeer(id types.ID)RemoveAllPeers()UpdatePeer(id types.ID, urls []string)ActiveSince(id types.ID) time.TimeActivePeers() intStop()
}
还有另一个需要重点介绍的接口:Raft,在前面介绍raftexample示例时,也简单提到过该接口,该接口的定义如下:
type Raft interface {//将指定的消息实例传递到底层的etcd-raft模块进行处理Process(ctx context.Context, m raftpb.Message) errorIsIDRemoved(id uint64) bool//通知底层etcd-raft模块, 当前节点与指定的节点无法遥远ReportUnreachable(id uint64)//通知底层的etcd-raft模块,快照数据是否发送成功ReportSnapshot(id uint64, status raft.SnapshotStatus)
}
rafthttp.Transport是rafthttp.Transporter接口具体实现,rafthttp.Transport中定义的关键字段:ID (types.ID类型): 当前节点自己的ID。URLs ( types.URLs类型): 当前节点与集群中其他节点交互时使用的Url地址。ClusterlD ( types.ID类型):当前节点所在的集群的ID。Raft ( Raft类型): Raft是一个接口,其实现的底层封装了前面介绍的etcd-raft模块,当rafthttp.Transport收到消息之后,会将其交给Raft实例进行处理。Snapshotter ( *snap. Snapshotter类型):Snapshotter负责管理快照文件,后面会介绍其实现。streamRt ( http.RoundTripper类型): Stream消息通道中使用的http. RoundTripper实例。pipelineRt ( http.RoundTripper 类型):Pipeline 消息通道中使用的http.RoundTripper实例peers( map[types. ID]Peer类型):Peer接口是当前节点对集群中其他节点的抽象表示。对于当前节点来说,集群中其他节点在本地都会有一个Peer 实例与之对应,peers 字段维护了节点ID到对应Peer实例之间的映射关系。remotes ( map[types.ID]*remote类型): remote 中只封装了pipeline 实例,remote主要负责发送快照数据,帮助新加入的节点快速追赶上其他节点的数据。prober ( probing.Prober类型):用于探测Pipeline消息通道是否可用。func (t *Transport) Start() error {var err error//创建Stream消息通道使用的http.RoundTripper实例,底层实际上是创建http.Transport实例,//这里需要读者注意一下几个参数的设置,分别是:创建连接的超时时间(根据配置指定)、读写请求的超时时间(默认Ss)和keepAl工ve时间(默认为30s)t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)if err != nil {return err}//创建Pipeline消息通道用的http.RoundTripper实例与streamRt不同的是,读写请求的起时时间设置成了永不过期t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)if err != nil {return err}t.remotes = make(map[types.ID]*remote)t.peers = make(map[types.ID]Peer)t.pipelineProber = probing.NewProber(t.pipelineRt)t.streamProber = probing.NewProber(t.streamRt)// If client didn't provide dial retry frequency, use the default// (100ms backoff between attempts to create a new stream),// so it doesn't bring too much overhead when retry.if t.DialRetryFrequency == 0 {t.DialRetryFrequency = rate.Every(100 * time.Millisecond)}return nil
}
Transport.Handler()方法主要负责创建Steam消息通道和Pipeline 消息通道用到的Handler实例,并注册到相应的请求路径上, 其具体实现在前面介绍过了 。
下面再来看一下Transport.AddPeer()方法,其主要工作就是创建井启动对应节点的Peer实例,具体实现如下:
func (t *Transport) AddPeer(id types.ID, us []string) {t.mu.Lock()defer t.mu.Unlock()if t.peers == nil {panic("transport stopped")}if _, ok := t.peers[id]; ok {return}urls, err := types.NewURLs(us)if err != nil {plog.Panicf("newURLs %+v should never fail: %+v", us, err)}fs := t.LeaderStats.Follower(id.String())//创建指定节点对应的Peer实例,其中会相关的Stream消息通道和Pipeline消息通道t.peers[id] = startPeer(t, urls, id, fs)//每隔一段时间, prober会向该节点发送探测消息, 检测对端的健康状况addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)plog.Infof("added peer %s", id)
}
Transport. Send()方法负责发送指定的ra位pb.Message消息,其中首先尝试使用目标节点对应的Peer实例发送消息,如果没有找到对应的Peer实例,则尝试使用对应的remote实例发送消息。 Transport.Send()方法的具体实现如下
func (t *Transport) Send(msgs []raftpb.Message) {for _, m := range msgs {//遍历msgs切片中的全部J肖息if m.To == 0 {// ignore intentionally dropped messagecontinue}//根据raftpb.Message.To字段,获取目标节点对应的Peer实例to := types.ID(m.To)t.mu.RLock()p, pok := t.peers[to]g, rok := t.remotes[to]t.mu.RUnlock()if pok {if m.Type == raftpb.MsgApp {t.ServerStats.SendAppendReq(m.Size())}p.send(m)//通过peer.send()方法完成消息的发送continue}if rok {//如采指定节点ID不存在对应的Peer实例,则尝试使用查找对应remote实例,通过remote. send()方法完成消息的发送g.send(m)continue}plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to)}
}
Transport.SendSnapshot()方法负责发送指定的snap.Message 消息(其中封装了对应的MsgSnap消息实例及其他相关信息〉。该方法是通过调用peer.sendSnap()方法完成的。
2、 Peer接口
type Peer interface {//发送单个消息, 该方法是非阻塞的,如采出现发送失败,则会将失败信息报告给底层的Raft接口send(m raftpb.Message)//发送snap.Message,其他行为与上面的send()方法类似sendSnap(m snap.Message)update(urls types.URLs)attachOutgoingConn(conn *outgoingConn)activeSince() time.Timestop()
}
rafhttp.peer是Peer接口的具体实现,其中各字段的含义如下 id ( types.ID类型):该peer实例对应的节点的ID。r ( Raft类型): Raft接口,在Raft接口实现的底层封装了etcd-raft模块。picker ( *urlPicker类型):每个节点可能提供了多个URL供其他节点访问, 当其中一个访问失败时,我们应该可以尝试访问另一个。 而urlPicker提供的主要功能就是在这些URL之间进行切换。writer ( *streamWrite「类型): 负责向Stream消息通道写入消息。msgAppReader ( *streamReader类型): 负责从Stream消息通道读取消息。pipeline ( *pipeline类型):Pipeline消息通道。snapSender ( *snapshotSende「类型): 负责发送快照数据。recvc ( ehan raftpb.Message类型):从Stream消息通道中读取到消息之后, 会通过该通道将消息交给Raft接口,然后由它返回给底层etcd-raft模块进行处理。Prope( ehan raftpb. Message类型):从Stream消息通道中读取到MsgProp类型的消息之后,会通过该通道将MsgProp消息交给Raft接口,然后由它返回给底层etcd-raft模块进行处理。paused ( bool类型):是否暂停向对应节点发送消息。
在rafthttp.peer.startPeer()方法中完成了初始化上述字段的工作,同时也启动了关联的后台goroutine。该方法的具体实现如下
func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {plog.Infof("starting peer %s...", peerID)defer plog.Infof("started peer %s", peerID)status := newPeerStatus(peerID)picker := newURLPicker(urls)errorc := transport.ErrorCr := transport.Raftpipeline := &pipeline{//创建pipeine实例peerID: peerID,tr: transport,picker: picker,status: status,followerStats: fs,raft: r,errorc: errorc,}pipeline.start()//启动pipeline//创建Peer实例p := &peer{id: peerID,r: r,status: status,picker: picker,msgAppV2Writer: startStreamWriter(peerID, status, fs, r),writer: startStreamWriter(peerID, status, fs, r),//创建并启动streamWriterpipeline: pipeline,snapSender: newSnapshotSender(transport, picker, peerID, status),recvc: make(chan raftpb.Message, recvBufSize),//创建recvc远远,注意缓冲区大小propc: make(chan raftpb.Message, maxPendingProposals),//创建propc通道,注意缓冲区大小stopc: make(chan struct{}),}ctx, cancel := context.WithCancel(context.Background())p.cancel = cancel//启动单独的goroutine,它主妾负责将recvc通道中读取消息,该通道中的消息就是从对端节点发送过来的消息,然后将读取到的消息交给底层的Raft状态机进行处理go func() {for {select {case mm := <-p.recvc://从recvc远远中获取连接上读取到的Message,将Message交给底层Raft状态机处理if err := r.Process(ctx, mm); err != nil {plog.Warningf("failed to process raft message (%v)", err)}case <-p.stopc:return}}}()//在底层Raft状态机处理MsgProp类型的Message时,可能会阻塞,所以启动单独的goroutine来处理go func() {for {select {case mm := <-p.propc://从propc通道中获取MsgProp类型的Message,将Message交给底层Raft状态机处理if err := r.Process(ctx, mm); err != nil {plog.Warningf("failed to process raft message (%v)", err)}case <-p.stopc:return}}}()p.msgAppV2Reader = &streamReader{peerID: peerID,typ: streamTypeMsgAppV2,tr: transport,picker: picker,status: status,recvc: p.recvc,propc: p.propc,rl: rate.NewLimiter(transport.DialRetryFrequency, 1),}//创建并启动streamReader实例, 它主要负责从Stream消息遥远上读取消息p.msgAppReader = &streamReader{peerID: peerID,typ: streamTypeMessage,tr: transport,picker: picker,status: status,recvc: p.recvc,propc: p.propc,rl: rate.NewLimiter(transport.DialRetryFrequency, 1),}p.msgAppV2Reader.start()p.msgAppReader.start()return p
}
看看peer核心方法。首先是peer.send()方法,前面分析的Transport.Send()方法就是通过调用该方法实现消息发送功能的
func (p *peer) send(m raftpb.Message) {p.mu.Lock()paused := p.pausedp.mu.Unlock()if paused {return}//根据消息的类型选择合适的消息通道,peer.pick ()方法下面会详细介绍writec, name := p.pick(m)select {case writec <- m://将Message写入writec通道中,发送default:p.r.ReportUnreachable(m.To)if isMsgSnap(m) {p.r.ReportSnapshot(m.To, raft.SnapshotFailure)}if p.status.isActive() {plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)}plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()}
}
在peer.pick()方法中,会根据消息的类型选择合适的消息通道并返回相应的通道供send()方法写入待发迭的消息,具体实现如下
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {var ok bool//如果是MsgSnap类型的消息, 返回Pipeline消息通道对应的Channel,否则返回Stream消息通道对应的Channel,如果Stream消息通道不可用,则使用Pipeline消息通道发送所有类型的消息if isMsgSnap(m) {return p.pipeline.msgc, pipelineMsg} else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {return writec, streamAppV2} else if writec, ok = p.writer.writec(); ok {return writec, streamMsg}return p.pipeline.msgc, pipelineMsg
}
3、streamWriter
在peer.start()方法中会创建并启动了一个sreamWriter实例。从命名上可以猜出其主要功能就是向Stream 消息通道写入消息在peer.start()方法中是通过调用startStreamWriter()方法初始化并启动streamWriter 实例的,其中还启动了一个后台goroutine来执行streamWriter.run()方法。在streamWriter.run() 方法中,主要完成了下面三件事情:
(1)当其他节点主动与当前节点创建连接(即Stream消息通道底层使用的网络连接〉时,该连接实例会写入对应peer.writer.connc通道, 在streamWriter.run()方法中会通过该通道获取该连接实例井进行绑定,之后才能开始后续的消息发送。
(2)定时发送心跳消息,该心跳消息并不是前面介绍raft模块时提到的MsgHeatbeat消息,而是为了防止底层连接超时的消息。
(3)发送除心跳消息外的其他类型的消息。
func (cw *streamWriter) run() {var (msgc chan raftpb.Message //指向当前streamWriter. msgc字段heartbeatc <-chan time.Time //定时器会定时向该通道发送信号, 触发心跳消息的发送,该心跳消息与后面介绍的Raft的心跳消息有所不同,该心跳消息的主要目的是为了防止连接长时间不用断升的t streamType //用来记录消息的版本信息enc encoder //编码器,负责将消息序列化并写入连接的缓冲区flusher http.Flusher //负责刷新底层连接,将数据真正发送出去batched int //当前未Flush的消息个数)tickc := time.NewTicker(ConnReadTimeout / 3)//发送心跳消息的定时器defer tickc.Stop()unflushed := 0plog.Infof("started streaming with peer %s (writer)", cw.peerID)for {select {case <-heartbeatc: //定时器到期, 触发心跳消息err := enc.encode(&linkHeartbeatMessage)unflushed += linkHeartbeatMessage.Size()if err == nil {flusher.Flush()batched = 0sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))unflushed = 0continue}cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())sentFailures.WithLabelValues(cw.peerID.String()).Inc()cw.close()plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)//将heartbeatc和msgc两个通道清空,后续就不会再发送心跳消息和其他类型的消息了heartbeatc, msgc = nil, nilcase m := <-msgc://peer向streamWriter.msgc写入待发送的消息err := enc.encode(&m)//将消息序列化并写入底层连接if err == nil {unflushed += m.Size()if len(msgc) == 0 || batched > streamBufSize/2 {//msgc通道中的消息全部发送完成或是未Flush的消息较多,则触发Flush,否则只是递增batched变量flusher.Flush()sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))unflushed = 0batched = 0} else {batched++}continue}cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())cw.close()plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)heartbeatc, msgc = nil, nilcw.r.ReportUnreachable(m.To)sentFailures.WithLabelValues(cw.peerID.String()).Inc()case conn := <-cw.connc://获取与当前streamWriter实例绑定的底层连接//当其他节点主动与当前节点创建Stream消息通道时,会先通过StreamHandler的处理,//StreamHandler会通过attach()方法将连接写入对应peer.writer.connc通道,//而当前的goroutine会通过该通道获取连接,然后开始发送消息cw.mu.Lock()closed := cw.closeUnlocked()t = conn.tswitch conn.t {case streamTypeMsgAppV2:enc = newMsgAppV2Encoder(conn.Writer, cw.fs)case streamTypeMessage://将http.ResponseWriter封装成messageEncoder,上层调用通过messageEncoder实例完成消息发送enc = &messageEncoder{w: conn.Writer}default:plog.Panicf("unhandled stream type %s", conn.t)}flusher = conn.Flusher //记录底层连接对应的Flusherunflushed = 0cw.status.activate() //peerStatus.activeit直为truecw.closer = conn.Closer //记录底层连接对应的Flushercw.working = true //标识当前streamWriter正在运行cw.mu.Unlock()if closed {plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)}plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)heartbeatc, msgc = tickc.C, cw.msgc //更新heartbeatc和msgc两个远远,自此之后,才能发送消息case <-cw.stopc:if cw.close() {plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)}plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)close(cw.done)return}}
}
stream Writer中另一个需要读者了解的方法是attach()方法,该方法会接收outgoingConn实例井写入streamWriter.connc通道中,peer.attachOutgoingConn()方法就是通过调用该方法实现的。在后面介绍的streamHandler中,也就是通过调用peer.attachOutgoingConn()方法将底层网络连接传递到streamWriter中的。 streamWriter.attach()方法和peer.attachOutgoingConn()方法的具体实现如下:
func (p *peer) attachOutgoingConn(conn *outgoingConn) {var ok boolswitch conn.t {case streamTypeMsgAppV2:ok = p.msgAppV2Writer.attach(conn)case streamTypeMessage://这里只关注最新版本的实现,将conn实例交给streamWriter处理ok = p.writer.attach(conn)default:plog.Panicf("unhandled stream type %s", conn.t)}if !ok {conn.Close()}
}
func (cw *streamWriter) attach(conn *outgoingConn) bool {select {case cw.connc <- conn://将conn实例写入strearnWriter.connc通过中return truecase <-cw.done:return false}
}
4、streamReader
在peer.start()方法中,除了创建前面介绍的steamWriter,还创建井启动了一个sreamReader实例。 从该结构体的命名和前面对streamWriter 的介绍, 可以猜出其主要功能是从Stream通道中读取消息
peerlD (types. ID类型):对应节点的ID。
typ ( streamType类型):关联的底层连接使用的协议版本信息。
tr (寸「anspo同类型): 关联的rafthttp.Transport实例。
picker ( *urlPicker类型):用于获取对端节点的可用的URL。
recvc ( ehan<- Message 类型):在前面介绍的peer.startPeer()方法中提到, 创建streamReader 实例时是使用peer.recvc 通道初始化该宇段的,其中还会启动一个后台goroutine 从peer.recvc 通道中读取消息。在下面分析中会看到,从对端节点发送来的非MsgProp类型的消息会首先由streamReader 写入recvc通道中, 然后由peer.start()启动的后台goroutine读取出来, 交由底层的raft模块进行处理。
prope ( ehan<-raftpb.Message类型):该通道与上面介绍的recvc通道类似,只不过其中接收的是MsgProp类型的消息。
paused ( bool类型):是否暂停读取数据。
在streamReader.start()方法中会启动一个单独的后台goroutine来执行streamReader.run()方法,与streamWriter类似,它也是streamReader的核心。
func (cr *streamReader) run() {t := cr.typplog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)for {rc, err := cr.dial(t)//向对端节点发送一个GET请求, 主要负责与对端节点建立连接if err != nil {if err != errUnsupportedStreamType {cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())}} else {cr.status.activate()plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)err = cr.decodeLoop(rc, t)//如采未出现异常,则开始读取对端返回的消息,并将读取到的消息写入streamReader.recvc中plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)switch {// all data is read outcase err == io.EOF:// connection is closed by the remotecase transport.IsClosedConnError(err):default:cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())}}// Wait for a while before new dial attempterr = cr.rl.Wait(cr.ctx)if cr.ctx.Err() != nil {plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)close(cr.done)return}if err != nil {plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)}}
}//decodeLoop()方法是streamReader中的核心方法,它会从底层的网络连接读取数据并进行反序列化, 之后将得到的消息实例写入recvc通道(或prope通道)中, 等待Peer进行处理, 其具体实现如下:
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {var dec decodercr.mu.Lock()switch t {case streamTypeMsgAppV2:dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)case streamTypeMessage:dec = &messageDecoder{r: rc}//messageDeeoder主要负责从连接中读取数据default:plog.Panicf("unhandled stream type %s", t)}select {case <-cr.ctx.Done():cr.mu.Unlock()if err := rc.Close(); err != nil {return err}return io.EOFdefault:cr.closer = rc}cr.mu.Unlock()for {m, err := dec.decode()//从底层连接中读取数据,并反序列化成raftpb.Message实例if err != nil {cr.mu.Lock()cr.close()cr.mu.Unlock()return err}receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))cr.mu.Lock()paused := cr.pausedcr.mu.Unlock()if paused {continue}if isLinkHeartbeatMessage(&m) {// raft is not interested in link layer// heartbeat message, so we should ignore// it.continue}//根据读取到的消息类型,选择对应通道进行写入recvc := cr.recvcif m.Type == raftpb.MsgProp {recvc = cr.propc}select {case recvc <- m://将消息写入对应的远远中,之后会交给底层的Raft状态机进行处理default: //recvc通过满了之后,只能丢弃消息,并打印日志(if cr.status.isActive() {plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))}plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()}}
}
5、pipelineHandler实例
在前面介绍的ra的ttp.Transport.Handler()方法中, 会创建多个Handler实例并与指定的URL路径关联,我们重点关注其中的pipelineHandler和streamHandler两个Handler的具体实现
在前面分析的pipeline.handle()方法和pipeline.post()方法中,我们只看到了创建连接和发送请求的逻辑,这里介绍在对端节点的pipelineHandler 中是如何读取快照消息的。下面先来看一下pipelineHandler中各个字段的含义。
tr ( Transporter类型): 当前pipeline实例关联的rafthttp.Transport实例。
r ( Raft类型): 底层的Raft实例。
cid (types.ID类型): 当前集群的ID。
正如前面介绍的那样, 在pipelineHandler中实现了http.Server.Handler接口的ServeHTTP()方法,也是其处理请求的核心方法。pipelineHandler.ServerHTTP()方法通过读取对端节点发来的请求得到相应的消息实例, 然后将其交给底层的raft模块进行处理, 该方法的具体大致实现如下:
正如前面介绍的那样, 在pipelineHandler中实现了http.Server.Handler接口的ServeHTTP()方法,也是其处理请求的核心方法。pipelineHandler.ServerHTTP()方法通过读取对端节点发来的请求得到相应的消息实例, 然后将其交给底层的raft模块进行处理, 该方法的具体大致实现如下:
func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {//首先进行一系列的检查,例如· 检查请求的Method是否为POST, 检测集群ID是否合法,等等(if r.Method != "POST" {w.Header().Set("Allow", "POST")http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)return}w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {http.Error(w, err.Error(), http.StatusPreconditionFailed)return}addRemoteFromRequest(h.tr, r)//限制每次从底层连接读取的字节数上线,默认是64KB,因为快照数据可能非常大,为了防止读取超时,//只能每次读取一部分数据到缓冲区中,最后将全部数据拼接起来,得到完整的快照数据limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)b, err := ioutil.ReadAll(limitedr)//读取HTTP请求的Body的全部内容if err != nil {plog.Errorf("failed to read raft message (%v)", err)http.Error(w, "error reading raft message", http.StatusBadRequest)recvFailures.WithLabelValues(r.RemoteAddr).Inc()return}//反序列化得到raftpb.Message实例var m raftpb.Messageif err := m.Unmarshal(b); err != nil {plog.Errorf("failed to unmarshal raft message (%v)", err)http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)recvFailures.WithLabelValues(r.RemoteAddr).Inc()return}receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))//将读取到的消息实例交给成层的Raft状态机进行处理,if err := h.r.Process(context.TODO(), m); err != nil {switch v := err.(type) {case writerToResponse:v.WriteTo(w)default:plog.Warningf("failed to process raft message (%v)", err)http.Error(w, "error processing raft message", http.StatusInternalServerError)w.(http.Flusher).Flush()// disconnect the http streampanic(err)}return}// Write StatusNoContent header after the message has been processed by// raft, which facilitates the client to report MsgSnap status.w.WriteHeader(http.StatusNoContent)//向对端节点返回合适的状态码, 表示请求已经被处理
}
在前面分析streamWriter 时介绍了向对端消息发送的基本逻辑,在分析streamReader时,我们大致了解了建立连接和读取对端消息的过程。这里主要介绍streamHandler, 它主要负责在接收到对端的网络连接之后,将其与对应的streamWriter实例进行关联。这样, streamWriter 就可以开始向对端节点发送消息了。下面来分析streamHandler中各个字段的含义。
tr ( *Transp。同类型): 关联的rafthttp.Transport实例。
peerGetter ( peerGetter类型):peerGetter接口中的Get()方法会根据指定的节点ID获取对应的peer实例。
r ( Raft类型):底层的Raft实例。
id ( types.ID类型): 当前节点的ID。
cid (types.ID类型): 当前集群的ID
6、streamHandler
在前面分析streamWriter 时介绍了向对端消息发送的基本逻辑,在分析streamReader时,我们大致了解了建立连接和读取对端消息的过程。这里主要介绍streamHandler, 它主要负责在接收到对端的网络连接之后,将其与对应的streamWriter实例进行关联。这样, streamWriter 就可以开始向对端节点发送消息了。下面来分析streamHandler中各个字段的含义。
tr ( *Transp。同类型): 关联的rafthttp.Transport实例。
peerGetter ( peerGetter类型):peerGetter接口中的Get()方法会根据指定的节点ID获取对应的peer实例。
r ( Raft类型):底层的Raft实例。
id ( types.ID类型): 当前节点的ID。
cid (types.ID类型): 当前集群的ID。
//streamHandler的核心也是ServeHTTP()方法
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {//检测请求Method是否为GET,检测集群的IDif r.Method != "GET" {w.Header().Set("Allow", "GET")http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)return}w.Header().Set("X-Server-Version", version.Version)w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {http.Error(w, err.Error(), http.StatusPreconditionFailed)return}var t streamTypeswitch path.Dir(r.URL.Path) {//确定使用的协议版case streamTypeMsgAppV2.endpoint():t = streamTypeMsgAppV2case streamTypeMessage.endpoint():t = streamTypeMessage default:plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)http.Error(w, "invalid path", http.StatusNotFound)return}fromStr := path.Base(r.URL.Path)//获取对揣节点的ID,异常处理from, err := types.IDFromString(fromStr)if err != nil {plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)http.Error(w, "invalid from", http.StatusNotFound)return}if h.r.IsIDRemoved(uint64(from)) {plog.Warningf("rejected the stream from peer %s since it was removed", from)http.Error(w, "removed member", http.StatusGone)return}p := h.peerGetter.Get(from)//根据对端节点ID获取对应的Peer实例if p == nil {if urls := r.Header.Get("X-PeerURLs"); urls != "" {h.tr.AddRemote(from, strings.Split(urls, ","))}plog.Errorf("failed to find member %s in cluster %s", from, h.cid)http.Error(w, "error sender not found", http.StatusNotFound)return}wto := h.id.String()//获取当前节点的IDif gto := r.Header.Get("X-Raft-To"); gto != wto {plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)http.Error(w, "to field mismatch", http.StatusPreconditionFailed)return}w.WriteHeader(http.StatusOK)//返回200状态码w.(http.Flusher).Flush()//调用Flush()方法将响应数据发送到对端节点c := newCloseNotifier()conn := &outgoingConn{ //创建outgoingConn实例t: t,Writer: w,Flusher: w.(http.Flusher),Closer: c,}//将outgoingConn实例与对应的streamWriter实例绑定,peer.attachOutgoingConn ()方法在前面attach部分已经介绍过p.attachOutgoingConn(conn)<-c.closeNotify()
}
这篇关于etcd系列-----raft网络层实现之rafthttp模块的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!