NSQ源码分析(三)——disQueue

2023-12-16 16:32
文章标签 分析 源码 nsq disqueue

本文主要是介绍NSQ源码分析(三)——disQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

disQueue是Topic以及Channel中的内存消息满了以后,消息存放的实现方法,diskQueue实现了文件系统支持的FIFO队列,disQueue也是BackendQueue接口的实现,diskQueue在文件读写给我们提供了很好的学习示例。

BackendQueue接口

// BackendQueue represents the behavior for the secondary message storage systemtype BackendQueue interface {Put([]byte) error //向文件中写入数据ReadChan() chan []byte //返回一个读的chanClose() error //关闭Delete() error //删除Depth() int64 //获取未读消息的个数Empty() error //清空操作}

diskQueue结构体及字段的解释

    type diskQueue struct {// 64bit atomic vars need to be first for proper alignment on 32bit platforms// run-time state (also persisted to disk)//以下5个字段为元数据readPos int64 //当前读文件的指针偏移量(从文件的某个位置开始读)writePos int64 //当前写文件的指针偏移量readFileNum int64 //当前读的文件编号writeFileNum int64 //当前写的文件编号,用于创建文件名使用,文件编号每新建一个文件会递增1depth int64 //写一条消息加1,读一条消息减1,可以理解成还未读完的消息数量sync.RWMutex// instantiation time metadataname string //名称dataPath string //文件的路径maxBytesPerFile int64 //每个文件的最大字节数minMsgSize int32 //单条消息的最小字节数,默认是MsgIDLength + 8 + 2 = 26maxMsgSize int32 //单条消息的最大字节数,默认是1024 * 1024 + minMsgSizesyncEvery int64 // 定期刷新文件的读写次数阈值(默认2500)syncTimeout time.Duration // 定期刷新文件的时间戳阈值(默认2s)exitFlag int32needSync bool //需要刷新文件// keeps track of the position where we have read// (but not yet sent over readChan)nextReadPos int64 //下次需要读的指针偏移量nextReadFileNum int64 //下次需要读的文件编号readFile *os.File //当前正在读的文件,如果为nil则读取下一个文件编号的文件writeFile *os.File //当前正在写的文件,如果为nil则新建文件reader *bufio.ReaderwriteBuf bytes.Buffer// exposed via ReadChan()readChan chan []byte //通过ReadChan()函数暴露出去// internal channelswriteChan chan []byte //写通道writeResponseChan chan error //写之后返回的结果emptyChan chan int //清空文件的通道emptyResponseChan chan errorexitChan chan intexitSyncChan chan intlogf AppLogFunc //处理日志的函数}

一、diskQueue的创建和初始化

1.初始化diskQueue实例
2.调用retrieveMetaData函数 从磁盘中恢复diskQueue的状态。diskQueue会定时将自己的状态备份到文件中
3.开启一个协程执行ioLoop函数,ioLoop函数是整个disQueue最核心的方法,作用是实现disQueue的消息循环,定时刷新文件,读写操作功能

func New(name string, dataPath string, maxBytesPerFile int64,minMsgSize int32, maxMsgSize int32,syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {d := diskQueue{name: name,dataPath: dataPath,maxBytesPerFile: maxBytesPerFile,minMsgSize: minMsgSize,maxMsgSize: maxMsgSize,readChan: make(chan []byte),writeChan: make(chan []byte),writeResponseChan: make(chan error),emptyChan: make(chan int),emptyResponseChan: make(chan error),exitChan: make(chan int),exitSyncChan: make(chan int),syncEvery: syncEvery,syncTimeout: syncTimeout,logf: logf,}// no need to lock here, nothing else could possibly be touching this instanceerr := d.retrieveMetaData()if err != nil && !os.IsNotExist(err) {d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)}go d.ioLoop()return &d}

     retrieveMetaData函数

      retrieveMetaData函数记录了disQueue的元数据readPos、writePos、readFileNum、writeFileNum、depth

      1.retrieveMetaData()函数用于初始化文件系统,作用是当程序突然中止,文件中的消息未读完,该函数用于初始化读写的文件编号和位置及未读的消息数depth

      2.retrieveMetaData函数从磁盘中恢复diskQueue的状态。diskQueue会定时将自己的状态备份到文件中,

      3.文件名由metaDataFileName函数确定。retrieveMetaData函数同样通过metaDataFileName函数获得保存状态的文件名并打开。

    4.  该文件只有三行,格式为%d\n%d,%d\n%d,%d\n,第一行保存着该diskQueue中未读的消息数量(depth),第二行保存readFileNum和readPos,第三行保存writeFileNum和writePos。

  

func (d *diskQueue) retrieveMetaData() error {var f *os.Filevar err errorfileName := d.metaDataFileName() //获取文件名f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)if err != nil {return err}defer f.Close()var depth int64_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",&depth,&d.readFileNum, &d.readPos,&d.writeFileNum, &d.writePos)if err != nil {return err}atomic.StoreInt64(&d.depth, depth)d.nextReadFileNum = d.readFileNumd.nextReadPos = d.readPosreturn nil}

persistMetaData函数

与retrieveMetaData相对应的是persistMetaData函数,这个函数将运行时的元数据保存到文件用于下次重新构建diskQueue时的恢复。

逻辑基本与retrieveMetaData,此处不再赘述。

func (d *diskQueue) persistMetaData() error {var f *os.Filevar err errorfileName := d.metaDataFileName()tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())// write to tmp filef, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)if err != nil {return err}_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",atomic.LoadInt64(&d.depth),d.readFileNum, d.readPos,d.writeFileNum, d.writePos)if err != nil {f.Close()return err}f.Sync()f.Close()// atomically renamereturn os.Rename(tmpFileName, fileName)}

二、diskQueue的消息循环

    ioLoop函数实现了diskQueue的消息循环功能,即轮询读写、刷新文件等操作。

    刷新文件的目的:防止突然结束程序后内存中的内容未被提交到磁盘,导致内容丢失

  有两种情况下会刷新文件

    1.当count达到syncEvery时,即读写的次数累积到syncEvery。刷新文件后count会被置为0

    2.定时刷新,每syncTimeOut就会刷新文件

 

count的变化规则

     1.如果一次消息循环中,有读或写操作,count会自增1

     2.当count达到syncEvery时,count会置为0,并刷新文件

     3.当收到emptyChan消失时,会将count置为0,因为文件已经被删除了

 

判断文件中有可读的消息

      (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos)  即读的文件编号小于写的文件编号或者读的指针偏移量小于写的指针偏移量

 

读取文件中的下一条消息需要满足两个条件:

    1.文件中有可读的消息

    2. d.nextReadPos = d.readPos 即上次读到的消息已经投递出去,需要读取下条新消息

 

d.nextReadPos 和 d.readPos的区别

     nextReadPos是下次要读取消息的偏移量,读取的消息会赋值给dataRead。

     当消息读取到之后不一定本次循环一定会执行       case r <- dataRead: 将消息投递出去,也可能会执行其他的case分支。此时nextReadPos是下次要读的消息的位置,而readPos是本次消息读的位置

     nextReadPos是下次要读取消息的偏移量,也是消息投递成功后需要读的位置。而readPos当消息投递出去才会等于nextReadPos的值

可以简单理解为:

      消息读取前   nextReadPos = readPos

      消息已读取,但没有投递出去,nextReadPos是下次消息要读的位置,而readPos仍是本次消息读的开始位置。此时:nextReadPos =  readPos + 本条消息的长度

     消息投递成功后: readPos  = nextReadPos   ,将nextReadPos的值赋值给readPos

func (d *diskQueue) ioLoop() {var dataRead []bytevar err errorvar count int64 //读写的累积次数var r chan []bytesyncTicker := time.NewTicker(d.syncTimeout)for {// dont sync all the time :)if count == d.syncEvery {d.needSync = true}if d.needSync {err = d.sync()if err != nil {d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)}count = 0}if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { //消息文件中有可以读的消息if d.nextReadPos == d.readPos { //即上次读到的消息已经投递出去,需要读取下条新消息dataRead, err = d.readOne()if err != nil {d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",d.name, d.readPos, d.fileName(d.readFileNum), err)d.handleReadError()continue}}r = d.readChan} else {r = nil}select {// the Go channel spec dictates that nil channel operations (read or write)// in a select are skipped, we set r to d.readChan only when there is data to read//如果r为空,则这个分支会被跳过。这个特性的使用了select的逻辑,简化了当数据为空时的判断case r <- dataRead:count++// moveForward sets needSync flag if a file is removedd.moveForward()case <-d.emptyChan:d.emptyResponseChan <- d.deleteAllFiles()count = 0case dataWrite := <-d.writeChan:count++d.writeResponseChan <- d.writeOne(dataWrite)case <-syncTicker.C:if count == 0 {// avoid sync when there's no activitycontinue}d.needSync = truecase <-d.exitChan:goto exit}}exit:d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)syncTicker.Stop()d.exitSyncChan <- 1}

三、文件写操作

writeOne函数负责将消息写入到文件中

    1.如果writeFile为nil,则需要新建文件

     2.如果writePos大于0,则设置写的指针偏移量

     3.校验消息长度

     4.将消息的长度(4个字节) 和 消息作为一条消息写入到文件中

     5.改变 writePos 和depth的值

     6.如果写的指针偏移量大于每个文件的最大字节数,将writeFileNum递增,并将writeFile置为nil,用于下次写入消息的时候创建文件          

func (d *diskQueue) writeOne(data []byte) error {var err errorif d.writeFile == nil { //如果writeFile为nilcurFileName := d.fileName(d.writeFileNum) //根据writeFileNum获取文件名称d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) //创建文件if err != nil {return err}d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)if d.writePos > 0 { //如果writePos大于0,则设置写的指针偏移量_, err = d.writeFile.Seek(d.writePos, 0)if err != nil {d.writeFile.Close()d.writeFile = nilreturn err}}}dataLen := int32(len(data)) //本次消息的长度if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { //校验消息长度return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)}d.writeBuf.Reset() //重置writeBuf缓冲区err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) //将数据长度dataLen以二进制大端的形式写入到writeBuf中if err != nil {return err}_, err = d.writeBuf.Write(data) //将data写入到writeBuf缓冲区中if err != nil {return err}// only write to the file once_, err = d.writeFile.Write(d.writeBuf.Bytes()) //将writeBuf缓冲区的数据写入到文件中if err != nil {d.writeFile.Close()d.writeFile = nilreturn err}totalBytes := int64(4 + dataLen)d.writePos += totalBytes //改变writePos(写的偏移量)atomic.AddInt64(&d.depth, 1) //depth的值加1if d.writePos > d.maxBytesPerFile { //如果写的指针偏移量大于每个文件的最大字节数/*将writeFileNum递增1,用于创建下一个文件使用writePos写的指针偏移量置为0刷新文件关闭文件将writeFile置为nil,那么下次有新消息需要写入文件则会新建文件这个操作的目的是为了防止单个文件过大*/d.writeFileNum++d.writePos = 0// sync every time we start writing to a new fileerr = d.sync()if err != nil {d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)}if d.writeFile != nil {d.writeFile.Close()d.writeFile = nil}}return err}

四、文件读操作

    readOne函数主要用于从文件中读取下一条未读消息

   主要流程:

      1.如果readFile为nil,则读取新的文件

      2.根据readPos获取读的位置,并读取4个字节(即本条消息的长度),然后读取本条消息

      3.更新 nextReadPos和nextReadFileNum 的值

       4.如果下次需要读的位置大于每个文件的最大值,则将nextReadFileNum递增,nextReadPos置为0,关闭readFile并置为nil。这样下次就会读取下一个文件的消息

       5.最后返回本条消息的内容

func (d *diskQueue) readOne() ([]byte, error) {var err errorvar msgSize int32if d.readFile == nil { //如果readFile为nil,则读取新的文件/*获取并打开需要读取的文件*/curFileName := d.fileName(d.readFileNum)d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)if err != nil {return nil, err}d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)if d.readPos > 0 {//如果readPos大于0,则设置读的指针偏移量_, err = d.readFile.Seek(d.readPos, 0)if err != nil {d.readFile.Close()d.readFile = nilreturn nil, err}}d.reader = bufio.NewReader(d.readFile) //设置reader}err = binary.Read(d.reader, binary.BigEndian, &msgSize) //从reader中读取4个字节,也就是本条消息的长度if err != nil {d.readFile.Close()d.readFile = nilreturn nil, err}if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { //校验消息长度// this file is corrupt and we have no reasonable guarantee on// where a new message should begind.readFile.Close()d.readFile = nilreturn nil, fmt.Errorf("invalid message read size (%d)", msgSize)}readBuf := make([]byte, msgSize)_, err = io.ReadFull(d.reader, readBuf) //将消息读取到reaBuf中if err != nil {d.readFile.Close()d.readFile = nilreturn nil, err}totalBytes := int64(4 + msgSize)// we only advance next* because we have not yet sent this to consumers// (where readFileNum, readPos will actually be advanced)d.nextReadPos = d.readPos + totalBytes //设置下次需要读的指针偏移量d.nextReadFileNum = d.readFileNum// TODO: each data file should embed the maxBytesPerFile// as the first 8 bytes (at creation time) ensuring that// the value can change without affecting runtimeif d.nextReadPos > d.maxBytesPerFile { //如果下次需要读的位置大于每个文件的最大值/*关闭正在读的文件,并置为nilnextReadFileNum递增1nextReadPos置为0*/if d.readFile != nil {d.readFile.Close()d.readFile = nil}d.nextReadFileNum++d.nextReadPos = 0}return readBuf, nil}

五、文件刷新

sync函数主要用于刷新文件

   1.调用file的Sync()函数,将缓存中的消息持久化到文件中

    2.将元数据更新持久化

func (d *diskQueue) sync() error {if d.writeFile != nil {err := d.writeFile.Sync()if err != nil {d.writeFile.Close()d.writeFile = nilreturn err}}err := d.persistMetaData()if err != nil {return err}d.needSync = falsereturn nil}

以上就是diskQueue文件队列的主要实现流程和方法。

这篇关于NSQ源码分析(三)——disQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/501135

相关文章

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专