# 消息中间件 RocketMQ 高级功能和源码分析(八)

2024-06-23 04:28

本文主要是介绍# 消息中间件 RocketMQ 高级功能和源码分析(八),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

消息中间件 RocketMQ 高级功能和源码分析(八)

一、消息中间件 RocketMQ 源码分析:实时更新消息消费队列与索引文件流程说明

1、实时更新消息消费队列与索引文件

消息消费队文件、消息属性索引文件都是基于 CommitLog 文件构建的,当消息生产者提交的消息存储在 CommitLog 文件中,ConsumerQueue、IndexFile 需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ 通过开启一个线程 ReputMessageService 来准实时转发 CommitLog 文件更新事件,相应的任务处理器根据转发的消息及时更新 ConsumerQueue、IndexFile 文件。

2、消息存储结构 示例图:

在这里插入图片描述

3、构建消息消费队列和索引文件 示例图:

在这里插入图片描述

4、 代码:DefaultMessageStore:start


//设置CommitLog内存中最大偏移量
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
//启动
this.reputMessageService.start();

5、 代码:DefaultMessageStore:run


public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");//每隔1毫秒就继续尝试推送消息到消息消费队列和索引文件while (!this.isStopped()) {try {Thread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

6、 代码:DefaultMessageStore:deReput


//从result中循环遍历消息,一次读一条,创建DispatherRequest对象。
for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =                               DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size > 0) {DefaultMessageStore.this.doDispatch(dispatchRequest);}}
}

7、 DispatchRequest

在这里插入图片描述


String topic; //消息主题名称
int queueId;  //消息队列ID
long commitLogOffset;	//消息物理偏移量
int msgSize;	//消息长度
long tagsCode;	//消息过滤tag hashCode
long storeTimestamp;	//消息存储时间戳
long consumeQueueOffset;	//消息队列偏移量
String keys;	//消息索引key
boolean success;	//是否成功解析到完整的消息
String uniqKey;	//消息唯一键
int sysFlag;	//消息系统标记
long preparedTransactionOffset;	//消息预处理事务偏移量
Map<String, String> propertiesMap;	//消息属性
byte[] bitMap;	//位图

二、消息中间件 RocketMQ 源码分析:转发数据到 ConsumerQueue 文件

1、转发到 ConsumerQueue 消息分发到消息消费队列 示例图:

在这里插入图片描述

2、 代码 CommitLogDispatcherBuildConsumeQueue 类:


class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE://消息分发DefaultMessageStore.this.putMessagePositionInfo(request);break;case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}
}

3、 代码:DefaultMessageStore#putMessagePositionInfo


public void putMessagePositionInfo(DispatchRequest dispatchRequest) {//获得消费队列ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());//消费队列分发消息cq.putMessagePositionInfoWrapper(dispatchRequest);
}

4、 代码:DefaultMessageStore#putMessagePositionInfo


//依次将消息偏移量、消息长度、tag写入到ByteBuffer中
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
//获得内存映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {//将消息追加到内存映射文件,异步输盘return mappedFile.appendMessage(this.byteBufferIndex.array());
}

三、消息中间件 RocketMQ 源码分析:转发 IndexFile 文件

1、转发到 Index 消息分发到索引文件 示例图:

在这里插入图片描述

2、 代码 CommitLogDispatcherBuildIndex 类:


class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {DefaultMessageStore.this.indexService.buildIndex(request);}}
}

3、 代码:DefaultMessageStore#buildIndex


public void buildIndex(DispatchRequest req) {//获得索引文件IndexFile indexFile = retryGetAndCreateIndexFile();if (indexFile != null) {//获得文件最大物理偏移量long endPhyOffset = indexFile.getEndPhyOffset();DispatchRequest msg = req;String topic = msg.getTopic();String keys = msg.getKeys();//如果该消息的物理偏移量小于索引文件中的最大物理偏移量,则说明是重复数据,忽略本次索引构建if (msg.getCommitLogOffset() < endPhyOffset) {return;}final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:break;case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:return;}//如果消息ID不为空,则添加到Hash索引中if (req.getUniqKey() != null) {indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));if (indexFile == null) {return;}}//构建索引key,RocketMQ支持为同一个消息建立多个索引,多个索引键空格隔开.if (keys != null && keys.length() > 0) {String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);for (int i = 0; i < keyset.length; i++) {String key = keyset[i];if (key.length() > 0) {indexFile = putKey(indexFile, msg, buildKey(topic, key));if (indexFile == null) {return;}}}}} else {log.error("build index error, stop building index");}
}

