RocketMq源码解析四:生产者Producer启动

2024-05-27 12:20

本文主要是介绍RocketMq源码解析四:生产者Producer启动,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、主要接口和类

        生产者服务核心接口和类的关系如下图所示:

        MQProducer是生产者解耦,这里找几个有代表性的方法

// 同步发送消息

SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;

// 同步超时发送消息 如果超过了timeout的时间就抛出异常

SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;

// 异步发送消息

void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;
// 异步超时发送消息 如果超过了timeout的时间就抛出异常
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;

// 指定消息队列同步发送消息

SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;

        DefaultMQProducer除了实现MQProducer的方法外,还继承了ClientConfig类,ClientConfig中主要记录了客户端的一些连接配置信息,我们重点看下DefaultMQProducer中有哪些核心属性

producerGroup:生产者所属组
createTopickey:默认Topic

defaultTopicQueueNums:默认主题在每一个Broker队列数量

sendMsgTimeout:发送消息默认超时时间,默认3s

compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k

retryTimeswhensendFailed:同步方式发送消息重试次数,默认为2,总共执行3次retryTimeswhensendAsyncFailed:异步方法发送消息重试次数,默认为2

retryAnotherBrokerwhenNotstoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false

maxMessagesize:允许发送的最大消息长度,默认为4M 

        我们看到 DefaultMQProducer中持有了一个transient 修饰的DefaultMQProducerImpl类的成员属性defaultMQProducerImpl,实际上核心的功能都封装在了这个DefaultMQProducerImpl类中,下面我们逐一来为读者展开说明。

二、生产者启动流程 

        我们先来看生产者启动的方法 DefaultMQProducer::start

    @Overridepublic void start() throws MQClientException {this.setProducerGroup(withNamespace(this.producerGroup));this.defaultMQProducerImpl.start();if (null != traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}}

        第一步是获取并设置生产者组的信息;

        第二步调用defaultMQProducerImpl的start方法,我们上文讲过DefaultMQProducer的大部分核心功能都是封装在DefaultMQProducerImpl类中。我们来看下defaultMQProducerImpl中的start方法:

        流程图如下:

        源码如下:

public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {// 如果是启动case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 1、先检查一下配置this.checkConfig();// 2、设置自身的客户端名称为进程IDif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 3、获取MQClientManager并获得MQClientInstance实例this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 4、把当前的生产者注入MQClientFactory中boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());// 5、调用start方法启动if (startFactory) {mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;break;...}this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();RequestFutureHolder.getInstance().startScheduledTask(this);}

       重点讲一下第3步:获取MQClientManager并获得MQClientInstance实例。整个JVM中只存在一个MQClienManager实例,维护一个MQClientInstance缓存表

ConcurrentMap<String/* clientld */, MQClientinstance> factoryTable = newConcurrentHashMap<String,MQClientlnstance>():

        同一个clientld只会创建一个MQClientInstance。MQClientinstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
        代码:MQClientManager::getAndCreateMQClientInstance 

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {String clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;}

a:构建客户端的id;

b:从缓存表factoryTable中获取对应clientId的实例;

c:如果没有就生成一个并放入到缓存表中;

d:返回 

        最后我们来看下mQClientFactory.start()当中的源码。

// 先把服务状态改为失败

this.serviceState = ServiceState.START_FAILED;
// 如果配置中的namesrv地址为空,重新获取
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel  启动一个netty服务用于处理请求
this.mQClientAPIImpl.start();
// Start various schedule tasks 启动一系列定时任务用于更新namesrv地址,topic消费情况等
this.startScheduledTask();
// Start pull service 启动拉取消息服务
this.pullMessageService.start();
// Start rebalance service 启动relalance服务
this.rebalanceService.start();
// Start push service 启动推送服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);

// 把服务状态改为运行中
this.serviceState = ServiceState.RUNNING;

        至此生产者启动流程已经讲述完毕。

        

这篇关于RocketMq源码解析四:生产者Producer启动的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007462

相关文章

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

深度解析Java DTO(最新推荐)

《深度解析JavaDTO(最新推荐)》DTO(DataTransferObject)是一种用于在不同层(如Controller层、Service层)之间传输数据的对象设计模式,其核心目的是封装数据,... 目录一、什么是DTO?DTO的核心特点:二、为什么需要DTO?(对比Entity)三、实际应用场景解析

深度解析Java项目中包和包之间的联系

《深度解析Java项目中包和包之间的联系》文章浏览阅读850次,点赞13次,收藏8次。本文详细介绍了Java分层架构中的几个关键包:DTO、Controller、Service和Mapper。_jav... 目录前言一、各大包1.DTO1.1、DTO的核心用途1.2. DTO与实体类(Entity)的区别1

Java中的雪花算法Snowflake解析与实践技巧

《Java中的雪花算法Snowflake解析与实践技巧》本文解析了雪花算法的原理、Java实现及生产实践,涵盖ID结构、位运算技巧、时钟回拨处理、WorkerId分配等关键点,并探讨了百度UidGen... 目录一、雪花算法核心原理1.1 算法起源1.2 ID结构详解1.3 核心特性二、Java实现解析2.

使用Python绘制3D堆叠条形图全解析

《使用Python绘制3D堆叠条形图全解析》在数据可视化的工具箱里,3D图表总能带来眼前一亮的效果,本文就来和大家聊聊如何使用Python实现绘制3D堆叠条形图,感兴趣的小伙伴可以了解下... 目录为什么选择 3D 堆叠条形图代码实现:从数据到 3D 世界的搭建核心代码逐行解析细节优化应用场景:3D 堆叠图

深度解析Python装饰器常见用法与进阶技巧

《深度解析Python装饰器常见用法与进阶技巧》Python装饰器(Decorator)是提升代码可读性与复用性的强大工具,本文将深入解析Python装饰器的原理,常见用法,进阶技巧与最佳实践,希望可... 目录装饰器的基本原理函数装饰器的常见用法带参数的装饰器类装饰器与方法装饰器装饰器的嵌套与组合进阶技巧

解析C++11 static_assert及与Boost库的关联从入门到精通

《解析C++11static_assert及与Boost库的关联从入门到精通》static_assert是C++中强大的编译时验证工具,它能够在编译阶段拦截不符合预期的类型或值,增强代码的健壮性,通... 目录一、背景知识:传统断言方法的局限性1.1 assert宏1.2 #error指令1.3 第三方解决

全面解析MySQL索引长度限制问题与解决方案

《全面解析MySQL索引长度限制问题与解决方案》MySQL对索引长度设限是为了保持高效的数据检索性能,这个限制不是MySQL的缺陷,而是数据库设计中的权衡结果,下面我们就来看看如何解决这一问题吧... 目录引言:为什么会有索引键长度问题?一、问题根源深度解析mysql索引长度限制原理实际场景示例二、五大解决

深度解析Spring Boot拦截器Interceptor与过滤器Filter的区别与实战指南

《深度解析SpringBoot拦截器Interceptor与过滤器Filter的区别与实战指南》本文深度解析SpringBoot中拦截器与过滤器的区别,涵盖执行顺序、依赖关系、异常处理等核心差异,并... 目录Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现

深度解析Spring AOP @Aspect 原理、实战与最佳实践教程

《深度解析SpringAOP@Aspect原理、实战与最佳实践教程》文章系统讲解了SpringAOP核心概念、实现方式及原理,涵盖横切关注点分离、代理机制(JDK/CGLIB)、切入点类型、性能... 目录1. @ASPect 核心概念1.1 AOP 编程范式1.2 @Aspect 关键特性2. 完整代码实