第八章-Broker-加载注册

2024-03-29 16:52
文章标签 加载 注册 第八章 broker

本文主要是介绍第八章-Broker-加载注册,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在第五章的5.2节,Broker初始化时,首先会调用4个load加载动作,都加载的是什么呢?

分别是加载topic.json、consumerOffset.json、subscriptionGroup.json、consumerFilter.json这4个文件的内容并做解析,最终存放至本地缓存中,这些文件的存放路径,见6.3 BrokerPathConfigHelper类,
下面主要介绍下这几个文件内部存放的是什么内容,做什么用:
1.topic.json:存储的是该broker支持的所有topic信息,对应类TopicConfigSerializeWrapper,每个topic对应一 个TopicConfig,内部包含topic支持读写队列数、topicName、读写权限(perm)、是否顺序

​ 2.consumerOffset.json:存储的是每个消费者组下的topic的每个cp的消费偏移,对应类 ConsumerOffsetManager,内部维护一个ConcurrentMap<String, ConcurrentMap<Integer, Long>>
​ key=topic@group({topicName}@{consumerGroup})
​ value=每个queueId对应的偏移

​ 3.subscriptionGroup.json:存储的是订阅组信息,对应SubscriptionGroupManager类,内部维护一个ConcurrentMap<String, SubscriptionGroupConfig>
​ key=group(消费者组)
​ value=SubscriptionGroupConfig

4.consumerFilter.json:存储的是消费者过滤数据信息,对应ConsumerFilterManager类,内部维护一个ConcurrentMap<String, FilterDataMapByTopic>,其实就是对应消费者设置的tags
key=topic
value=FilterDataMapByTopic

接下来,就分别对这4个文件的加载做源码解析,入口代码都在 BrokerController.initialize 中。

8.1 topic配置加载

ConfigManager.load