四、消息中间件 RocketMQ 源码分析:消息队列和索引文件恢复

1、消息队列和索引文件恢复

由于 RocketMQ 存储首先将消息全量存储在 CommitLog 文件中,然后异步生成转发任务更新 ConsumerQueue 和 Index 文件。如果消息成功存储到 CommitLog 文件中,转发任务未成功执行,此时消息服务器 Broker 由于某个愿意宕机,导致CommitLog、ConsumerQueue、IndexFile 文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在 CommitLog 中文件中存在,但由于没有转发到 ConsumerQueue,这部分消息将永远复发被消费者消费。

2、文件恢复总体流程 示例图:

在这里插入图片描述

3、存储文件加载

代码:DefaultMessageStore#load

判断上一次是否异常退出。实现机制是 Broker 在启动时创建 abort 文件,在退出时通过 JVM 钩子函数删除 abort 文件。如果下次启动时存在 abort 文件。说明 Broker 时异常退出的,CommitLog 与 ConsumerQueue 数据有可能不一致,需要进行修复。


//判断临时文件是否存在
boolean lastExitOK = !this.isTempFileExist();
//根据临时文件判断当前Broker是否异常退出
private boolean isTempFileExist() {String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());File file = new File(fileName);return file.exists();
}

4、 代码:DefaultMessageStore#load


//加载延时队列
if (null != scheduleMessageService) {result = result && this.scheduleMessageService.load();
}// 加载CommitLog文件
result = result && this.commitLog.load();// 加载消费队列文件
result = result && this.loadConsumeQueue();if (result) {//加载存储监测点,监测点主要记录CommitLog文件、ConsumerQueue文件、Index索引文件的刷盘点this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));//加载index文件this.indexService.load(lastExitOK);//根据Broker是否异常退出,执行不同的恢复策略this.recover(lastExitOK);
}

5、 代码:MappedFileQueue#load

加载 CommitLog 到映射文件


//指向CommitLog文件目录
File dir = new File(this.storePath);
//获得文件数组
File[] files = dir.listFiles();
if (files != null) {// 文件排序Arrays.sort(files);//遍历文件for (File file : files) {//如果文件大小和配置文件不一致,退出if (file.length() != this.mappedFileSize) {return false;}try {//创建映射文件MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);//将映射文件添加到队列this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}
}return true;

6、 代码:DefaultMessageStore#loadConsumeQueue

加载消息消费队列


//执行消费队列目录
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
//遍历消费队列目录
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {for (File fileTopic : fileTopicList) {//获得子目录名称,即topic名称String topic = fileTopic.getName();//遍历子目录下的消费队列文件File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {//遍历文件for (File fileQueueId : fileQueueIdList) {//文件名称即队列IDint queueId;try {queueId = Integer.parseInt(fileQueueId.getName());} catch (NumberFormatException e) {continue;}//创建消费队列并加载到内存ConsumeQueue logic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);this.putConsumeQueue(topic, queueId, logic);if (!logic.load()) {return false;}}}}
}log.info("load logics queue all over, OK");return true;

7、 代码:IndexService#load

加载索引文件


public boolean load(final boolean lastExitOK) {//索引文件目录File dir = new File(this.storePath);//遍历索引文件File[] files = dir.listFiles();if (files != null) {//文件排序Arrays.sort(files);//遍历文件for (File file : files) {try {//加载索引文件IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);f.load();if (!lastExitOK) {//索引文件上次的刷盘时间小于该索引文件的消息时间戳,该文件将立即删除if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {f.destroy(0);continue;}}//将索引文件添加到队列log.info("load index file OK, " + f.getFileName());this.indexFileList.add(f);} catch (IOException e) {log.error("load file {} error", file, e);return false;} catch (NumberFormatException e) {log.error("load file {} error", file, e);}}}return true;
}

8、 代码:DefaultMessageStore#recover

文件恢复,根据 Broker 是否正常退出执行不同的恢复策略


private void recover(final boolean lastExitOK) {//获得最大的物理便宜消费队列long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();if (lastExitOK) {//正常恢复this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);} else {//异常恢复this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);}//在CommitLog中保存每个消息消费队列当前的存储逻辑偏移量this.recoverTopicQueueTable();
}

9、 代码:DefaultMessageStore#recoverTopicQueueTable

恢复 ConsumerQueue 后,将在 CommitLog 实例中保存每隔消息队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列 ID、还存储了消息队列的关键所在。


public void recoverTopicQueueTable() {HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);//CommitLog最小偏移量long minPhyOffset = this.commitLog.getMinOffset();//遍历消费队列,将消费队列保存在CommitLog中for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {String key = logic.getTopic() + "-" + logic.getQueueId();table.put(key, logic.getMaxOffsetInQueue());logic.correctMinOffset(minPhyOffset);}}this.commitLog.setTopicQueueTable(table);
}

