# 消息中间件 RocketMQ 高级功能和源码分析(八)

2024-06-23 04:28

本文主要是介绍# 消息中间件 RocketMQ 高级功能和源码分析(八),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

消息中间件 RocketMQ 高级功能和源码分析(八)

一、消息中间件 RocketMQ 源码分析:实时更新消息消费队列与索引文件流程说明

1、实时更新消息消费队列与索引文件

消息消费队文件、消息属性索引文件都是基于 CommitLog 文件构建的,当消息生产者提交的消息存储在 CommitLog 文件中,ConsumerQueue、IndexFile 需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ 通过开启一个线程 ReputMessageService 来准实时转发 CommitLog 文件更新事件,相应的任务处理器根据转发的消息及时更新 ConsumerQueue、IndexFile 文件。

2、消息存储结构 示例图:

在这里插入图片描述

3、构建消息消费队列和索引文件 示例图:

在这里插入图片描述

4、 代码:DefaultMessageStore:start


//设置CommitLog内存中最大偏移量
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
//启动
this.reputMessageService.start();

5、 代码:DefaultMessageStore:run


public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");//每隔1毫秒就继续尝试推送消息到消息消费队列和索引文件while (!this.isStopped()) {try {Thread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

6、 代码:DefaultMessageStore:deReput


//从result中循环遍历消息,一次读一条,创建DispatherRequest对象。
for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =                               DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size > 0) {DefaultMessageStore.this.doDispatch(dispatchRequest);}}
}

7、 DispatchRequest

在这里插入图片描述


String topic; //消息主题名称
int queueId;  //消息队列ID
long commitLogOffset;	//消息物理偏移量
int msgSize;	//消息长度
long tagsCode;	//消息过滤tag hashCode
long storeTimestamp;	//消息存储时间戳
long consumeQueueOffset;	//消息队列偏移量
String keys;	//消息索引key
boolean success;	//是否成功解析到完整的消息
String uniqKey;	//消息唯一键
int sysFlag;	//消息系统标记
long preparedTransactionOffset;	//消息预处理事务偏移量
Map<String, String> propertiesMap;	//消息属性
byte[] bitMap;	//位图

二、消息中间件 RocketMQ 源码分析:转发数据到 ConsumerQueue 文件

1、转发到 ConsumerQueue 消息分发到消息消费队列 示例图:

在这里插入图片描述

2、 代码 CommitLogDispatcherBuildConsumeQueue 类:


class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE://消息分发DefaultMessageStore.this.putMessagePositionInfo(request);break;case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}
}

3、 代码:DefaultMessageStore#putMessagePositionInfo


public void putMessagePositionInfo(DispatchRequest dispatchRequest) {//获得消费队列ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());//消费队列分发消息cq.putMessagePositionInfoWrapper(dispatchRequest);
}

4、 代码:DefaultMessageStore#putMessagePositionInfo


//依次将消息偏移量、消息长度、tag写入到ByteBuffer中
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
//获得内存映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {//将消息追加到内存映射文件,异步输盘return mappedFile.appendMessage(this.byteBufferIndex.array());
}

三、消息中间件 RocketMQ 源码分析:转发 IndexFile 文件

1、转发到 Index 消息分发到索引文件 示例图:

在这里插入图片描述

2、 代码 CommitLogDispatcherBuildIndex 类:


class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {DefaultMessageStore.this.indexService.buildIndex(request);}}
}

3、 代码:DefaultMessageStore#buildIndex


public void buildIndex(DispatchRequest req) {//获得索引文件IndexFile indexFile = retryGetAndCreateIndexFile();if (indexFile != null) {//获得文件最大物理偏移量long endPhyOffset = indexFile.getEndPhyOffset();DispatchRequest msg = req;String topic = msg.getTopic();String keys = msg.getKeys();//如果该消息的物理偏移量小于索引文件中的最大物理偏移量,则说明是重复数据,忽略本次索引构建if (msg.getCommitLogOffset() < endPhyOffset) {return;}final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:break;case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:return;}//如果消息ID不为空,则添加到Hash索引中if (req.getUniqKey() != null) {indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));if (indexFile == null) {return;}}//构建索引key,RocketMQ支持为同一个消息建立多个索引,多个索引键空格隔开.if (keys != null && keys.length() > 0) {String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);for (int i = 0; i < keyset.length; i++) {String key = keyset[i];if (key.length() > 0) {indexFile = putKey(indexFile, msg, buildKey(topic, key));if (indexFile == null) {return;}}}}} else {log.error("build index error, stop building index");}
}

四、消息中间件 RocketMQ 源码分析:消息队列和索引文件恢复

1、消息队列和索引文件恢复

