本文主要是介绍NSQ源码分析(三)——disQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
disQueue是Topic以及Channel中的内存消息满了以后,消息存放的实现方法,diskQueue实现了文件系统支持的FIFO队列,disQueue也是BackendQueue接口的实现,diskQueue在文件读写给我们提供了很好的学习示例。
BackendQueue接口
// BackendQueue represents the behavior for the secondary message storage systemtype BackendQueue interface {Put([]byte) error //向文件中写入数据ReadChan() chan []byte //返回一个读的chanClose() error //关闭Delete() error //删除Depth() int64 //获取未读消息的个数Empty() error //清空操作}
diskQueue结构体及字段的解释
type diskQueue struct {// 64bit atomic vars need to be first for proper alignment on 32bit platforms// run-time state (also persisted to disk)//以下5个字段为元数据readPos int64 //当前读文件的指针偏移量(从文件的某个位置开始读)writePos int64 //当前写文件的指针偏移量readFileNum int64 //当前读的文件编号writeFileNum int64 //当前写的文件编号,用于创建文件名使用,文件编号每新建一个文件会递增1depth int64 //写一条消息加1,读一条消息减1,可以理解成还未读完的消息数量sync.RWMutex// instantiation time metadataname string //名称dataPath string //文件的路径maxBytesPerFile int64 //每个文件的最大字节数minMsgSize int32 //单条消息的最小字节数,默认是MsgIDLength + 8 + 2 = 26maxMsgSize int32 //单条消息的最大字节数,默认是1024 * 1024 + minMsgSizesyncEvery int64 // 定期刷新文件的读写次数阈值(默认2500)syncTimeout time.Duration // 定期刷新文件的时间戳阈值(默认2s)exitFlag int32needSync bool //需要刷新文件// keeps track of the position where we have read// (but not yet sent over readChan)nextReadPos int64 //下次需要读的指针偏移量nextReadFileNum int64 //下次需要读的文件编号readFile *os.File //当前正在读的文件,如果为nil则读取下一个文件编号的文件writeFile *os.File //当前正在写的文件,如果为nil则新建文件reader *bufio.ReaderwriteBuf bytes.Buffer// exposed via ReadChan()readChan chan []byte //通过ReadChan()函数暴露出去// internal channelswriteChan chan []byte //写通道writeResponseChan chan error //写之后返回的结果emptyChan chan int //清空文件的通道emptyResponseChan chan errorexitChan chan intexitSyncChan chan intlogf AppLogFunc //处理日志的函数}
一、diskQueue的创建和初始化
1.初始化diskQueue实例
2.调用retrieveMetaData函数 从磁盘中恢复diskQueue的状态。diskQueue会定时将自己的状态备份到文件中
3.开启一个协程执行ioLoop函数,ioLoop函数是整个disQueue最核心的方法,作用是实现disQueue的消息循环,定时刷新文件,读写操作功能
func New(name string, dataPath string, maxBytesPerFile int64,minMsgSize int32, maxMsgSize int32,syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {d := diskQueue{name: name,dataPath: dataPath,maxBytesPerFile: maxBytesPerFile,minMsgSize: minMsgSize,maxMsgSize: maxMsgSize,readChan: make(chan []byte),writeChan: make(chan []byte),writeResponseChan: make(chan error),emptyChan: make(chan int),emptyResponseChan: make(chan error),exitChan: make(chan int),exitSyncChan: make(chan int),syncEvery: syncEvery,syncTimeout: syncTimeout,logf: logf,}// no need to lock here, nothing else could possibly be touching this instanceerr := d.retrieveMetaData()if err != nil && !os.IsNotExist(err) {d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)}go d.ioLoop()return &d}
retrieveMetaData函数
retrieveMetaData函数记录了disQueue的元数据readPos、writePos、readFileNum、writeFileNum、depth
1.retrieveMetaData()函数用于初始化文件系统,作用是当程序突然中止,文件中的消息未读完,该函数用于初始化读写的文件编号和位置及未读的消息数depth
2.retrieveMetaData函数从磁盘中恢复diskQueue的状态。diskQueue会定时将自己的状态备份到文件中,
3.文件名由metaDataFileName函数确定。retrieveMetaData函数同样通过metaDataFileName函数获得保存状态的文件名并打开。
4. 该文件只有三行,格式为%d\n%d,%d\n%d,%d\n,第一行保存着该diskQueue中未读的消息数量(depth),第二行保存readFileNum和readPos,第三行保存writeFileNum和writePos。
func (d *diskQueue) retrieveMetaData() error {var f *os.Filevar err errorfileName := d.metaDataFileName() //获取文件名f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)if err != nil {return err}defer f.Close()var depth int64_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",&depth,&d.readFileNum, &d.readPos,&d.writeFileNum, &d.writePos)if err != nil {return err}atomic.StoreInt64(&d.depth, depth)d.nextReadFileNum = d.readFileNumd.nextReadPos = d.readPosreturn nil}
persistMetaData函数
与retrieveMetaData相对应的是persistMetaData函数,这个函数将运行时的元数据保存到文件用于下次重新构建diskQueue时的恢复。
逻辑基本与retrieveMetaData,此处不再赘述。
func (d *diskQueue) persistMetaData() error {var f *os.Filevar err errorfileName := d.metaDataFileName()tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())// write to tmp filef, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)if err != nil {return err}_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",atomic.LoadInt64(&d.depth),d.readFileNum, d.readPos,d.writeFileNum, d.writePos)if err != nil {f.Close()return err}f.Sync()f.Close()// atomically renamereturn os.Rename(tmpFileName, fileName)}
二、diskQueue的消息循环
ioLoop函数实现了diskQueue的消息循环功能,即轮询读写、刷新文件等操作。
刷新文件的目的:防止突然结束程序后内存中的内容未被提交到磁盘,导致内容丢失
有两种情况下会刷新文件
1.当count达到syncEvery时,即读写的次数累积到syncEvery。刷新文件后count会被置为0
2.定时刷新,每syncTimeOut就会刷新文件
count的变化规则
1.如果一次消息循环中,有读或写操作,count会自增1
2.当count达到syncEvery时,count会置为0,并刷新文件
3.当收到emptyChan消失时,会将count置为0,因为文件已经被删除了
判断文件中有可读的消息
(d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) 即读的文件编号小于写的文件编号或者读的指针偏移量小于写的指针偏移量
读取文件中的下一条消息需要满足两个条件:
1.文件中有可读的消息
2. d.nextReadPos = d.readPos 即上次读到的消息已经投递出去,需要读取下条新消息
d.nextReadPos 和 d.readPos的区别
nextReadPos是下次要读取消息的偏移量,读取的消息会赋值给dataRead。
当消息读取到之后不一定本次循环一定会执行 case r <- dataRead: 将消息投递出去,也可能会执行其他的case分支。此时nextReadPos是下次要读的消息的位置,而readPos是本次消息读的位置
nextReadPos是下次要读取消息的偏移量,也是消息投递成功后需要读的位置。而readPos当消息投递出去才会等于nextReadPos的值
可以简单理解为:
消息读取前 nextReadPos = readPos
消息已读取,但没有投递出去,nextReadPos是下次消息要读的位置,而readPos仍是本次消息读的开始位置。此时:nextReadPos = readPos + 本条消息的长度
消息投递成功后: readPos = nextReadPos ,将nextReadPos的值赋值给readPos
func (d *diskQueue) ioLoop() {var dataRead []bytevar err errorvar count int64 //读写的累积次数var r chan []bytesyncTicker := time.NewTicker(d.syncTimeout)for {// dont sync all the time :)if count == d.syncEvery {d.needSync = true}if d.needSync {err = d.sync()if err != nil {d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)}count = 0}if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { //消息文件中有可以读的消息if d.nextReadPos == d.readPos { //即上次读到的消息已经投递出去,需要读取下条新消息dataRead, err = d.readOne()if err != nil {d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",d.name, d.readPos, d.fileName(d.readFileNum), err)d.handleReadError()continue}}r = d.readChan} else {r = nil}select {// the Go channel spec dictates that nil channel operations (read or write)// in a select are skipped, we set r to d.readChan only when there is data to read//如果r为空,则这个分支会被跳过。这个特性的使用了select的逻辑,简化了当数据为空时的判断case r <- dataRead:count++// moveForward sets needSync flag if a file is removedd.moveForward()case <-d.emptyChan:d.emptyResponseChan <- d.deleteAllFiles()count = 0case dataWrite := <-d.writeChan:count++d.writeResponseChan <- d.writeOne(dataWrite)case <-syncTicker.C:if count == 0 {// avoid sync when there's no activitycontinue}d.needSync = truecase <-d.exitChan:goto exit}}exit:d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)syncTicker.Stop()d.exitSyncChan <- 1}
三、文件写操作
writeOne函数负责将消息写入到文件中
1.如果writeFile为nil,则需要新建文件
2.如果writePos大于0,则设置写的指针偏移量
3.校验消息长度
4.将消息的长度(4个字节) 和 消息作为一条消息写入到文件中
5.改变 writePos 和depth的值
6.如果写的指针偏移量大于每个文件的最大字节数,将writeFileNum递增,并将writeFile置为nil,用于下次写入消息的时候创建文件
func (d *diskQueue) writeOne(data []byte) error {var err errorif d.writeFile == nil { //如果writeFile为nilcurFileName := d.fileName(d.writeFileNum) //根据writeFileNum获取文件名称d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) //创建文件if err != nil {return err}d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)if d.writePos > 0 { //如果writePos大于0,则设置写的指针偏移量_, err = d.writeFile.Seek(d.writePos, 0)if err != nil {d.writeFile.Close()d.writeFile = nilreturn err}}}dataLen := int32(len(data)) //本次消息的长度if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { //校验消息长度return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)}d.writeBuf.Reset() //重置writeBuf缓冲区err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) //将数据长度dataLen以二进制大端的形式写入到writeBuf中if err != nil {return err}_, err = d.writeBuf.Write(data) //将data写入到writeBuf缓冲区中if err != nil {return err}// only write to the file once_, err = d.writeFile.Write(d.writeBuf.Bytes()) //将writeBuf缓冲区的数据写入到文件中if err != nil {d.writeFile.Close()d.writeFile = nilreturn err}totalBytes := int64(4 + dataLen)d.writePos += totalBytes //改变writePos(写的偏移量)atomic.AddInt64(&d.depth, 1) //depth的值加1if d.writePos > d.maxBytesPerFile { //如果写的指针偏移量大于每个文件的最大字节数/*将writeFileNum递增1,用于创建下一个文件使用writePos写的指针偏移量置为0刷新文件关闭文件将writeFile置为nil,那么下次有新消息需要写入文件则会新建文件这个操作的目的是为了防止单个文件过大*/d.writeFileNum++d.writePos = 0// sync every time we start writing to a new fileerr = d.sync()if err != nil {d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)}if d.writeFile != nil {d.writeFile.Close()d.writeFile = nil}}return err}
四、文件读操作
readOne函数主要用于从文件中读取下一条未读消息
主要流程:
1.如果readFile为nil,则读取新的文件
2.根据readPos获取读的位置,并读取4个字节(即本条消息的长度),然后读取本条消息
3.更新 nextReadPos和nextReadFileNum 的值
4.如果下次需要读的位置大于每个文件的最大值,则将nextReadFileNum递增,nextReadPos置为0,关闭readFile并置为nil。这样下次就会读取下一个文件的消息
5.最后返回本条消息的内容
func (d *diskQueue) readOne() ([]byte, error) {var err errorvar msgSize int32if d.readFile == nil { //如果readFile为nil,则读取新的文件/*获取并打开需要读取的文件*/curFileName := d.fileName(d.readFileNum)d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)if err != nil {return nil, err}d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)if d.readPos > 0 {//如果readPos大于0,则设置读的指针偏移量_, err = d.readFile.Seek(d.readPos, 0)if err != nil {d.readFile.Close()d.readFile = nilreturn nil, err}}d.reader = bufio.NewReader(d.readFile) //设置reader}err = binary.Read(d.reader, binary.BigEndian, &msgSize) //从reader中读取4个字节,也就是本条消息的长度if err != nil {d.readFile.Close()d.readFile = nilreturn nil, err}if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { //校验消息长度// this file is corrupt and we have no reasonable guarantee on// where a new message should begind.readFile.Close()d.readFile = nilreturn nil, fmt.Errorf("invalid message read size (%d)", msgSize)}readBuf := make([]byte, msgSize)_, err = io.ReadFull(d.reader, readBuf) //将消息读取到reaBuf中if err != nil {d.readFile.Close()d.readFile = nilreturn nil, err}totalBytes := int64(4 + msgSize)// we only advance next* because we have not yet sent this to consumers// (where readFileNum, readPos will actually be advanced)d.nextReadPos = d.readPos + totalBytes //设置下次需要读的指针偏移量d.nextReadFileNum = d.readFileNum// TODO: each data file should embed the maxBytesPerFile// as the first 8 bytes (at creation time) ensuring that// the value can change without affecting runtimeif d.nextReadPos > d.maxBytesPerFile { //如果下次需要读的位置大于每个文件的最大值/*关闭正在读的文件,并置为nilnextReadFileNum递增1nextReadPos置为0*/if d.readFile != nil {d.readFile.Close()d.readFile = nil}d.nextReadFileNum++d.nextReadPos = 0}return readBuf, nil}
五、文件刷新
sync函数主要用于刷新文件
1.调用file的Sync()函数,将缓存中的消息持久化到文件中
2.将元数据更新持久化
func (d *diskQueue) sync() error {if d.writeFile != nil {err := d.writeFile.Sync()if err != nil {d.writeFile.Close()d.writeFile = nilreturn err}}err := d.persistMetaData()if err != nil {return err}d.needSync = falsereturn nil}
以上就是diskQueue文件队列的主要实现流程和方法。
这篇关于NSQ源码分析(三)——disQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!