第七章-Broker-创建topic

2024-03-28 20:44
文章标签 创建 第七章 broker topic

本文主要是介绍第七章-Broker-创建topic,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

概念就不讲了,直接上操作和源码,这里就用rocketmq自带的dashboard来创建topic,如下图:

在这里插入图片描述

clusterName:Broker的集群,可以选择多个

BROKER_NAME:Broker 名字,可以选择多个

topicName:topic 名字

writeQueueNums:写队列数

readQueueNums:读队列数

perm:设置topic的读写模式

最终创建topic的方法,由DefaultMQAdminExt.createAndUpdateTopicConfig实现,那么我们顺着这个调用链走下去

DefaultMQAdminExt.createAndUpdateTopicConfig

->DefaultMQAdminExtImpl.createAndUpdateTopicConfig

->MQClientAPIImpl.createTopic

MQClientAPIImpl.createTopic

/**
* 先讲下各个参数代表什么
* addr:选择的单个broker的地址,注意这里是单个,当选择多个broker时,会遍历一个一个的调用创建
* defaultTopic:默认topic => TBW102
* topicConfig:读写队列数、权限等信息
* timeoutMillis:超过毫秒,默认20000
*/
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,final long timeoutMillis)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {// 组装请求头CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();requestHeader.setTopic(topicConfig.getTopicName());requestHeader.setDefaultTopic(defaultTopic);requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());requestHeader.setPerm(topicConfig.getPerm());requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());requestHeader.setOrder(topicConfig.isOrder());// 主要就看这一行,将请求组装成远程命令对象,请求码为 RequestCode.UPDATE_AND_CREATE_TOPIC,在RocketMQ中,所有的网络请求最终都会转化成对应的请求码,所以我们找到处理该请求码的代码就行,翻看源码得知,这里就得转到broker模块,并由AdminBrokerProcessor.processRequest中处理,且该方法中对应 RequestCode.UPDATE_AND_CREATE_TOPIC 请求码的方法是 updateAndCreateTopic(ctx, request),好了,这下我们直接进入该方法就行。RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQClientException(response.getCode(), response.getRemark());
}

AdminBrokerProcessor.updateAndCreateTopic

private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 先创建响应命令对象final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 这里就是转换成当前能识别的,有兴趣的可以进去看看final CreateTopicRequestHeader requestHeader =(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));// 检查 topic 名是否与 broker 集群名字冲突,有冲突就要返回错误if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";log.warn(errorMsg);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorMsg);return response;}try {// 上面检查通过了,设置返回码为成功response.setCode(ResponseCode.SUCCESS);// 唯一标识当前请求的idresponse.setOpaque(request.getOpaque());response.markResponseType();response.setRemark(null);ctx.writeAndFlush(response);// 将响应信息通过netty写回客户端} catch (Exception e) {log.error("Failed to produce a proper response", e);}// 设置topicConfigTopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());topicConfig.setPerm(requestHeader.getPerm());topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());// 正式存放 topic的操作在这里,看 TopicConfigManager.updateTopicConfigthis.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
// 将 broker 的信息再注册到 namesrvthis.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());return null;
}

TopicConfigManager.updateTopicConfig

public void updateTopicConfig(final TopicConfig topicConfig) {// 先在缓存 map中设置 topic的配置TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);if (old != null) {log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);} else {log.info("create new topic [{}]", topicConfig);}// 记录操作版本,其实就是id自增this.dataVersion.nextVersion();// 将topic配置信息持久化到文件this.persist();
}

将topic配置信息持续化到文件

持久化操作在ConfigManager类中执行

public synchronized void persist() {// encode是一个模板方法,由子类实现,在这里指的是 TopicConfigManagerString jsonString = this.encode(true);if (jsonString != null) {// 拿到topic文件存放目录,默认情况下是:{user.home}/store/config/topics.jsonString fileName = this.configFilePath();try {// 将 json 字符串写入 topic 配置文件中,格式如`图7-1`MixAll.string2File(jsonString, fileName);} catch (IOException e) {log.error("persist file " + fileName + " exception", e);}}
}public abstract String encode(final boolean prettyFormat);

TopicConfigManager.encode

