RocketMq源码解析四:生产者Producer启动

2024-05-27 12:20

本文主要是介绍RocketMq源码解析四:生产者Producer启动,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、主要接口和类

        生产者服务核心接口和类的关系如下图所示:

        MQProducer是生产者解耦,这里找几个有代表性的方法

// 同步发送消息

SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;

// 同步超时发送消息 如果超过了timeout的时间就抛出异常

SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;

// 异步发送消息

void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;
// 异步超时发送消息 如果超过了timeout的时间就抛出异常
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;

// 指定消息队列同步发送消息

SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;

        DefaultMQProducer除了实现MQProducer的方法外,还继承了ClientConfig类,ClientConfig中主要记录了客户端的一些连接配置信息,我们重点看下DefaultMQProducer中有哪些核心属性

producerGroup:生产者所属组
createTopickey:默认Topic

defaultTopicQueueNums:默认主题在每一个Broker队列数量

sendMsgTimeout:发送消息默认超时时间,默认3s

compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k

retryTimeswhensendFailed:同步方式发送消息重试次数,默认为2,总共执行3次retryTimeswhensendAsyncFailed:异步方法发送消息重试次数,默认为2

retryAnotherBrokerwhenNotstoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false

maxMessagesize:允许发送的最大消息长度,默认为4M 

        我们看到 DefaultMQProducer中持有了一个transient 修饰的DefaultMQProducerImpl类的成员属性defaultMQProducerImpl,实际上核心的功能都封装在了这个DefaultMQProducerImpl类中,下面我们逐一来为读者展开说明。

二、生产者启动流程 

        我们先来看生产者启动的方法 DefaultMQProducer::start

    @Overridepublic void start() throws MQClientException {this.setProducerGroup(withNamespace(this.producerGroup));this.defaultMQProducerImpl.start();if (null != traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}}

        第一步是获取并设置生产者组的信息;

        第二步调用defaultMQProducerImpl的start方法,我们上文讲过DefaultMQProducer的大部分核心功能都是封装在DefaultMQProducerImpl类中。我们来看下defaultMQProducerImpl中的start方法:

        流程图如下:

        源码如下:

public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {// 如果是启动case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 1、先检查一下配置this.checkConfig();// 2、设置自身的客户端名称为进程IDif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 3、获取MQClientManager并获得MQClientInstance实例this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 4、把当前的生产者注入MQClientFactory中boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());// 5、调用start方法启动if (startFactory) {mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;break;...}this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();RequestFutureHolder.getInstance().startScheduledTask(this);}

       重点讲一下第3步:获取MQClientManager并获得MQClientInstance实例。整个JVM中只存在一个MQClienManager实例,维护一个MQClientInstance缓存表

ConcurrentMap<String/* clientld */, MQClientinstance> factoryTable = newConcurrentHashMap<String,MQClientlnstance>():

        同一个clientld只会创建一个MQClientInstance。MQClientinstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
        代码:MQClientManager::getAndCreateMQClientInstance 

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {String clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;}

a:构建客户端的id;

b:从缓存表factoryTable中获取对应clientId的实例;

c:如果没有就生成一个并放入到缓存表中;

d:返回 

        最后我们来看下mQClientFactory.start()当中的源码。

// 先把服务状态改为失败

this.serviceState = ServiceState.START_FAILED;
// 如果配置中的namesrv地址为空,重新获取
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel  启动一个netty服务用于处理请求
this.mQClientAPIImpl.start();
// Start various schedule tasks 启动一系列定时任务用于更新namesrv地址,topic消费情况等
this.startScheduledTask();
// Start pull service 启动拉取消息服务
this.pullMessageService.start();
// Start rebalance service 启动relalance服务
this.rebalanceService.start();
// Start push service 启动推送服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);

// 把服务状态改为运行中
this.serviceState = ServiceState.RUNNING;

        至此生产者启动流程已经讲述完毕。

        

这篇关于RocketMq源码解析四:生产者Producer启动的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007462

相关文章

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

在C#中合并和解析相对路径方式

《在C#中合并和解析相对路径方式》Path类提供了几个用于操作文件路径的静态方法,其中包括Combine方法和GetFullPath方法,Combine方法将两个路径合并在一起,但不会解析包含相对元素... 目录C#合并和解析相对路径System.IO.Path类幸运的是总结C#合并和解析相对路径对于 C

Java解析JSON的六种方案

《Java解析JSON的六种方案》这篇文章介绍了6种JSON解析方案,包括Jackson、Gson、FastJSON、JsonPath、、手动解析,分别阐述了它们的功能特点、代码示例、高级功能、优缺点... 目录前言1. 使用 Jackson:业界标配功能特点代码示例高级功能优缺点2. 使用 Gson:轻量

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

bat脚本启动git bash窗口,并执行命令方式

《bat脚本启动gitbash窗口,并执行命令方式》本文介绍了如何在Windows服务器上使用cmd启动jar包时出现乱码的问题,并提供了解决方法——使用GitBash窗口启动并设置编码,通过编写s... 目录一、简介二、使用说明2.1 start.BAT脚本2.2 参数说明2.3 效果总结一、简介某些情

python解析HTML并提取span标签中的文本

《python解析HTML并提取span标签中的文本》在网页开发和数据抓取过程中,我们经常需要从HTML页面中提取信息,尤其是span元素中的文本,span标签是一个行内元素,通常用于包装一小段文本或... 目录一、安装相关依赖二、html 页面结构三、使用 BeautifulSoup javascript

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听