RocketMQ之消息存储管理

2024-03-11 23:18

本文主要是介绍RocketMQ之消息存储管理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

        我们知道RocketMQ的queue都是逻辑上的概念,实际消息都是写入文件来管理的,达到了操作queue的表象,下面就RocketMQ管理消息操作文件的思路做个讲解。RocketMQ主要有6类文件,小文件有3类:checkpoint文件,config目录下的配置文件,abort文件,大文件有3类:Index文件,ConsumeQueue文件,CommitLog文件。Broker操作相关文件业务中最终统一调用DefaultMessageStore,而DefaultMessageStore又管理三类文件的写入,具体体现为IndexService、ConsumeQueue、CommitLog三个类型,形象的图形可参考下面
对于大文件的操作,使用的是NIO的MappedByteBuffer类来提高读写性能。这个类是文件内存映射的相关类,支持随机读和顺序写,在RocketMQ中,被封装成了MappedFile类。 RocketMQ对于大文件的存储会进行多文件存储,到达指定大小会重新建立新文件存储。
        下面我们来了解下RocketMQ是如何模拟出逻辑队列的,CommitLog是用于存储真实的物理消息的结构,ConsumeQueue是逻辑队列,仅仅存储了CommitLog的位移而已,真实的存储都在CommitLog中。CommitLog文件的存储地址: $HOME\store\commitlog\${fileName}。每个文件的大小默认为1G,CommitLog的文件名fileName,名字长度为20位,左边补0,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0, 文件大小为 1G=1073741824; 当这个文件满了,第二个文件名字为 00000000001073741824。
        创建Index文件的目的是能快速定位查询出消息,是为随机查询消息服务的,比如在管理端查询消息等,和consume queue没有直接联系的,与consume queue相比,我们更应该关系consume queue的机制。Index文件的存储位置是:$HOME \store\index\${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W*4+2000W*20= 420000040个字节大小。一个索引文件从整体上可以分为header和其它部分(body)。header主要描述此文件中索引的整体参数等信息。slot存储里面保存的是 Index Linked List的索引。消息对应slotPos=Math.abs(keyHash)%hashSlotNum,消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE,也就是说能很快定位到某一个消息在IndexLinkedList中的位置,形象的图可以如下:
Header字段
beginTimestamp(8)   第一个索引消息保存broker的时间戳
endTimestamp(8)    最后一个索引消息保存broker的时间戳
beginPhyOffset(8)  第一个索引消息在commitLog的偏移地址
endPhyOffset(8)  最后一个索引消息在commitlog的偏移地址
hashSlotCount(4) 从0开始,计数,记录Slot Table使用个数
indexCount(4) 从1开始,计数,记录Index Linked List使用个数
IndexLinkedList字段
keyHash(4)   topic+key的hash值
phyOffset(8) 消息在commitLog中的偏移地址
timeDiff(4)  消息的保存时间戳减去IndexHeader的beginTimestamp
slotValue   上一个相同keyHash的节点在Index Linked List的位置,链表连接指针
RocketMQ在启动时会根据CommitLog文件修改Index文件和ConsumeQueue文件,运行时也会通过独立线程根据CommitLog文件来刷新Index文件和ConsumeQueue文件。
        下面是ConsumeQueue的结构

与ConsumeQueue打交道的是Consumer,这个结构能迅速在CommitLog里将消息取出进行消费。

这篇关于RocketMQ之消息存储管理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/799392

相关文章

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

【Rocketmq入门-基本概念】

Rocketmq入门-基本概念 名词解释名称服务器(NameServer)消息队列(Message Queue)主题(Topic)标签(Tag)生产者(Producer)消费者(Consumer)拉取模式(Pull)推送模式(Push)消息模型(Message Model) 关键组件Broker消息存储工作流程 名词解释 名称服务器(NameServer) 定义: 名称服务器

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

centos7 安装rocketmq4.7.0以及RocketMQ-Console-Ng控制台

一、前置工作 1.1安装jdk8 https://blog.csdn.net/pang_ping/article/details/80570011 1.2安装maven https://www.cnblogs.com/116970u/p/11211963.html 1.3安装git https://blog.csdn.net/xwj1992930/article/details/964

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Android 友盟消息推送集成遇到的问题

友盟消息推送遇到的问题 集成友盟消息推送,步骤根据提供的技术文档接入便可。可是当你集成到项目中去的时候,可能并不是一帆风顺就搞定,因为你项目里面是可能集成了其他的sdk(比如支付宝,微信,七鱼等等三方的sdk)。那么这个时候,再加上友盟的消息推送sdk集成可能就会出现问题。 问题清单 友盟消息推送sdk和支付宝sdk冲突问题 后台配置了消息推送,也显示发送成功,但是手机没有收到消息通知

消息队列的理解和应用场景

知乎上的一个通俗理解的优秀答案 by 祁达方 小红是小明的姐姐。 小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走。久而久之,两人都觉得麻烦。 后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看。 书架就是一个消息队列,小红是生产者,小明是

RocketMQ 介绍

前言 消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360°的刁难。 作为一个在互联网公司面一次拿一次Offer的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。 于是在一个寂寞难耐的夜晚,我痛定思痛,决定开始写《吊打面试官》系列,希望能帮助各位读者以后面试势如破竹,

操作系统分页式存储管理

每次输入地址后,计算出页号,若页号越界,则给出错误提示。否则依次调用FIFO和LRU算法,这里值得注意的是,由于我们的FIFO算法先于LRU算法被调用,那么当在处理FIFO算法时,我们暂且不将位视图相应位置做变化,留到处理LRU算法再做处理。 对于FIFO、LRU算法的缺页,我们分两种情况考虑,第一种是模拟栈内还有空间,那么直接将其入栈。第二种是模拟栈内无空间,要发生置换。发生置换时把模拟栈最底