RocketMQ源码分析----Producer启动过程

2024-08-30 09:58

本文主要是介绍RocketMQ源码分析----Producer启动过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

总体流程

首先从demo为入口分析整个启动过程

	public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {String message = "send message";final Message msg = new Message("topic", "tag",// tagmessage.getBytes());// bodySendResult sendResult = producer.send(msg);producer.shutdown();} catch (Exception e) {e.printStackTrace();}}

DefaultMQProducer构造方法如下:

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {this.producerGroup = producerGroup;defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);}public DefaultMQProducer(final String producerGroup) {this(producerGroup, null);}

然后调用start方法后,即开始初始化流程,其间接调用了DefaultMQProducerImpl的start方法,即上面构造方法创建的DefaultMQProducerImpl,代码如下:

    public void start() throws MQClientException {this.start(true);}public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查producerGroupthis.checkConfig();// 如果producerGroup为该值,则改变instanceName为pidif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 获取或者创建客户端实例this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 实例注册,实际是放到一个map中,producerGroupboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 是否要启动该实例mQClientFactory.start();}this.serviceState = ServiceState.RUNNING;break;case 其他状态不处理://....}// 心跳发送this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}

MQClientInstance创建和启动

MQClientInstance主要是客户端的一些操作的集合(不知道该怎么总结好…),通过MQClientManager的getAndCreateMQClientInstance方法创建,这里使用了单例模式去创建MQClientManager

   public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {return getAndCreateMQClientInstance(clientConfig, null);}public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {//ip@instanceName@unitNameString clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {// 创建MQClientInstance并放入map中instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;} else {}}return instance;}

构造方法如下:

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.clientConfig = clientConfig;this.instanceIndex = instanceIndex;// 通信服务的一些配置this.nettyClientConfig = new NettyClientConfig();//设置线程数this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());//网络请求的处理器this.clientRemotingProcessor = new ClientRemotingProcessor(this);// 客户端一些基础APIthis.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);if (this.clientConfig.getNamesrvAddr() != null) {this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());}this.clientId = clientId;this.mQAdminImpl = new MQAdminImpl(this);// 拉取消息服务this.pullMessageService = new PullMessageService(this);// rebalance服务this.rebalanceService = new RebalanceService(this);//创建一个group为CLIENT_INNER_PRODUCER_GROUP的producer// 该producer主要用于将消息发回Brokerthis.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);// 将用户自建的producer属性复制过来lithis.defaultMQProducer.resetClientConfig(clientConfig);// Consumer一些消费情况(TPS,RT等)的统计管理this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);}

可以看到MQClientInstance里包含了客户端(Producer、Consumer)的一些通用操作以及一些属性的保存,其中一些具体是功能后续文章分析

上面有点需要注意:

  • 在判断producerGroup为MixAll.CLIENT_INNER_PRODUCER_GROUP时会改变InstanceName为pid,为什么需要这样做呢?

个人理解如下:
由于该Producer是系统内置的producer,用户系统行为的消息发送,如果一个JVM内用户启动了多个Producer,那么内置的producer是可以用一个的,所以这里将instanceName改成pid就好了,那么在getAndCreateMQClientInstance中获取的clientId(ip@instanceName@unitName)就会一样,就能取出之前创建的MQClientInstance对象

启动方法如下:

    public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());}//netty相关通信的初始化,然后开启一个定时任务扫描responseTablethis.mQClientAPIImpl.start();// 启动定时任务this.startScheduledTask();// 启动消息拉取服务this.pullMessageService.start();// 启动Rebalance拉取服务this.rebalanceService.start();// 启动内置producerthis.defaultMQProducer.getDefaultMQProducerImpl().start(false);this.serviceState = ServiceState.RUNNING;break;case//其他状态不处理;}}}

启动定时任务

MQClientInstance启动的时候会把一些客户端相关的服务都启动,另外启动了很多重要的定时任务:

  1. 拉取NameServer地址
  2. 更新Topic路由信息
  3. 清除下线的Broker
  4. 发送心跳给所有Broker
  5. 持久化Consumer的消费offset
  6. 调整线程池

整个启动的大体过程就是如此,细节很多,不在这里展开,会在另外文章具体分析

这篇关于RocketMQ源码分析----Producer启动过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120498

相关文章

Spring boot整合dubbo+zookeeper的详细过程

《Springboot整合dubbo+zookeeper的详细过程》本文讲解SpringBoot整合Dubbo与Zookeeper实现API、Provider、Consumer模式,包含依赖配置、... 目录Spring boot整合dubbo+zookeeper1.创建父工程2.父工程引入依赖3.创建ap

Linux下进程的CPU配置与线程绑定过程

《Linux下进程的CPU配置与线程绑定过程》本文介绍Linux系统中基于进程和线程的CPU配置方法,通过taskset命令和pthread库调整亲和力,将进程/线程绑定到特定CPU核心以优化资源分配... 目录1 基于进程的CPU配置1.1 对CPU亲和力的配置1.2 绑定进程到指定CPU核上运行2 基于

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

Android kotlin中 Channel 和 Flow 的区别和选择使用场景分析

《Androidkotlin中Channel和Flow的区别和选择使用场景分析》Kotlin协程中,Flow是冷数据流,按需触发,适合响应式数据处理;Channel是热数据流,持续发送,支持... 目录一、基本概念界定FlowChannel二、核心特性对比数据生产触发条件生产与消费的关系背压处理机制生命周期

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java进程异常故障定位及排查过程

《Java进程异常故障定位及排查过程》:本文主要介绍Java进程异常故障定位及排查过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、故障发现与初步判断1. 监控系统告警2. 日志初步分析二、核心排查工具与步骤1. 进程状态检查2. CPU 飙升问题3. 内存

SpringBoot整合liteflow的详细过程

《SpringBoot整合liteflow的详细过程》:本文主要介绍SpringBoot整合liteflow的详细过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋...  liteflow 是什么? 能做什么?总之一句话:能帮你规范写代码逻辑 ,编排并解耦业务逻辑,代码

Java中调用数据库存储过程的示例代码

《Java中调用数据库存储过程的示例代码》本文介绍Java通过JDBC调用数据库存储过程的方法,涵盖参数类型、执行步骤及数据库差异,需注意异常处理与资源管理,以优化性能并实现复杂业务逻辑,感兴趣的朋友... 目录一、存储过程概述二、Java调用存储过程的基本javascript步骤三、Java调用存储过程示

MySQL中的InnoDB单表访问过程

《MySQL中的InnoDB单表访问过程》:本文主要介绍MySQL中的InnoDB单表访问过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、访问类型【1】const【2】ref【3】ref_or_null【4】range【5】index【6】

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景