NSQ源码分析(五)——Channel

2023-12-16 16:32
文章标签 分析 源码 channel nsq

本文主要是介绍NSQ源码分析(五)——Channel,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Channel相关的代码主要位于nsqd/channel.go, nsqd/nsqd.go中。

Channel是消费者订阅特定Topic的一种抽象。对于发往Topic的消息,nsqd向该Topic下的所有Channel投递消息,而同一个Channel只投递一次,Channel下如果存在多个消费者,则随机选择一个消费者做投递。这种投递方式可以被用作消费者负载均衡。Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。

 aa

一、Channel的创建和初始化

    1.初始化Channel,初始化topicName,name,memoryMsgChan,ctx,clients及删除函数deleteCallback

    2.给e2eProcessingLatencyStream赋值,主要用于统计消息投递的延迟等

    3.initPQ函数创建了两个mapinFlightMessages、deferredMessages和两个队列inFlightPQ和deferredPQ。主要用于索引和存放这两类消息

   4.初始化backend为diskqueue,磁盘存储的消息文件

    5.通知nsqd创建了Channel

    func NewChannel(topicName string, channelName string, ctx *context,deleteCallback func(*Channel)) *Channel {c := &Channel{topicName: topicName,name: channelName,memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),clients: make(map[int64]Consumer),deleteCallback: deleteCallback,ctx: ctx,}if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {c.e2eProcessingLatencyStream = quantile.New(ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime,ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles,)}c.initPQ()if strings.HasSuffix(channelName, "#ephemeral") {c.ephemeral = truec.backend = newDummyBackendQueue()} else {dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {opts := ctx.nsqd.getOpts()lg.Logf(opts.Logger, opts.logLevel, lg.LogLevel(level), f, args...)}// backend names, for uniqueness, automatically include the topic...backendName := getBackendName(topicName, channelName)c.backend = diskqueue.New(backendName,ctx.nsqd.getOpts().DataPath,ctx.nsqd.getOpts().MaxBytesPerFile,int32(minValidMsgLength),int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,ctx.nsqd.getOpts().SyncEvery,ctx.nsqd.getOpts().SyncTimeout,dqLogf,)}c.ctx.nsqd.Notify(c)return c}

    initPQ函数

   initPQ 主要用于索引和存放这两类消息

   1.获取队列缓冲长度pgSize   值为 1和MemQueueSize/10的最大值,MemQueueSize的默认值为10000

   2.初始化inFlightMessages,存储Message的MessageID和Message的对应关系

   3.初始化inFlightPQ队列,正在投递但还没确认投递成功的消息

   4.初始化deferredMessages 和 deferredPQ  ,deferredPQ 队列是延时消息和投递失败等待指定时间后重新投递的消息

func (c *Channel) initPQ() {pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10))c.inFlightMutex.Lock()c.inFlightMessages = make(map[MessageID]*Message)c.inFlightPQ = newInFlightPqueue(pqSize)c.inFlightMutex.Unlock()c.deferredMutex.Lock()c.deferredMessages = make(map[MessageID]*pqueue.Item)c.deferredPQ = pqueue.New(pqSize)c.deferredMutex.Unlock()}

二、Channel中的消息来源

       在分析Topic时提到,消息进入Topic的消息循环后会被投递到该Topic下所有的Channel,由Channel的PutMessage函数进行处理。

以下是topic的messagePump函数的片段(源码在nsq/nsqd/topic.go文件中的messagePump函数)

