本文主要是介绍RocketMQ源码分析----Consumer消费进度相关,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在Consumer消费的时候总有几个疑问:
- 消费完成后,这个消费进度存在哪里
- 消费完成后,还没保存消费进度就挂了,会不会导致重复消费
Consumer
消费进度保存
消费完成后,会返回一个ConsumeConcurrentlyStatus.CONSUME_SUCCESS告诉MQ消费成功,以MessageListener的consumeMessage为入口分析。
消费的时候,是以ConsumeRequest类为Runnable对象,在线程池中进行处理的,即ConsumeRequest的run方法会处理这个状态
@Overridepublic void run() {//....status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);// 如果这个ProcessQueue废弃了,则不处理if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);}}
在消费完成后,将status交给processConsumeResult处理,代码如下
public void processConsumeResult(//final ConsumeConcurrentlyStatus status, //final ConsumeConcurrentlyContext context, //final ConsumeRequest consumeRequest//) {//....消费成功或者失败的处理// 将这批消息从ProcessQueue中移除,代表消费完毕,并返回当前ProcessQueue中的消息最小的offsetlong offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {// 更新消费进度this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}
在分析ProcessQueue的时候,说过removeMessage返回有两种情况:
- 如果移除这批消息之后已经没有消息了,那么返回ProcessQueue中最大的offset+1
- 如果还有消息,那么返回treeMap中最小的key,即未消费的消息中最小的offset
getOffsetStore返回RemoteBrokerOffsetStore,看下其实现
@Overridepublic void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {if (mq != null) {// 通过MessageQueue获取本地的对应的消费进度AtomicLong offsetOld = this.offsetTable.get(mq);if (null == offsetOld) {offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));}if (null != offsetOld) {//increaseOnly 为false则直接覆盖//increaseOnly为true则会判断更新的值比老的值大才会进行更新if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}}}
这篇关于RocketMQ源码分析----Consumer消费进度相关的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!