public boolean load() {String fileName = null;try {// topic.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 TopicConfigManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图7-1`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 topics.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 TopicConfigManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

TopicConfigManager.decode

public void decode(String jsonString) {if (jsonString != null) { // 还是判空// 将 json 字符串转成对象TopicConfigSerializeWrapper topicConfigSerializeWrapper =TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);if (topicConfigSerializeWrapper != null) {// topicConfigTable 是一个 Map,将文件中的内存存到map中this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());// 数据版本赋值this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());// 日志打印this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);}}
}

8.2 消费偏移加载

ConfigManager.load

public boolean load() {String fileName = null;try {// consumerOffset.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 ConsumerOffsetManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图8-1`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 consumerOffset.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 ConsumerOffsetManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

ConsumerOffsetManager.decode

public void decode(String jsonString) {if (jsonString != null) {// json 字符转换成对象ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);if (obj != null) {// 赋值本地缓存 mapthis.offsetTable = obj.offsetTable;}}
}

图8-1

在这里插入图片描述

8.3 订阅组加载

ConfigManager.load

public boolean load() {String fileName = null;try {// subscriptionGroup.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 SubscriptionGroupManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图8-2`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 subscriptionGroup.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 SubscriptionGroupManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

SubscriptionGroupManager.decode

public void decode(String jsonString) {if (jsonString != null) {// json 字符转换成对象SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {// subscriptionGroupTable 是一个 Map,将文件中的内存存到map中this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);// 数据版本赋值this.dataVersion.assignNewOne(obj.dataVersion);// 日志打印this.printLoadDataWhenFirstBoot(obj);}}
}

图8-2

在这里插入图片描述

8.4 消费者布容过滤器加载

ConfigManager.load

public boolean load() {String fileName = null;try {// consumerFilter.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 ConsumerFilterManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 consumerFilter.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 ConsumerFilterManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

ConsumerFilterManager.decode

这个方法内部主要还是将 json 转换成对象,然后再转成本地内存,至于细节,具体使用时再细讲。

public void decode(final String jsonString) {ConsumerFilterManager load = RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class);if (load != null && load.filterDataByTopic != null) {boolean bloomChanged = false;for (String topic : load.filterDataByTopic.keySet()) {FilterDataMapByTopic dataMapByTopic = load.filterDataByTopic.get(topic);if (dataMapByTopic == null) {continue;}for (String group : dataMapByTopic.getGroupFilterData().keySet()) {ConsumerFilterData filterData = dataMapByTopic.getGroupFilterData().get(group);if (filterData == null) {continue;}try {filterData.setCompiledExpression(FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression()));} catch (Exception e) {log.error("load filter data error, " + filterData, e);}// check whether bloom filter is changed// if changed, ignore the bit map calculated before.if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) {bloomChanged = true;log.info("Bloom filter is changed!So ignore all filter data persisted! {}, {}", this.bloomFilter, filterData.getBloomFilterData());break;}log.info("load exist consumer filter data: {}", filterData);if (filterData.getDeadTime() == 0) {// we think all consumers are dead when loadlong deadTime = System.currentTimeMillis() - 30 * 1000;filterData.setDeadTime(deadTime <= filterData.getBornTime() ? filterData.getBornTime() : deadTime);}}}if (!bloomChanged) {this.filterDataByTopic = load.filterDataByTopic;}}
}

8.5 注册处理器

RocketMQ的网络服务是由netty来完成的,并提供多种服务形式,包括发送消息、拉取消息、查询消息、客户端管理、消费者管理、事务处理、Broker管理功能等,每种服务形式都有对应的处理器,需要要Broker启动时注册进去,以下源码就是对处理器的注册。每种处理器能处理的任务通过请求码 RequestCode来区分,所以不管是发送消息还是消费消息,我们只要找到对应的处理器下的请求码就可以知道它是怎么处理的,具体处理细节,待讲解各个细分点时再讲。

// 这个方法在 BrokerController 类中
public void registerProcessor() {/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/*** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}

这篇关于第八章-Broker-加载注册的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/859129

相关文章

SpringBoot项目启动报错"找不到或无法加载主类"的解决方法

《SpringBoot项目启动报错找不到或无法加载主类的解决方法》在使用IntelliJIDEA开发基于SpringBoot框架的Java程序时,可能会出现找不到或无法加载主类com.example.... 目录一、问题描述二、排查过程三、解决方案一、问题描述在使用 IntelliJ IDEA 开发基于

Spring Cloud之注册中心Nacos的使用详解

《SpringCloud之注册中心Nacos的使用详解》本文介绍SpringCloudAlibaba中的Nacos组件,对比了Nacos与Eureka的区别,展示了如何在项目中引入SpringClo... 目录Naacos服务注册/服务发现引⼊Spring Cloud Alibaba依赖引入Naco编程s依

Android WebView无法加载H5页面的常见问题和解决方法

《AndroidWebView无法加载H5页面的常见问题和解决方法》AndroidWebView是一种视图组件,使得Android应用能够显示网页内容,它基于Chromium,具备现代浏览器的许多功... 目录1. WebView 简介2. 常见问题3. 网络权限设置4. 启用 JavaScript5. D

SpringBoot项目启动错误:找不到或无法加载主类的几种解决方法

《SpringBoot项目启动错误:找不到或无法加载主类的几种解决方法》本文主要介绍了SpringBoot项目启动错误:找不到或无法加载主类的几种解决方法,具有一定的参考价值,感兴趣的可以了解一下... 目录方法1:更改IDE配置方法2:在Eclipse中清理项目方法3:使用Maven命令行在开发Sprin

spring-boot-starter-thymeleaf加载外部html文件方式

《spring-boot-starter-thymeleaf加载外部html文件方式》本文介绍了在SpringMVC中使用Thymeleaf模板引擎加载外部HTML文件的方法,以及在SpringBoo... 目录1.Thymeleaf介绍2.springboot使用thymeleaf2.1.引入spring

Go路由注册方法详解

《Go路由注册方法详解》Go语言中,http.NewServeMux()和http.HandleFunc()是两种不同的路由注册方式,前者创建独立的ServeMux实例,适合模块化和分层路由,灵活性高... 目录Go路由注册方法1. 路由注册的方式2. 路由器的独立性3. 灵活性4. 启动服务器的方式5.

关于Spring @Bean 相同加载顺序不同结果不同的问题记录

《关于Spring@Bean相同加载顺序不同结果不同的问题记录》本文主要探讨了在Spring5.1.3.RELEASE版本下,当有两个全注解类定义相同类型的Bean时,由于加载顺序不同,最终生成的... 目录问题说明测试输出1测试输出2@Bean注解的BeanDefiChina编程nition加入时机总结问题说明

SpringBoot项目启动后自动加载系统配置的多种实现方式

《SpringBoot项目启动后自动加载系统配置的多种实现方式》:本文主要介绍SpringBoot项目启动后自动加载系统配置的多种实现方式,并通过代码示例讲解的非常详细,对大家的学习或工作有一定的... 目录1. 使用 CommandLineRunner实现方式:2. 使用 ApplicationRunne

SpringBoot项目删除Bean或者不加载Bean的问题解决

《SpringBoot项目删除Bean或者不加载Bean的问题解决》文章介绍了在SpringBoot项目中如何使用@ComponentScan注解和自定义过滤器实现不加载某些Bean的方法,本文通过实... 使用@ComponentScan注解中的@ComponentScan.Filter标记不加载。@C

springboot 加载本地jar到maven的实现方法

《springboot加载本地jar到maven的实现方法》如何在SpringBoot项目中加载本地jar到Maven本地仓库,使用Maven的install-file目标来实现,本文结合实例代码给... 在Spring Boothttp://www.chinasem.cn项目中,如果你想要加载一个本地的ja