func (t *Topic) messagePump() {......for {......for i, channel := range chans { //遍历topic的所有的channelchanMsg := msg// copy the message because each channel// needs a unique instance but...// fastpath to avoid copy if its the first channel// (the topic already created the first copy)//复制消息,因为每个channel需要唯一的实例if i > 0 {chanMsg = NewMessage(msg.ID, msg.Body)chanMsg.Timestamp = msg.TimestampchanMsg.deferred = msg.deferred}if chanMsg.deferred != 0 { //发送延时消息channel.PutMessageDeferred(chanMsg, chanMsg.deferred)continue}//发送即时消息err := channel.PutMessage(chanMsg)if err != nil {t.ctx.nsqd.logf(LOG_ERROR,"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",t.name, msg.ID, channel.name, err)}}}}

从中我们看到topic将Message发送给所有关联的Channels,消息有两种即时消息和延时消息

   Channel接收到延时消息的处理流程

    1.Channel中的messageCount自增,messageCount也就是消息数量

    2.调用StartDeferredTimeout函数,将消息维护到pushDeferredMessage和deferredPQ优先级队列中

func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {atomic.AddUint64(&c.messageCount, 1)c.StartDeferredTimeout(msg, timeout)}

   继续来看StartDeferredTimeout函数

 将消息添加到deferredMessages 和 deferredPQ   队列中等待投递

   1.初始化item,Priority的值为当前时间+延时时间的时间戳

    2.调用pushDeferredMessage函数将消息添加到pushDeferredMessage中,pushDeferredMessage该map中储存了MessageID和Message的对应关系

   3.调用addToDeferredPQ将item添加到deferredPQ优先级队列中

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {absTs := time.Now().Add(timeout).UnixNano()item := &pqueue.Item{Value: msg, Priority: absTs}err := c.pushDeferredMessage(item)if err != nil {return err}c.addToDeferredPQ(item)return nil}

  pushDeferredMessage函数

//向deferredMessages map中添加重新投递的消息信息func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {c.deferredMutex.Lock()// TODO: these map lookups are costlyid := item.Value.(*Message).ID_, ok := c.deferredMessages[id]if ok {c.deferredMutex.Unlock()return errors.New("ID already deferred")}c.deferredMessages[id] = itemc.deferredMutex.Unlock()return nil}

 addToDeferredPQ函数

//向deferredPQ队列中添加元素func (c *Channel) addToDeferredPQ(item *pqueue.Item) {c.deferredMutex.Lock()heap.Push(&c.deferredPQ, item)c.deferredMutex.Unlock()}

   Channel接收到即时消息的处理流程

     1.如果文件channel已经退出,则返回错误

     2.调用put(m),将消息写到内存队列memoryMsgChan或磁盘文件中

     3.将该channel的消息数量原子性加1

func (c *Channel) PutMessage(m *Message) error {c.RLock()defer c.RUnlock()if c.Exiting() { //channel已经退出return errors.New("exiting")}err := c.put(m)if err != nil {return err}atomic.AddUint64(&c.messageCount, 1)return nil}

   put函数 

     1.memoryMsgChan内存队列默认缓冲是10000,如果memoryMsgChan已满,则写入到硬盘中

      2.通过bufferPoolGet函数从buffer池中获取一个buffer,bufferPoolGet及以下bufferPoolPut函数是对sync.Pool的简单包装。两个函数位于nsqd/buffer_pool.go中。

    3.调用writeMessageToBackend函数将消息写入磁盘文件中。

    4.通过bufferPoolPut函数将buffer归还buffer池。

     5.调用SetHealth函数将writeMessageToBackend的返回值写入errValue变量。该变量衍生出IsHealthy,GetError和GetHealth3个函数,主要用于测试以及从HTTP API获取nsqd的运行情况(是否发生错误)

func (c *Channel) put(m *Message) error {select {case c.memoryMsgChan <- m:default:b := bufferPoolGet()err := writeMessageToBackend(b, m, c.backend)bufferPoolPut(b)c.ctx.nsqd.SetHealth(err)if err != nil {c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",c.name, err)return err}}return nil}

  三、Channel中消息的投递

     

Channel中的消息是要投递给客户端(消费者),第一节讲到在tcp server监听到有新的客户端连接时会开启一个协程,调用protocol_v2文件中的IOLoop(conn net.Conn)进行客户端读写操作。在IOLoop函数中会开启一个协程调用messagePump函数来轮询将Channel中的消息写给客户端。下面我们主要来看下messagePump函数

源码在nsq/nsqd/protocol_v2.go文件中

处理channel中的消息,channel接收到的消息主要在memoryMsgChan和磁盘文件中

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {.......if sampleRate > 0 && rand.Int31n(100) > sampleRate {continue}msg, err := decodeMessage(b)if err != nil {p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)continue}msg.Attempts++subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)client.SendingMessage()err = p.SendMessage(client, msg)if err != nil {goto exit}flushed = falsecase msg := <-memoryMsgChan:if sampleRate > 0 && rand.Int31n(100) > sampleRate {continue}msg.Attempts++ //投递尝试的次数subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)client.SendingMessage()err = p.SendMessage(client, msg)if err != nil {goto exit}flushed = false}

   看到无论是从磁盘中取出的消息还是从内存队列中取出的消息,执行的流程差不多。   

     1.msg的Attempts自增(消息尝试投递的次数)

      2.调用StartInFlightTimeout函数将本条消息msg添加到inFlightMessages和inFlightPQ优先队列中  (inFlightMessages和inFlightPQ存放已投递但不确定是否投递成功的消息)

     3.调用SendingMessage函数将clientV2中的InFlightCount和MessageCount自增

      4.调用SendMessage函数将消息发送给客户端

   

四、消息投递后的处理

客户端成功消费一条消息后,会发送一个FIN消息,带上message ID 或者客户端如果消费失败,也会发送一个REQ的请求。IOLoop函数中除了开启一个协程调用messagePump函数轮询的投递Channel中的消息,for循环模块中也在轮询读取从客户端返回的消息。

   

func (p *protocolV2) IOLoop(conn net.Conn) error {.....for {......line, err = client.Reader.ReadSlice('\n')if err != nil {if err == io.EOF {err = nil} else {err = fmt.Errorf("failed to read command - %s", err)}break}/*去除行尾的\n \r,并按空格切分成params*/// trim the '\n'line = line[:len(line)-1]// optionally trim the '\r'if len(line) > 0 && line[len(line)-1] == '\r' {line = line[:len(line)-1]}params := bytes.Split(line, separatorBytes)p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)var response []byteresponse, err = p.Exec(client, params)}}

消息投送成功的处理

 客户端成功消费一条消息后,会发送一个FIN消息。会执行到Exec函数中的FIN流程,最后调用FIN函数

1.获取消息id

2.调用FinishMessage方法,从  inFlightMessages 和    inFlightPQ      队列中移除该消息

3.调用     FinishedMessage将该clientV2的FinishCount增1,InFlightCount减1,并并向ReadStateChan发送一个消息;如果服务端因为RDY限制停止推送消息,收到这个消息后,也会重新查看是否可以继续推送消息。

func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {state := atomic.LoadInt32(&client.State)if state != stateSubscribed && state != stateClosing {return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state")}if len(params) < 2 {return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "FIN insufficient number of params")}id, err := getMessageID(params[1])if err != nil {return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error())}err = client.Channel.FinishMessage(client.ID, *id)if err != nil {return nil, protocol.NewClientErr(err, "E_FIN_FAILED",fmt.Sprintf("FIN %s failed %s", *id, err.Error()))}client.FinishedMessage()return nil, nil}

消息投递失败的处理

  消息投递失败的处理流程主要在REQ函数中

    1.获取消息id

    2.获取timeoutDuration的值

     3.调用RequeueMessage方法,将消息msg 根据消息id从inFlightMessages和inFlightPQ队列中移除,并根据timeoutDuration的值决定将该消息添加到deferredMessages 和 deferredPQ   队列中,还是放到memoryMsgChan或磁盘文件中 并等待下次投递

    4.调用RequeuedMessage方法,将clientV2的RequeueCount值增1,将InFlightCount,减1,并并向ReadStateChan发送一个消息

func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) {state := atomic.LoadInt32(&client.State)if state != stateSubscribed && state != stateClosing {return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot REQ in current state")}if len(params) < 3 {return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "REQ insufficient number of params")}id, err := getMessageID(params[1])if err != nil {return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error())}timeoutMs, err := protocol.ByteToBase10(params[2])if err != nil {return nil, protocol.NewFatalClientErr(err, "E_INVALID",fmt.Sprintf("REQ could not parse timeout %s", params[2]))}timeoutDuration := time.Duration(timeoutMs) * time.MillisecondmaxReqTimeout := p.ctx.nsqd.getOpts().MaxReqTimeoutclampedTimeout := timeoutDurationif timeoutDuration < 0 {clampedTimeout = 0} else if timeoutDuration > maxReqTimeout {clampedTimeout = maxReqTimeout}if clampedTimeout != timeoutDuration {p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] REQ timeout %d out of range 0-%d. Setting to %d",client, timeoutDuration, maxReqTimeout, clampedTimeout)timeoutDuration = clampedTimeout}err = client.Channel.RequeueMessage(client.ID, *id, timeoutDuration)if err != nil {return nil, protocol.NewClientErr(err, "E_REQ_FAILED",fmt.Sprintf("REQ %s failed %s", *id, err.Error()))}client.RequeuedMessage()return nil, nil}

RequeueMessage函数是消息投递失败的主要流程

  1.将消息msg 根据消息id从inFlightMessages和inFlightPQ队列中移除

  2.如果timeout为0,则将该消息重新添加到memoryMsgChan或磁盘文件中,等待下次投递

  3.如果timeout大于0,则将消息添加到deferredMessages 和 deferredPQ   队列中等待重新投递

    func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {// remove from inflight firstmsg, err := c.popInFlightMessage(clientID, id)if err != nil {return err}c.removeFromInFlightPQ(msg)atomic.AddUint64(&c.requeueCount, 1)if timeout == 0 {c.exitMutex.RLock()if c.Exiting() {c.exitMutex.RUnlock()return errors.New("exiting")}err := c.put(msg)c.exitMutex.RUnlock()return err}// deferred requeuereturn c.StartDeferredTimeout(msg, timeout)}

   (1)timeout为0的情况(timeout可以理解成消息投递失败后,需要等待多久之后再投递)

       调用put函数将消息写到memoryMsgChan或磁盘文件中,前面已经介绍过这个函数,这里就不在详细说明。

  

func (c *Channel) put(m *Message) error {select {case c.memoryMsgChan <- m:default:b := bufferPoolGet()err := writeMessageToBackend(b, m, c.backend)bufferPoolPut(b)c.ctx.nsqd.SetHealth(err)if err != nil {c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",c.name, err)return err}}return nil}

(2)timeout大于0的情况

      调用StartDeferredTimeout函数将消息写入到pushDeferredMessage 和 deferredPQ中。这个函数在前面Channel中获取延时消息也是调用这个函数。

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {absTs := time.Now().Add(timeout).UnixNano()item := &pqueue.Item{Value: msg, Priority: absTs}err := c.pushDeferredMessage(item)if err != nil {return err}c.addToDeferredPQ(item)return nil}

 

五、Channel的暂停和取消暂停

   Channel的暂停和取消暂停和Topic的操作一样,由Channel中paused字段的值决定,该字段是原子操作的,paused为1表示暂停状态,0表示未暂停。

func (c *Channel) Pause() error {return c.doPause(true)}func (c *Channel) UnPause() error {return c.doPause(false)}//暂停或取消暂停向客户端发送消息func (c *Channel) doPause(pause bool) error {if pause {atomic.StoreInt32(&c.paused, 1)} else {atomic.StoreInt32(&c.paused, 0)}c.RLock()for _, client := range c.clients {if pause {client.Pause()} else {client.UnPause()}}c.RUnlock()return nil}//返回该Channel是否是暂停状态func (c *Channel) IsPaused() bool {return atomic.LoadInt32(&c.paused) == 1}

 

这篇关于NSQ源码分析(五)——Channel的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/501137

相关文章

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专