第八章-Broker-加载注册

2024-03-29 16:52
文章标签 加载 注册 第八章 broker

本文主要是介绍第八章-Broker-加载注册,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在第五章的5.2节,Broker初始化时,首先会调用4个load加载动作,都加载的是什么呢?

分别是加载topic.json、consumerOffset.json、subscriptionGroup.json、consumerFilter.json这4个文件的内容并做解析,最终存放至本地缓存中,这些文件的存放路径,见6.3 BrokerPathConfigHelper类,
下面主要介绍下这几个文件内部存放的是什么内容,做什么用:
1.topic.json:存储的是该broker支持的所有topic信息,对应类TopicConfigSerializeWrapper,每个topic对应一 个TopicConfig,内部包含topic支持读写队列数、topicName、读写权限(perm)、是否顺序

​ 2.consumerOffset.json:存储的是每个消费者组下的topic的每个cp的消费偏移,对应类 ConsumerOffsetManager,内部维护一个ConcurrentMap<String, ConcurrentMap<Integer, Long>>
​ key=topic@group({topicName}@{consumerGroup})
​ value=每个queueId对应的偏移

​ 3.subscriptionGroup.json:存储的是订阅组信息,对应SubscriptionGroupManager类,内部维护一个ConcurrentMap<String, SubscriptionGroupConfig>
​ key=group(消费者组)
​ value=SubscriptionGroupConfig

4.consumerFilter.json:存储的是消费者过滤数据信息,对应ConsumerFilterManager类,内部维护一个ConcurrentMap<String, FilterDataMapByTopic>,其实就是对应消费者设置的tags
key=topic
value=FilterDataMapByTopic

接下来,就分别对这4个文件的加载做源码解析,入口代码都在 BrokerController.initialize 中。

8.1 topic配置加载

ConfigManager.load

