本文主要是介绍RocketMQ源码分析----Producer队列选择与容错策略,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
队列选择
在HA的文章里大概讲了一下Producer如何为高可用贡献出一份力量的,当时只是说了遍历列表选择队列,然后选择一个,没有深入分析,这篇文章深入分析一下其源码,首先从发送消息选择队列的代码开始:
String lastBrokerName = null == mq ? null : mq.getBrokerName();MessageQueue tmpmq = this.selectOneMessageQueue(lastBrokerName);
if (tmpmq != null) {mq = tmpmq;
//....
如上,如果发送失败了,重试的时候lastBrokerName将不为空,进入到selectOneMessageQueue方法
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);}
具体实现在mqFaultStrategy中
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().getAndIncrement();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
这篇关于RocketMQ源码分析----Producer队列选择与容错策略的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!