本文主要是介绍RocketMQ源码分析----Producer启动过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
总体流程
首先从demo为入口分析整个启动过程
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {String message = "send message";final Message msg = new Message("topic", "tag",// tagmessage.getBytes());// bodySendResult sendResult = producer.send(msg);producer.shutdown();} catch (Exception e) {e.printStackTrace();}}
DefaultMQProducer构造方法如下:
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {this.producerGroup = producerGroup;defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);}public DefaultMQProducer(final String producerGroup) {this(producerGroup, null);}
然后调用start方法后,即开始初始化流程,其间接调用了DefaultMQProducerImpl的start方法,即上面构造方法创建的DefaultMQProducerImpl,代码如下:
public void start() throws MQClientException {this.start(true);}public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查producerGroupthis.checkConfig();// 如果producerGroup为该值,则改变instanceName为pidif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 获取或者创建客户端实例this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 实例注册,实际是放到一个map中,producerGroupboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 是否要启动该实例mQClientFactory.start();}this.serviceState = ServiceState.RUNNING;break;case 其他状态不处理://....}// 心跳发送this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}
MQClientInstance创建和启动
MQClientInstance主要是客户端的一些操作的集合(不知道该怎么总结好…),通过MQClientManager的getAndCreateMQClientInstance方法创建,这里使用了单例模式去创建MQClientManager
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {return getAndCreateMQClientInstance(clientConfig, null);}public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {//ip@instanceName@unitNameString clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {// 创建MQClientInstance并放入map中instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;} else {}}return instance;}
构造方法如下:
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.clientConfig = clientConfig;this.instanceIndex = instanceIndex;// 通信服务的一些配置this.nettyClientConfig = new NettyClientConfig();//设置线程数this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());//网络请求的处理器this.clientRemotingProcessor = new ClientRemotingProcessor(this);// 客户端一些基础APIthis.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);if (this.clientConfig.getNamesrvAddr() != null) {this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());}this.clientId = clientId;this.mQAdminImpl = new MQAdminImpl(this);// 拉取消息服务this.pullMessageService = new PullMessageService(this);// rebalance服务this.rebalanceService = new RebalanceService(this);//创建一个group为CLIENT_INNER_PRODUCER_GROUP的producer// 该producer主要用于将消息发回Brokerthis.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);// 将用户自建的producer属性复制过来lithis.defaultMQProducer.resetClientConfig(clientConfig);// Consumer一些消费情况(TPS,RT等)的统计管理this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);}
可以看到MQClientInstance里包含了客户端(Producer、Consumer)的一些通用操作以及一些属性的保存,其中一些具体是功能后续文章分析
上面有点需要注意:
- 在判断producerGroup为MixAll.CLIENT_INNER_PRODUCER_GROUP时会改变InstanceName为pid,为什么需要这样做呢?
个人理解如下:
由于该Producer是系统内置的producer,用户系统行为的消息发送,如果一个JVM内用户启动了多个Producer,那么内置的producer是可以用一个的,所以这里将instanceName改成pid就好了,那么在getAndCreateMQClientInstance中获取的clientId(ip@instanceName@unitName)就会一样,就能取出之前创建的MQClientInstance对象
启动方法如下:
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());}//netty相关通信的初始化,然后开启一个定时任务扫描responseTablethis.mQClientAPIImpl.start();// 启动定时任务this.startScheduledTask();// 启动消息拉取服务this.pullMessageService.start();// 启动Rebalance拉取服务this.rebalanceService.start();// 启动内置producerthis.defaultMQProducer.getDefaultMQProducerImpl().start(false);this.serviceState = ServiceState.RUNNING;break;case//其他状态不处理;}}}
启动定时任务
MQClientInstance启动的时候会把一些客户端相关的服务都启动,另外启动了很多重要的定时任务:
- 拉取NameServer地址
- 更新Topic路由信息
- 清除下线的Broker
- 发送心跳给所有Broker
- 持久化Consumer的消费offset
- 调整线程池
整个启动的大体过程就是如此,细节很多,不在这里展开,会在另外文章具体分析
这篇关于RocketMQ源码分析----Producer启动过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!