第八章-Broker-加载注册

2024-03-29 16:52
文章标签 加载 注册 第八章 broker

本文主要是介绍第八章-Broker-加载注册,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在第五章的5.2节,Broker初始化时,首先会调用4个load加载动作,都加载的是什么呢?

分别是加载topic.json、consumerOffset.json、subscriptionGroup.json、consumerFilter.json这4个文件的内容并做解析,最终存放至本地缓存中,这些文件的存放路径,见6.3 BrokerPathConfigHelper类,
下面主要介绍下这几个文件内部存放的是什么内容,做什么用:
1.topic.json:存储的是该broker支持的所有topic信息,对应类TopicConfigSerializeWrapper,每个topic对应一 个TopicConfig,内部包含topic支持读写队列数、topicName、读写权限(perm)、是否顺序

​ 2.consumerOffset.json:存储的是每个消费者组下的topic的每个cp的消费偏移,对应类 ConsumerOffsetManager,内部维护一个ConcurrentMap<String, ConcurrentMap<Integer, Long>>
​ key=topic@group({topicName}@{consumerGroup})
​ value=每个queueId对应的偏移

​ 3.subscriptionGroup.json:存储的是订阅组信息,对应SubscriptionGroupManager类,内部维护一个ConcurrentMap<String, SubscriptionGroupConfig>
​ key=group(消费者组)
​ value=SubscriptionGroupConfig

4.consumerFilter.json:存储的是消费者过滤数据信息,对应ConsumerFilterManager类,内部维护一个ConcurrentMap<String, FilterDataMapByTopic>,其实就是对应消费者设置的tags
key=topic
value=FilterDataMapByTopic

接下来,就分别对这4个文件的加载做源码解析,入口代码都在 BrokerController.initialize 中。

8.1 topic配置加载

ConfigManager.load