由于 RocketMQ 存储首先将消息全量存储在 CommitLog 文件中,然后异步生成转发任务更新 ConsumerQueue 和 Index 文件。如果消息成功存储到 CommitLog 文件中,转发任务未成功执行,此时消息服务器 Broker 由于某个愿意宕机,导致CommitLog、ConsumerQueue、IndexFile 文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在 CommitLog 中文件中存在,但由于没有转发到 ConsumerQueue,这部分消息将永远复发被消费者消费。

2、文件恢复总体流程 示例图:

在这里插入图片描述

3、存储文件加载

代码:DefaultMessageStore#load

判断上一次是否异常退出。实现机制是 Broker 在启动时创建 abort 文件,在退出时通过 JVM 钩子函数删除 abort 文件。如果下次启动时存在 abort 文件。说明 Broker 时异常退出的,CommitLog 与 ConsumerQueue 数据有可能不一致,需要进行修复。


//判断临时文件是否存在
boolean lastExitOK = !this.isTempFileExist();
//根据临时文件判断当前Broker是否异常退出
private boolean isTempFileExist() {String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());File file = new File(fileName);return file.exists();
}

4、 代码:DefaultMessageStore#load


//加载延时队列
if (null != scheduleMessageService) {result = result && this.scheduleMessageService.load();
}// 加载CommitLog文件
result = result && this.commitLog.load();// 加载消费队列文件
result = result && this.loadConsumeQueue();if (result) {//加载存储监测点,监测点主要记录CommitLog文件、ConsumerQueue文件、Index索引文件的刷盘点this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));//加载index文件this.indexService.load(lastExitOK);//根据Broker是否异常退出,执行不同的恢复策略this.recover(lastExitOK);
}

5、 代码:MappedFileQueue#load

加载 CommitLog 到映射文件


//指向CommitLog文件目录
File dir = new File(this.storePath);
//获得文件数组
File[] files = dir.listFiles();
if (files != null) {// 文件排序Arrays.sort(files);//遍历文件for (File file : files) {//如果文件大小和配置文件不一致,退出if (file.length() != this.mappedFileSize) {return false;}try {//创建映射文件MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);//将映射文件添加到队列this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}
}return true;

6、 代码:DefaultMessageStore#loadConsumeQueue

加载消息消费队列


//执行消费队列目录
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
//遍历消费队列目录
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {for (File fileTopic : fileTopicList) {//获得子目录名称,即topic名称String topic = fileTopic.getName();//遍历子目录下的消费队列文件File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {//遍历文件for (File fileQueueId : fileQueueIdList) {//文件名称即队列IDint queueId;try {queueId = Integer.parseInt(fileQueueId.getName());} catch (NumberFormatException e) {continue;}//创建消费队列并加载到内存ConsumeQueue logic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);this.putConsumeQueue(topic, queueId, logic);if (!logic.load()) {return false;}}}}
}log.info("load logics queue all over, OK");return true;

7、 代码:IndexService#load

加载索引文件


public boolean load(final boolean lastExitOK) {//索引文件目录File dir = new File(this.storePath);//遍历索引文件File[] files = dir.listFiles();if (files != null) {//文件排序Arrays.sort(files);//遍历文件for (File file : files) {try {//加载索引文件IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);f.load();if (!lastExitOK) {//索引文件上次的刷盘时间小于该索引文件的消息时间戳,该文件将立即删除if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {f.destroy(0);continue;}}//将索引文件添加到队列log.info("load index file OK, " + f.getFileName());this.indexFileList.add(f);} catch (IOException e) {log.error("load file {} error", file, e);return false;} catch (NumberFormatException e) {log.error("load file {} error", file, e);}}}return true;
}

8、 代码:DefaultMessageStore#recover

文件恢复,根据 Broker 是否正常退出执行不同的恢复策略


private void recover(final boolean lastExitOK) {//获得最大的物理便宜消费队列long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();if (lastExitOK) {//正常恢复this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);} else {//异常恢复this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);}//在CommitLog中保存每个消息消费队列当前的存储逻辑偏移量this.recoverTopicQueueTable();
}

9、 代码:DefaultMessageStore#recoverTopicQueueTable

恢复 ConsumerQueue 后,将在 CommitLog 实例中保存每隔消息队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列 ID、还存储了消息队列的关键所在。


public void recoverTopicQueueTable() {HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);//CommitLog最小偏移量long minPhyOffset = this.commitLog.getMinOffset();//遍历消费队列,将消费队列保存在CommitLog中for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {String key = logic.getTopic() + "-" + logic.getQueueId();table.put(key, logic.getMaxOffsetInQueue());logic.correctMinOffset(minPhyOffset);}}this.commitLog.setTopicQueueTable(table);
}

五、消息中间件 RocketMQ 源码分析:正常恢复和异常恢复

1、正常恢复

代码:CommitLog#recoverNormally


