本文主要是介绍第八章-Broker-加载注册,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在第五章的5.2
节,Broker初始化时,首先会调用4个load加载动作,都加载的是什么呢?
分别是加载topic.json、consumerOffset.json、subscriptionGroup.json、consumerFilter.json这4个文件的内容并做解析,最终存放至本地缓存中,这些文件的存放路径,见6.3 BrokerPathConfigHelper类,
下面主要介绍下这几个文件内部存放的是什么内容,做什么用:
1.topic.json:存储的是该broker支持的所有topic信息,对应类TopicConfigSerializeWrapper,每个topic对应一 个TopicConfig,内部包含topic支持读写队列数、topicName、读写权限(perm)、是否顺序 2.consumerOffset.json:存储的是每个消费者组下的topic的每个cp的消费偏移,对应类 ConsumerOffsetManager,内部维护一个ConcurrentMap<String, ConcurrentMap<Integer, Long>>
key=topic@group({topicName}@{consumerGroup})
value=每个queueId对应的偏移 3.subscriptionGroup.json:存储的是订阅组信息,对应SubscriptionGroupManager类,内部维护一个ConcurrentMap<String, SubscriptionGroupConfig>
key=group(消费者组)
value=SubscriptionGroupConfig4.consumerFilter.json:存储的是消费者过滤数据信息,对应ConsumerFilterManager类,内部维护一个ConcurrentMap<String, FilterDataMapByTopic>,其实就是对应消费者设置的tags
key=topic
value=FilterDataMapByTopic
接下来,就分别对这4个文件的加载做源码解析,入口代码都在 BrokerController.initialize 中。
8.1 topic配置加载
ConfigManager.load
public boolean load() {String fileName = null;try {// topic.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 TopicConfigManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图7-1`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 topics.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 TopicConfigManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}
TopicConfigManager.decode
public void decode(String jsonString) {if (jsonString != null) { // 还是判空// 将 json 字符串转成对象TopicConfigSerializeWrapper topicConfigSerializeWrapper =TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);if (topicConfigSerializeWrapper != null) {// topicConfigTable 是一个 Map,将文件中的内存存到map中this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());// 数据版本赋值this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());// 日志打印this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);}}
}
8.2 消费偏移加载
ConfigManager.load
public boolean load() {String fileName = null;try {// consumerOffset.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 ConsumerOffsetManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图8-1`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 consumerOffset.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 ConsumerOffsetManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}
ConsumerOffsetManager.decode
public void decode(String jsonString) {if (jsonString != null) {// json 字符转换成对象ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);if (obj != null) {// 赋值本地缓存 mapthis.offsetTable = obj.offsetTable;}}
}
图8-1
8.3 订阅组加载
ConfigManager.load
public boolean load() {String fileName = null;try {// subscriptionGroup.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 SubscriptionGroupManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图8-2`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 subscriptionGroup.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 SubscriptionGroupManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}
SubscriptionGroupManager.decode
public void decode(String jsonString) {if (jsonString != null) {// json 字符转换成对象SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {// subscriptionGroupTable 是一个 Map,将文件中的内存存到map中this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);// 数据版本赋值this.dataVersion.assignNewOne(obj.dataVersion);// 日志打印this.printLoadDataWhenFirstBoot(obj);}}
}
图8-2
8.4 消费者布容过滤器加载
ConfigManager.load
public boolean load() {String fileName = null;try {// consumerFilter.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 ConsumerFilterManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 consumerFilter.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 ConsumerFilterManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}
ConsumerFilterManager.decode
这个方法内部主要还是将 json 转换成对象,然后再转成本地内存,至于细节,具体使用时再细讲。
public void decode(final String jsonString) {ConsumerFilterManager load = RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class);if (load != null && load.filterDataByTopic != null) {boolean bloomChanged = false;for (String topic : load.filterDataByTopic.keySet()) {FilterDataMapByTopic dataMapByTopic = load.filterDataByTopic.get(topic);if (dataMapByTopic == null) {continue;}for (String group : dataMapByTopic.getGroupFilterData().keySet()) {ConsumerFilterData filterData = dataMapByTopic.getGroupFilterData().get(group);if (filterData == null) {continue;}try {filterData.setCompiledExpression(FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression()));} catch (Exception e) {log.error("load filter data error, " + filterData, e);}// check whether bloom filter is changed// if changed, ignore the bit map calculated before.if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) {bloomChanged = true;log.info("Bloom filter is changed!So ignore all filter data persisted! {}, {}", this.bloomFilter, filterData.getBloomFilterData());break;}log.info("load exist consumer filter data: {}", filterData);if (filterData.getDeadTime() == 0) {// we think all consumers are dead when loadlong deadTime = System.currentTimeMillis() - 30 * 1000;filterData.setDeadTime(deadTime <= filterData.getBornTime() ? filterData.getBornTime() : deadTime);}}}if (!bloomChanged) {this.filterDataByTopic = load.filterDataByTopic;}}
}
8.5 注册处理器
RocketMQ的网络服务是由netty来完成的,并提供多种服务形式,包括发送消息、拉取消息、查询消息、客户端管理、消费者管理、事务处理、Broker管理功能等,每种服务形式都有对应的处理器,需要要Broker启动时注册进去,以下源码就是对处理器的注册。每种处理器能处理的任务通过请求码 RequestCode来区分,所以不管是发送消息还是消费消息,我们只要找到对应的处理器下的请求码就可以知道它是怎么处理的,具体处理细节,待讲解各个细分点时再讲。
// 这个方法在 BrokerController 类中
public void registerProcessor() {/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/*** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
这篇关于第八章-Broker-加载注册的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!