public boolean load() {String fileName = null;try {// topic.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 TopicConfigManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图7-1`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 topics.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 TopicConfigManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

TopicConfigManager.decode

public void decode(String jsonString) {if (jsonString != null) { // 还是判空// 将 json 字符串转成对象TopicConfigSerializeWrapper topicConfigSerializeWrapper =TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);if (topicConfigSerializeWrapper != null) {// topicConfigTable 是一个 Map,将文件中的内存存到map中this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());// 数据版本赋值this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());// 日志打印this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);}}
}

8.2 消费偏移加载

ConfigManager.load

public boolean load() {String fileName = null;try {// consumerOffset.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 ConsumerOffsetManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图8-1`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 consumerOffset.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 ConsumerOffsetManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

ConsumerOffsetManager.decode

public void decode(String jsonString) {if (jsonString != null) {// json 字符转换成对象ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);if (obj != null) {// 赋值本地缓存 mapthis.offsetTable = obj.offsetTable;}}
}

图8-1

在这里插入图片描述

8.3 订阅组加载

ConfigManager.load

public boolean load() {String fileName = null;try {// subscriptionGroup.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 SubscriptionGroupManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串,格式如`图8-2`String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 subscriptionGroup.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 SubscriptionGroupManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

SubscriptionGroupManager.decode

public void decode(String jsonString) {if (jsonString != null) {// json 字符转换成对象SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {// subscriptionGroupTable 是一个 Map,将文件中的内存存到map中this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);// 数据版本赋值this.dataVersion.assignNewOne(obj.dataVersion);// 日志打印this.printLoadDataWhenFirstBoot(obj);}}
}

图8-2

在这里插入图片描述

8.4 消费者布容过滤器加载

ConfigManager.load

public boolean load() {String fileName = null;try {// consumerFilter.json 文件完成路径,这是一个模板方法,由子类实现,此时子类是 ConsumerFilterManagerfileName = this.configFilePath();// 将文件中的内容读取出来,并形成json字符串String jsonString = MixAll.file2String(fileName);// 判空if (null == jsonString || jsonString.length() == 0) {// 如果为空,就加载 consumerFilter.json.bak 文件,加载逻辑同 loadreturn this.loadBak();} else {// 不为空,那就将内容转换成本地缓存,这是一个模板方法,由子类实现,此时子类是 ConsumerFilterManagerthis.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}

ConsumerFilterManager.decode

这个方法内部主要还是将 json 转换成对象,然后再转成本地内存,至于细节,具体使用时再细讲。

public void decode(final String jsonString) {ConsumerFilterManager load = RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class);if (load != null && load.filterDataByTopic != null) {boolean bloomChanged = false;for (String topic : load.filterDataByTopic.keySet()) {FilterDataMapByTopic dataMapByTopic = load.filterDataByTopic.get(topic);if (dataMapByTopic == null) {continue;}for (String group : dataMapByTopic.getGroupFilterData().keySet()) {ConsumerFilterData filterData = dataMapByTopic.getGroupFilterData().get(group);if (filterData == null) {continue;}try {filterData.setCompiledExpression(FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression()));} catch (Exception e) {log.error("load filter data error, " + filterData, e);}// check whether bloom filter is changed// if changed, ignore the bit map calculated before.if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) {bloomChanged = true;log.info("Bloom filter is changed!So ignore all filter data persisted! {}, {}", this.bloomFilter, filterData.getBloomFilterData());break;}log.info("load exist consumer filter data: {}", filterData);if (filterData.getDeadTime() == 0) {// we think all consumers are dead when loadlong deadTime = System.currentTimeMillis() - 30 * 1000;filterData.setDeadTime(deadTime <= filterData.getBornTime() ? filterData.getBornTime() : deadTime);}}}if (!bloomChanged) {this.filterDataByTopic = load.filterDataByTopic;}}
}

8.5 注册处理器

RocketMQ的网络服务是由netty来完成的,并提供多种服务形式,包括发送消息、拉取消息、查询消息、客户端管理、消费者管理、事务处理、Broker管理功能等,每种服务形式都有对应的处理器,需要要Broker启动时注册进去,以下源码就是对处理器的注册。每种处理器能处理的任务通过请求码 RequestCode来区分,所以不管是发送消息还是消费消息,我们只要找到对应的处理器下的请求码就可以知道它是怎么处理的,具体处理细节,待讲解各个细分点时再讲。

// 这个方法在 BrokerController 类中
public void registerProcessor() {/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/*** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}

这篇关于第八章-Broker-加载注册的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/859129

相关文章

Flutter 进阶:绘制加载动画

绘制加载动画:由小圆组成的大圆 1. 定义 LoadingScreen 类2. 实现 _LoadingScreenState 类3. 定义 LoadingPainter 类4. 总结 实现加载动画 我们需要定义两个类:LoadingScreen 和 LoadingPainter。LoadingScreen 负责控制动画的状态,而 LoadingPainter 则负责绘制动画。

《数据结构(C语言版)第二版》第八章-排序(8.3-交换排序、8.4-选择排序)

8.3 交换排序 8.3.1 冒泡排序 【算法特点】 (1) 稳定排序。 (2) 可用于链式存储结构。 (3) 移动记录次数较多,算法平均时间性能比直接插入排序差。当初始记录无序,n较大时, 此算法不宜采用。 #include <stdio.h>#include <stdlib.h>#define MAXSIZE 26typedef int KeyType;typedef char In

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

使用WebP解决网站加载速度问题,这些细节你需要了解

说到网页的图片格式,大家最常想到的可能是JPEG、PNG,毕竟这些老牌格式陪伴我们这么多年。然而,近几年,有一个格式悄悄崭露头角,那就是WebP。很多人可能听说过,但到底它好在哪?你的网站或者项目是不是也应该用WebP呢?别着急,今天咱们就来好好聊聊WebP这个图片格式的前世今生,以及它值不值得你花时间去用。 为什么会有WebP? 你有没有遇到过这样的情况?网页加载特别慢,尤其是那

Chapter 13 普通组件的注册使用

欢迎大家订阅【Vue2+Vue3】入门到实践 专栏,开启你的 Vue 学习之旅! 文章目录 前言一、组件创建二、局部注册三、全局注册 前言 在 Vue.js 中,组件是构建应用程序的基本单元。本章详细讲解了注册和使用 Vue 的普通组件的两种方式:局部注册和全局注册。 本篇文章参考黑马程序员 一、组件创建 ①定义 Vue 组件是一种具有特定功能的 Vue 实

c++11工厂子类实现自注册的两种方法

文章目录 一、产品类构建1. 猫基类与各品种猫子类2.狗基类与各品种狗子类 二、工厂类构建三、客户端使用switch-case实现调用不同工厂子类四、自注册方法一:公开注册函数显式注册五、自注册方法二:构造函数隐形注册总结 一、产品类构建 1. 猫基类与各品种猫子类 class Cat {public:virtual void Printer() = 0;};class

gazebo 已加载模型但无法显示

目录 写在前面的话问题一:robot_state_publisher 发布机器人信息失败报错一 Error: Error document empty.报错二 .xcaro 文件中有多行注释成功启动 问题二:通过 ros2 启动 gazebo 失败成功启动 问题三:gazebo 崩溃和无法显示模型问题四: 缺少 robot_description 等话题正确的输出 写在前面的话

JVM类的加载器及加载过程

类的加载器及加载过程 文章目录 类的加载器及加载过程类的加载过程加载:链接(验证、准备、解析):初始化: 类加载器的分类引导类加载器:BootstrapClassLoader 启动类加载器( C/C++实现,嵌套在JVM内部)自定义类加载器(所有派生于抽象类ClassLoader的类加载器)获取ClassLoader的途径 双亲委派机制(重点)判断两个Class对象是否为同一个类

SAP学习笔记 - 开发02 - BTP实操流程(账号注册,BTP控制台,BTP集成开发环境搭建)

上一章讲了 BAPI的概念,以及如何调用SAP里面的既存BAPI。 SAP学习笔记 - 开发01 - BAPI是什么?通过界面和ABAP代码来调用BAPI-CSDN博客 本章继续讲开发相关的内容,主要就是BTP的实际操作流程,比如账号注册,登录,BTP集成开发环境的搭建这方面。 目录 1,账号注册 2,BTP登录URL 3,如何在BTP上进行开发? 以下是详细内容。 1,账

Unity Adressables 使用说明(六)加载(Load) Addressable Assets

【概述】Load Addressable Assets Addressables类提供了加载 Addressable assets 的方法。你可以一次加载一个资源或批量加载资源。为了识别要加载的资源,你需要向加载方法传递一个键或键列表。键可以是以下对象之一: Address:包含你分配给资源的地址的字符串。Label:包含分配给一个或多个资源的标签的字符串。AssetReference Obj