public String encode(final boolean prettyFormat) {// 这个方法挺简单,就是把刚才在 updateTopicConfig 方法中存放在本地缓存 map中的值设置到 topicConfigSerializeWrapper对象,同时,将 dataVersion 也设置进去,然后再将对象序列化成 json 字符串TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);topicConfigSerializeWrapper.setDataVersion(this.dataVersion);// 生成 json,格式如`图7-1`return topicConfigSerializeWrapper.toJson(prettyFormat);
}

图7-1

在这里插入图片描述

这篇关于第七章-Broker-创建topic的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/856654

相关文章

idea中创建新类时自动添加注释的实现

《idea中创建新类时自动添加注释的实现》在每次使用idea创建一个新类时,过了一段时间发现看不懂这个类是用来干嘛的,为了解决这个问题,我们可以设置在创建一个新类时自动添加注释,帮助我们理解这个类的用... 目录前言:详细操作:步骤一:点击上方的 文件(File),点击&nbmyHIgsp;设置(Setti

Spring 中使用反射创建 Bean 实例的几种方式

《Spring中使用反射创建Bean实例的几种方式》文章介绍了在Spring框架中如何使用反射来创建Bean实例,包括使用Class.newInstance()、Constructor.newI... 目录1. 使用 Class.newInstance() (仅限无参构造函数):2. 使用 Construc

C#原型模式之如何通过克隆对象来优化创建过程

《C#原型模式之如何通过克隆对象来优化创建过程》原型模式是一种创建型设计模式,通过克隆现有对象来创建新对象,避免重复的创建成本和复杂的初始化过程,它适用于对象创建过程复杂、需要大量相似对象或避免重复初... 目录什么是原型模式?原型模式的工作原理C#中如何实现原型模式?1. 定义原型接口2. 实现原型接口3

Python中conda虚拟环境创建及使用小结

《Python中conda虚拟环境创建及使用小结》本文主要介绍了Python中conda虚拟环境创建及使用小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录0.前言1.Miniconda安装2.conda本地基本操作3.创建conda虚拟环境4.激活c

使用Python创建一个能够筛选文件的PDF合并工具

《使用Python创建一个能够筛选文件的PDF合并工具》这篇文章主要为大家详细介绍了如何使用Python创建一个能够筛选文件的PDF合并工具,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录背景主要功能全部代码代码解析1. 初始化 wx.Frame 窗口2. 创建工具栏3. 创建布局和界面控件4

Java中对象的创建和销毁过程详析

《Java中对象的创建和销毁过程详析》:本文主要介绍Java中对象的创建和销毁过程,对象的创建过程包括类加载检查、内存分配、初始化零值内存、设置对象头和执行init方法,对象的销毁过程由垃圾回收机... 目录前言对象的创建过程1. 类加载检查2China编程. 分配内存3. 初始化零值4. 设置对象头5. 执行

Android 悬浮窗开发示例((动态权限请求 | 前台服务和通知 | 悬浮窗创建 )

《Android悬浮窗开发示例((动态权限请求|前台服务和通知|悬浮窗创建)》本文介绍了Android悬浮窗的实现效果,包括动态权限请求、前台服务和通知的使用,悬浮窗权限需要动态申请并引导... 目录一、悬浮窗 动态权限请求1、动态请求权限2、悬浮窗权限说明3、检查动态权限4、申请动态权限5、权限设置完毕后

Python创建Excel的4种方式小结

《Python创建Excel的4种方式小结》这篇文章主要为大家详细介绍了Python中创建Excel的4种常见方式,文中的示例代码简洁易懂,具有一定的参考价值,感兴趣的小伙伴可以学习一下... 目录库的安装代码1——pandas代码2——openpyxl代码3——xlsxwriterwww.cppcns.c

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

解决IDEA使用springBoot创建项目,lombok标注实体类后编译无报错,但是运行时报错问题

《解决IDEA使用springBoot创建项目,lombok标注实体类后编译无报错,但是运行时报错问题》文章详细描述了在使用lombok的@Data注解标注实体类时遇到编译无误但运行时报错的问题,分析... 目录问题分析问题解决方案步骤一步骤二步骤三总结问题使用lombok注解@Data标注实体类,编译时