本文主要是介绍RocketMQ之消息存储管理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
我们知道RocketMQ的queue都是逻辑上的概念,实际消息都是写入文件来管理的,达到了操作queue的表象,下面就RocketMQ管理消息操作文件的思路做个讲解。RocketMQ主要有6类文件,小文件有3类:checkpoint文件,config目录下的配置文件,abort文件,大文件有3类:Index文件,ConsumeQueue文件,CommitLog文件。Broker操作相关文件业务中最终统一调用DefaultMessageStore,而DefaultMessageStore又管理三类文件的写入,具体体现为IndexService、ConsumeQueue、CommitLog三个类型,形象的图形可参考下面
对于大文件的操作,使用的是NIO的MappedByteBuffer类来提高读写性能。这个类是文件内存映射的相关类,支持随机读和顺序写,在RocketMQ中,被封装成了MappedFile类。 RocketMQ对于大文件的存储会进行多文件存储,到达指定大小会重新建立新文件存储。
下面我们来了解下RocketMQ是如何模拟出逻辑队列的,CommitLog是用于存储真实的物理消息的结构,ConsumeQueue是逻辑队列,仅仅存储了CommitLog的位移而已,真实的存储都在CommitLog中。CommitLog文件的存储地址: $HOME\store\commitlog\${fileName}。每个文件的大小默认为1G,CommitLog的文件名fileName,名字长度为20位,左边补0,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0, 文件大小为 1G=1073741824; 当这个文件满了,第二个文件名字为 00000000001073741824。
创建Index文件的目的是能快速定位查询出消息,是为随机查询消息服务的,比如在管理端查询消息等,和consume queue没有直接联系的,与consume queue相比,我们更应该关系consume queue的机制。Index文件的存储位置是:$HOME \store\index\${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W*4+2000W*20= 420000040个字节大小。一个索引文件从整体上可以分为header和其它部分(body)。header主要描述此文件中索引的整体参数等信息。slot存储里面保存的是 Index Linked List的索引。消息对应slotPos=Math.abs(keyHash)%hashSlotNum,消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE,也就是说能很快定位到某一个消息在IndexLinkedList中的位置,形象的图可以如下:
Header字段
beginTimestamp(8) 第一个索引消息保存broker的时间戳
endTimestamp(8) 最后一个索引消息保存broker的时间戳
beginPhyOffset(8) 第一个索引消息在commitLog的偏移地址
endPhyOffset(8) 最后一个索引消息在commitlog的偏移地址
hashSlotCount(4) 从0开始,计数,记录Slot Table使用个数
indexCount(4) 从1开始,计数,记录Index Linked List使用个数
IndexLinkedList字段
keyHash(4) topic+key的hash值
phyOffset(8) 消息在commitLog中的偏移地址
timeDiff(4) 消息的保存时间戳减去IndexHeader的beginTimestamp
slotValue 上一个相同keyHash的节点在Index Linked List的位置,链表连接指针
RocketMQ在启动时会根据CommitLog文件修改Index文件和ConsumeQueue文件,运行时也会通过独立线程根据CommitLog文件来刷新Index文件和ConsumeQueue文件。
下面是ConsumeQueue的结构
与ConsumeQueue打交道的是Consumer,这个结构能迅速在CommitLog里将消息取出进行消费。
这篇关于RocketMQ之消息存储管理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!