public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {//Broker正常停止再重启时,从倒数第三个开始恢复,如果不足3个文件,则从第一个文件开始恢复。int index = mappedFiles.size() - 3;if (index < 0)index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();//代表当前已校验通过的offsetlong mappedFileOffset = 0;while (true) {//查找消息DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);//消息长度int size = dispatchRequest.getMsgSize();//查找结果为true,并且消息长度大于0,表示消息正确.mappedFileOffset向前移动本消息长度if (dispatchRequest.isSuccess() && size > 0) {mappedFileOffset += size;}//如果查找结果为true且消息长度等于0,表示已到该文件末尾,如果还有下一个文件,则重置processOffset和MappedFileOffset重复查找下一个文件,否则跳出循环。else if (dispatchRequest.isSuccess() && size == 0) {index++;if (index >= mappedFiles.size()) {// Current branch can not happenbreak;} else {//取出每个文件mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;}}// 查找结果为false,表明该文件未填满所有消息,跳出循环,结束循环else if (!dispatchRequest.isSuccess()) {log.info("recover physics file end, " + mappedFile.getFileName());break;}}//更新MappedFileQueue的flushedWhere和committedWhere指针processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);//删除offset之后的所有文件this.mappedFileQueue.truncateDirtyFiles(processOffset);if (maxPhyOffsetOfConsumeQueue >= processOffset) {this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}} else {this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();}
}

2、 代码:MappedFileQueue#truncateDirtyFiles


public void truncateDirtyFiles(long offset) {List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();//遍历目录下文件for (MappedFile file : this.mappedFiles) {//文件尾部的偏移量long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//文件尾部的偏移量大于offsetif (fileTailOffset > offset) {//offset大于文件的起始偏移量if (offset >= file.getFileFromOffset()) {//更新wrotePosition、committedPosition、flushedPosistionfile.setWrotePosition((int) (offset % this.mappedFileSize));file.setCommittedPosition((int) (offset % this.mappedFileSize));file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {//offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,释放mappedFile占用内存,删除文件file.destroy(1000);willRemoveFiles.add(file);}}}this.deleteExpiredFile(willRemoveFiles);
}

3、异常恢复

Broker 异常停止文件恢复的实现为 CommitLog#recoverAbnormally。异常文件恢复步骤与正常停止文件恢复流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果 CommitLog 目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。

代码:CommitLog#recoverAbnormally


if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which fileint index = mappedFiles.size() - 1;MappedFile mappedFile = null;for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);//判断消息文件是否是一个正确的文件if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}//根据索引取出mappedFile文件if (index < 0) {index = 0;mappedFile = mappedFiles.get(index);}//...验证消息的合法性,并将消息转发到消息消费队列和索引文件}else{//未找到mappedFile,重置flushWhere、committedWhere都为0,销毁消息队列文件this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();
}

上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(七)

这篇关于# 消息中间件 RocketMQ 高级功能和源码分析(八)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1086201

相关文章

Go语言实现将中文转化为拼音功能

《Go语言实现将中文转化为拼音功能》这篇文章主要为大家详细介绍了Go语言中如何实现将中文转化为拼音功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 有这么一个需求:新用户入职 创建一系列账号比较麻烦,打算通过接口传入姓名进行初始化。想把姓名转化成拼音。因为有些账号即需要中文也需要英

基于WinForm+Halcon实现图像缩放与交互功能

《基于WinForm+Halcon实现图像缩放与交互功能》本文主要讲述在WinForm中结合Halcon实现图像缩放、平移及实时显示灰度值等交互功能,包括初始化窗口的不同方式,以及通过特定事件添加相应... 目录前言初始化窗口添加图像缩放功能添加图像平移功能添加实时显示灰度值功能示例代码总结最后前言本文将

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

使用Python实现批量访问URL并解析XML响应功能

《使用Python实现批量访问URL并解析XML响应功能》在现代Web开发和数据抓取中,批量访问URL并解析响应内容是一个常见的需求,本文将详细介绍如何使用Python实现批量访问URL并解析XML响... 目录引言1. 背景与需求2. 工具方法实现2.1 单URL访问与解析代码实现代码说明2.2 示例调用

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

最好用的WPF加载动画功能

《最好用的WPF加载动画功能》当开发应用程序时,提供良好的用户体验(UX)是至关重要的,加载动画作为一种有效的沟通工具,它不仅能告知用户系统正在工作,还能够通过视觉上的吸引力来增强整体用户体验,本文给... 目录前言需求分析高级用法综合案例总结最后前言当开发应用程序时,提供良好的用户体验(UX)是至关重要

python实现自动登录12306自动抢票功能

《python实现自动登录12306自动抢票功能》随着互联网技术的发展,越来越多的人选择通过网络平台购票,特别是在中国,12306作为官方火车票预订平台,承担了巨大的访问量,对于热门线路或者节假日出行... 目录一、遇到的问题?二、改进三、进阶–展望总结一、遇到的问题?1.url-正确的表头:就是首先ur

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实