五、消息中间件 RocketMQ 源码分析:正常恢复和异常恢复

1、正常恢复

代码:CommitLog#recoverNormally


public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {//Broker正常停止再重启时,从倒数第三个开始恢复,如果不足3个文件,则从第一个文件开始恢复。int index = mappedFiles.size() - 3;if (index < 0)index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();//代表当前已校验通过的offsetlong mappedFileOffset = 0;while (true) {//查找消息DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);//消息长度int size = dispatchRequest.getMsgSize();//查找结果为true,并且消息长度大于0,表示消息正确.mappedFileOffset向前移动本消息长度if (dispatchRequest.isSuccess() && size > 0) {mappedFileOffset += size;}//如果查找结果为true且消息长度等于0,表示已到该文件末尾,如果还有下一个文件,则重置processOffset和MappedFileOffset重复查找下一个文件,否则跳出循环。else if (dispatchRequest.isSuccess() && size == 0) {index++;if (index >= mappedFiles.size()) {// Current branch can not happenbreak;} else {//取出每个文件mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;}}// 查找结果为false,表明该文件未填满所有消息,跳出循环,结束循环else if (!dispatchRequest.isSuccess()) {log.info("recover physics file end, " + mappedFile.getFileName());break;}}//更新MappedFileQueue的flushedWhere和committedWhere指针processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);//删除offset之后的所有文件this.mappedFileQueue.truncateDirtyFiles(processOffset);if (maxPhyOffsetOfConsumeQueue >= processOffset) {this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}} else {this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();}
}

2、 代码:MappedFileQueue#truncateDirtyFiles


public void truncateDirtyFiles(long offset) {List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();//遍历目录下文件for (MappedFile file : this.mappedFiles) {//文件尾部的偏移量long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//文件尾部的偏移量大于offsetif (fileTailOffset > offset) {//offset大于文件的起始偏移量if (offset >= file.getFileFromOffset()) {//更新wrotePosition、committedPosition、flushedPosistionfile.setWrotePosition((int) (offset % this.mappedFileSize));file.setCommittedPosition((int) (offset % this.mappedFileSize));file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {//offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,释放mappedFile占用内存,删除文件file.destroy(1000);willRemoveFiles.add(file);}}}this.deleteExpiredFile(willRemoveFiles);
}

3、异常恢复

Broker 异常停止文件恢复的实现为 CommitLog#recoverAbnormally。异常文件恢复步骤与正常停止文件恢复流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果 CommitLog 目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。

代码:CommitLog#recoverAbnormally


if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which fileint index = mappedFiles.size() - 1;MappedFile mappedFile = null;for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);//判断消息文件是否是一个正确的文件if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}//根据索引取出mappedFile文件if (index < 0) {index = 0;mappedFile = mappedFiles.get(index);}//...验证消息的合法性,并将消息转发到消息消费队列和索引文件}else{//未找到mappedFile,重置flushWhere、committedWhere都为0,销毁消息队列文件this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();
}

上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(七)

这篇关于# 消息中间件 RocketMQ 高级功能和源码分析(八)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1086201

相关文章

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

C++11第三弹:lambda表达式 | 新的类功能 | 模板的可变参数

🌈个人主页: 南桥几晴秋 🌈C++专栏: 南桥谈C++ 🌈C语言专栏: C语言学习系列 🌈Linux学习专栏: 南桥谈Linux 🌈数据结构学习专栏: 数据结构杂谈 🌈数据库学习专栏: 南桥谈MySQL 🌈Qt学习专栏: 南桥谈Qt 🌈菜鸡代码练习: 练习随想记录 🌈git学习: 南桥谈Git 🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈�

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57