全网最细RocketMQ源码四:消息存储

2024-01-16 02:36

本文主要是介绍全网最细RocketMQ源码四:消息存储,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习

入口

SendMessageProcessor.processRequest
在这里插入图片描述
在这里插入图片描述

  private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));msgInner.setPropertiesString(requestHeader.getProperties());msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));CompletableFuture<PutMessageResult> putMessageResult = null;Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 使用defaultMessageStore.aysncPutMessage存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦

MappedFile

public class MappedFile extends ReferenceResource {// 内存页大小:4kpublic static final int OS_PAGE_SIZE = 1024 * 4;protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);// 当前进程下 所有的 mappedFile占用的总虚拟内存大小private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);// 当前进程下 所有的 mappedFile个数private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);// 当前mappedFile数据写入点protected final AtomicInteger wrotePosition = new AtomicInteger(0);protected final AtomicInteger committedPosition = new AtomicInteger(0);// 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)private final AtomicInteger flushedPosition = new AtomicInteger(0);// 文件大小protected int fileSize;// 文件通道protected FileChannel fileChannel;/*** Message will put to here first, and then reput to FileChannel if writeBuffer is not null.*/protected ByteBuffer writeBuffer = null;protected TransientStorePool transientStorePool = null;// 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量   索引文件: 年月日小时分钟秒.. )private String fileName;// 文件名转longprivate long fileFromOffset;// 文件对象private File file;// 内存映射缓冲区,访问虚拟内存private MappedByteBuffer mappedByteBuffer;// 该文件下 保存的第一条 msg 的存储时间private volatile long storeTimestamp = 0;// 当前文件如果是 目录内 有效文件的 首文件的话,该值为trueprivate boolean firstCreateInQueue = false;
  • 构造方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage(byte[] data)
    在这里插入图片描述

  • flush
    在这里插入图片描述

MappedFileQueue

public class MappedFileQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);private static final int DELETE_FILES_BATCH_MAX = 10;// mfq 管理的目录(CommitLog: ../store/commitlog  或者  consumeQueue: ../store/xxx_topic/0)private final String storePath;// 目录下每个文件大小(commitLog文件 默认 1g     consumeQueue 文件 默认 600w字节)private final int mappedFileSize;// list,目录下的每个 mappedFile 都加入该listprivate final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。private final AllocateMappedFileService allocateMappedFileService;// 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)private long flushedWhere = 0;private long committedWhere = 0;// 当前目录下最后一条msg存储时间private volatile long storeTimestamp = 0;
  • load方法
    在这里插入图片描述

  • getLastMappedFile

 /*** 参数1:startOffset ,文件起始偏移量* 参数2:needCreate ,当 list 为空时,是否创建mappedFile*/public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {// 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾// 两种情况会创建:// 1. list内没有mappedFile// 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..long createOffset = -1;MappedFile mappedFileLast = getLastMappedFile();if (mappedFileLast == null) {// 情况1  list内没有mappedFile// createOffset 取值 必须是 mappedFileSize 的倍数 或者 0createOffset = startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2  list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..// 上一个文件名 转long + mappedFileSizecreateOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。// 获取待创建文件的 绝对路径(下次即将要创建的文件名)String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);// 获取 下下次 要创建的文件的 绝对路径String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,// 内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。// 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;}
  • flush
 /*** @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)* @return boolean true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘*/public boolean flush(final int flushLeastPages) {boolean result = true;// 获取当前正在刷盘的文件 (正在顺序写的mappedFile)MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {// 获取mappedFile 最后一条消息的存储时间long tmpTimeStamp = mappedFile.getStoreTimestamp();// 调用mf 刷盘 ,返回mf的最新的落盘位点int offset = mappedFile.flush(flushLeastPages);// mf起始偏移量 + mf最新的落盘位点long where = mappedFile.getFileFromOffset() + offset;// true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘result = where == this.flushedWhere;// 将最新的目录刷盘位点 赋值给 flushedWherethis.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result;}

CommitLog

ConsumeQueue

DefaultMessageStore

这篇关于全网最细RocketMQ源码四:消息存储的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/611042

相关文章

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++

C# WinForms存储过程操作数据库的实例讲解

《C#WinForms存储过程操作数据库的实例讲解》:本文主要介绍C#WinForms存储过程操作数据库的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、存储过程基础二、C# 调用流程1. 数据库连接配置2. 执行存储过程(增删改)3. 查询数据三、事务处

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

Oracle存储过程里操作BLOB的字节数据的办法

《Oracle存储过程里操作BLOB的字节数据的办法》该篇文章介绍了如何在Oracle存储过程中操作BLOB的字节数据,作者研究了如何获取BLOB的字节长度、如何使用DBMS_LOB包进行BLOB操作... 目录一、缘由二、办法2.1 基本操作2.2 DBMS_LOB包2.3 字节级操作与RAW数据类型2.

Java实现数据库图片上传与存储功能

《Java实现数据库图片上传与存储功能》在现代的Web开发中,上传图片并将其存储在数据库中是常见的需求之一,本文将介绍如何通过Java实现图片上传,存储到数据库的完整过程,希望对大家有所帮助... 目录1. 项目结构2. 数据库表设计3. 实现图片上传功能3.1 文件上传控制器3.2 图片上传服务4. 实现

Spring 中 BeanFactoryPostProcessor 的作用和示例源码分析

《Spring中BeanFactoryPostProcessor的作用和示例源码分析》Spring的BeanFactoryPostProcessor是容器初始化的扩展接口,允许在Bean实例化前... 目录一、概览1. 核心定位2. 核心功能详解3. 关键特性二、Spring 内置的 BeanFactory

C语言中的浮点数存储详解

《C语言中的浮点数存储详解》:本文主要介绍C语言中的浮点数存储详解,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、首先明确一个概念2、接下来,讲解C语言中浮点型数存储的规则2.1、可以将上述公式分为两部分来看2.2、问:十进制小数0.5该如何存储?2.3 浮点