public boolean load() {String fileName = null;try {// topic.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 TopicConfigManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图7-1`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 topics.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 TopicConfigManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

TopicConfigManager.decode

public void decode(String jsonString) {if (jsonString != null) { // 还是判空// 将 json 字符串转成对象TopicConfigSerializeWrapper topicConfigSerializeWrapper =TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);if (topicConfigSerializeWrapper != null) {// topicConfigTable 是一个 Map,将文件中的内存存到map中this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());// 数据版本赋值this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());// 日志打印this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);}}
}

8.2 消费偏移加载

ConfigManager.load

public boolean load() {String fileName = null;try {// consumerOffset.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 ConsumerOffsetManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图8-1`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 consumerOffset.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 ConsumerOffsetManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

ConsumerOffsetManager.decode

public void decode(String jsonString) {if (jsonString != null) {// json 字符转换成对象ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);if (obj != null) {// 赋值本地缓存 mapthis.offsetTable = obj.offsetTable;}}
}

图8-1

在这里插入图片描述

8.3 订阅组加载

ConfigManager.load

public boolean load() {String fileName = null;try {// subscriptionGroup.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 SubscriptionGroupManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图8-2`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 subscriptionGroup.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 SubscriptionGroupManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

SubscriptionGroupManager.decode

public void decode(String jsonString) {if (jsonString != null) {// json 字符转换成对象SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {// subscriptionGroupTable 是一个 Map,将文件中的内存存到map中this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);// 数据版本赋值this.dataVersion.assignNewOne(obj.dataVersion);// 日志打印this.printLoadDataWhenFirstBoot(obj);}}
}

图8-2

在这里插入图片描述

8.4 消费者布容过滤器加载

ConfigManager.load

public boolean load() {String fileName = null;try {// consumerFilter.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 ConsumerFilterManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 consumerFilter.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 ConsumerFilterManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

ConsumerFilterManager.decode

这个方法内部主要还是将 json 转换成对象,然后再转成本地内存,至于细节,具体使用时再细讲。

public void decode(final String jsonString) {ConsumerFilterManager load = RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class);if (load != null && load.filterDataByTopic != null) {boolean bloomChanged = false;for (String topic : load.filterDataByTopic.keySet()) {FilterDataMapByTopic dataMapByTopic = load.filterDataByTopic.get(topic);if (dataMapByTopic == null) {continue;}for (String group : dataMapByTopic.getGroupFilterData().keySet()) {ConsumerFilterData filterData = dataMapByTopic.getGroupFilterData().get(group);if (filterData == null) {continue;}try {filterData.setCompiledExpression(FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression()));} catch (Exception e) {log.error("load filter data error, " + filterData, e);}// check whether bloom filter is changed// if changed, ignore the bit map calculated before.if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) {bloomChanged = true;log.info("Bloom filter is changed!So ignore all filter data persisted! {}, {}", this.bloomFilter, filterData.getBloomFilterData());break;}log.info("load exist consumer filter data: {}", filterData);if (filterData.getDeadTime() == 0) {// we think all consumers are dead when loadlong deadTime = System.currentTimeMillis() - 30 * 1000;filterData.setDeadTime(deadTime <= filterData.getBornTime() ? filterData.getBornTime() : deadTime);}}}if (!bloomChanged) {this.filterDataByTopic = load.filterDataByTopic;}}
}

8.5 注册处理器

RocketMQ的网络服务是由netty来完成的,并提供多种服务形式,包括发送消息、拉取消息、查询消息、客户端管理、消费者管理、事务处理、Broker管理功能等,每种服务形式都有对应的处理器,需要要Broker启动时注册进去,以下源码就是对处理器的注册。每种处理器能处理的任务通过请求码 RequestCode来区分,所以不管是发送消息还是消费消息,我们只要找到对应的处理器下的请求码就可以知道它是怎么处理的,具体处理细节,待讲解各个细分点时再讲。

// 这个方法在 BrokerController 类中
public void registerProcessor() {/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/*** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}

这篇关于第八章-Broker-加载注册的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/859129

相关文章

浏览器插件cursor实现自动注册、续杯的详细过程

《浏览器插件cursor实现自动注册、续杯的详细过程》Cursor简易注册助手脚本通过自动化邮箱填写和验证码获取流程,大大简化了Cursor的注册过程,它不仅提高了注册效率,还通过友好的用户界面和详细... 目录前言功能概述使用方法安装脚本使用流程邮箱输入页面验证码页面实战演示技术实现核心功能实现1. 随机

Spring如何使用注解@DependsOn控制Bean加载顺序

《Spring如何使用注解@DependsOn控制Bean加载顺序》:本文主要介绍Spring如何使用注解@DependsOn控制Bean加载顺序,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录1.javascript 前言2. 代码实现总结1. 前言默认情况下,Spring加载Bean的顺

springboot加载不到nacos配置中心的配置问题处理

《springboot加载不到nacos配置中心的配置问题处理》:本文主要介绍springboot加载不到nacos配置中心的配置问题处理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录springboot加载不到nacos配置中心的配置两种可能Spring Boot 版本Nacos

Nacos注册中心和配置中心的底层原理全面解读

《Nacos注册中心和配置中心的底层原理全面解读》:本文主要介绍Nacos注册中心和配置中心的底层原理的全面解读,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录临时实例和永久实例为什么 Nacos 要将服务实例分为临时实例和永久实例?1.x 版本和2.x版本的区别

使用Python获取JS加载的数据的多种实现方法

《使用Python获取JS加载的数据的多种实现方法》在当今的互联网时代,网页数据的动态加载已经成为一种常见的技术手段,许多现代网站通过JavaScript(JS)动态加载内容,这使得传统的静态网页爬取... 目录引言一、动态 网页与js加载数据的原理二、python爬取JS加载数据的方法(一)分析网络请求1

IDEA下"File is read-only"可能原因分析及"找不到或无法加载主类"的问题

《IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题》:本文主要介绍IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题,具有很好的参... 目录1.File is read-only”可能原因2.“找不到或无法加载主类”问题的解决总结1.File

重新对Java的类加载器的学习方式

《重新对Java的类加载器的学习方式》:本文主要介绍重新对Java的类加载器的学习方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍1.1、简介1.2、符号引用和直接引用1、符号引用2、直接引用3、符号转直接的过程2、加载流程3、类加载的分类3.1、显示

在 PyQt 加载 UI 三种常见方法

《在PyQt加载UI三种常见方法》在PyQt中,加载UI文件通常指的是使用QtDesigner设计的.ui文件,并将其转换为Python代码,以便在PyQt应用程序中使用,这篇文章给大家介绍在... 目录方法一:使用 uic 模块动态加载 (不推荐用于大型项目)方法二:将 UI 文件编译为 python 模

Spring框架中@Lazy延迟加载原理和使用详解

《Spring框架中@Lazy延迟加载原理和使用详解》:本文主要介绍Spring框架中@Lazy延迟加载原理和使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、@Lazy延迟加载原理1.延迟加载原理1.1 @Lazy三种配置方法1.2 @Component

SpringBoot中配置文件的加载顺序解读

《SpringBoot中配置文件的加载顺序解读》:本文主要介绍SpringBoot中配置文件的加载顺序,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot配置文件的加载顺序1、命令⾏参数2、Java系统属性3、操作系统环境变量5、项目【外部】的ap