本文主要是介绍RocketMQ中单消费者订阅多个Topic,会阻塞消费吗?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
RocketMQ
问题
背景是这样: 最近有个项目用MQ做消息流转,在RocketMQ集群模式下,一个消费者实例,订阅了两个Topic A、B
。
Topic A:存储的是批量业务消息。
Topic B:存储的是单个业务消息。
有个小伙伴问我:A 先存入了100万消息,消费端线程开始消费 Topic A 数据,消费速度较慢;之后 B 也存入了1万消息,A 的数据积压会阻塞 B 的消费吗?
先说结论:B 消息消费会有一定的延迟,但不会绝对阻塞。
分析
本文 RocketMQ 版本:4.4.0。
从一张类图入手。
项目中,生产者 DefaultMQProducer
和消费者 DefaultMQPushConsumer
启动的时候,都会调用 MQClientInstance#start
方法来启动 MQClientInstance
组件。
MQClientInstance#start 主要工作:
-
启动NettyRemotingClient客户端,用于与NameServer和Broker进行通信。
-
定时更新Topic所对应路由信息、清除离线broker、向所有在线broker发送心跳包等。
-
启动拉取消息服务线程(PullMessageService),异步拉取消息。
-
启动负载均衡服务线程(RebalanceService),用于实现消息队列(Message Queue)的负载均衡。
在RocketMQ中,有ProcessQueue和MessageQueue两个概念。
MessageQueue表示一个消息队列,它包含主题、队列ID、Broker地址信息。
ProcessQueue 表示消费者正在消费的消息队列,是一个消费进度的抽象概念。在消费者消费消息时,将消息从MessageQueue中取出,并将消息保存在ProcessQueue中进行消费。包含了消费者消费的消息列表、消息偏移量以及一些状态信息等。
消费端 ProcessQueue 与 Broker 中的 MessageQueue 是一一对应的。
PullMessageService#run
在 PullMessageService 类中定义了 pullRequestQueue 属性,用于存储拉取消息请求(PullRequest)的队列。
PullRequest 拉取消息请求的数据结构定义了:
-
消费者组。
-
消费主题。
-
消费队列。
-
消费偏移量等。
PullMessageService#run 方法会从队列中取出 PullRequest 并执行,通过不断的发送 PullRequest来持续的拉取消息。
public class PullMessageService extends ServiceThread {private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();@Overridepublic void run() {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);}
}
PullMessageService#pullMessage 方法找到消费组里面的消费实现类,执行对应的逻辑。
private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
DefaultMQPushConsumerImpl#pullMessage主要功能:
-
消费者服务状态校验,判断消费者是否是运行态;
-
流控校验,判断ProcessMessage 队列缓存消息数是否超过阀值。
-
并发消费和顺序消费相关的校验,并发消费限制最大跨度偏移量(offset),顺序消费则判断是否锁定,未锁定设置消费点位。
-
创建匿名内部类PullCallback,后续拉取消息返回响应后会回调onSuccess完成消息的消费;
-
调用PullAPIWrapper#pullKernelImpl拉取消息。
public class DefaultMQPushConsumerImpl implements MQConsumerInner {public void pullMessage(final PullRequest pullRequest) {//校验//判断...//创建匿名内部类PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {switch (pullResult.getPullStatus()) {case FOUND:有新消息,封装请求提交到线程池,自定义消费者消费数据DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);case NO_NEW_MSG:case NO_MATCHED_MSG:没有新消息或未匹配到消息时,继续往队列放入拉取消息请求(PullRequest),循环调用。...}}}
}
PullStatus状态定义值,消息的中间状态,枚举:
-
FOUND:拉取到消息。
-
NO_NEW_MSG:没有新消息。
-
NO_MATCHED_MSG:没有匹配的消息。
-
OFFSET_ILLEGAL:非法的偏移量,可能设置的太大或大小。
ConsumeMessageConcurrentlyService#submitConsumeRequest 消费线程入口是这里,封装一个consumerRequest对象来执行业务 初始化消费者线程池,会根据配置来创建消费线程数。
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {//线程池,用于执行自定义消费逻辑private final ThreadPoolExecutor consumeExecutor;@Overridepublic void submitConsumeRequest(提交任务,如定义了消费消息最大值时,会根据设置的值进行分割,然后提交到线程池待执行。)class ConsumeRequest implements Runnable {@Overridepublic void run() {//找到自定义消息监听类MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;...//执行对应的实现类,消费数据status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);...}}
}
回到PullAPIWrapper#pullKernelImpl拉取消息方法中,最终会往Broker发送请求。
NettyRemotingAbstract#invokeAsyncImpl
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback){//获取信号量,最大为65535个boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);//缓存正在执行的请求this.responseTable.put(opaque, responseFuture);channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {//设置响应状态responseFuture.setSendRequestOK(true);return;}requestFail(opaque);log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));}});}
}
上面代码中创建了ResponseFuture对象,放在了responseTable Map对象中,后续会获取后回调对应的方法。
也就是回调: MQClientAPIImpl#pullMessageAsync
private void pullMessageAsync(final String addr,final RemotingCommand request,final long timeoutMillis,final PullCallback pullCallback) {this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {@Overridepublic void operationComplete(ResponseFuture responseFuture) {RemotingCommand response = responseFuture.getResponseCommand();if (response != null) {PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);assert pullResult != null;//最终会回调PullMessageService#pullMessage方法中定义的回调函数,完成消息的分发pullCallback.onSuccess(pullResult);}}});
}
拉取到的消息会调用PullMessageService#pullMessage方法中定义的回调函数,完成消息的分发。
RebalanceService#run
定时任务,重分配消息队列。
public class RebalanceService extends ServiceThread {
@Overridepublic void run() {while (!this.isStopped()) {//20s轮询this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}}
}
最终会调用RebalanceImpl#updateProcessQueueTableInRebalance 重新分配 topic 队列。
updateProcessQueueTableInRebalance方法主要功能:
-
获取当前消费者的订阅信息,包括订阅的主题、消费者组等。
-
获取当前消费者的消费进度,包括已消费的消息队列和消息偏移量等。
-
获取当前主题的所有消息队列。
-
根据消费者组和消息队列数量进行负载均衡,将消息队列分配给消费者。
-
更新消费者的消费进度,将已消费的消息队列和消息偏移量更新为新分配的消息队列。
-
往pullRequestQueue队列中写入PullRequest请求数据。
最终也会生成拉取消息请求,放入队列中,等待PullMessageService线程执行。
答案?
至此,我们简单分析了消息的拉取、消费流程,那么回到我们之前的问题来。
假设单消费者实例定义了4个消费线程,那么线程池会创建4个核心线程用来执行任务。
生产者往Topic A先写入100万消息后,拉取线程会从这8个队列(Topic A和Topic B各4个)拉取消息,此时拉取到A的数据会放入消费者线程池等待消费者线程消费。
pull线程会持续的拉取,所以会持续的往线程池队列尾部写入Msg任务,按照我们分析的这种场景,Topic B消息虽然后面写入了,但是Topic B消息拉取后的数据放在了队列的中部或者后尾部的位置。
假如Thread 拿到消息后处理较慢,则会导致线程池队列的数据出现积压,也就是会最终会对B数据的消费产生影响,但不是绝对阻塞。
结论:
1. 会存在一定的消费延迟,但不是绝对的阻塞。也不是必须等A消费完,才会消费B的数据。
上测试代码
Topic A的消费类,线程睡眠1s,模拟正常业务处理耗时。
@Slf4j
@MQConsumeService(topic = "A")
public class A extends AbstractMQMsgProcessor {@Overrideprotected boolean consumeMessage(String tag, String keys, MessageExt messageExt) {try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}String message = new String(messageExt.getBody(), StandardCharsets.UTF_8);log.info("A message:{}, {}, {}, {}", message, Thread.currentThread().getName(), LocalDateTime.now(), keys);return true;}
}
Topic B的消费类。
@Slf4j
@MQConsumeService(topic = "B")
public class B extends AbstractMQMsgProcessor {@Overrideprotected boolean consumeMessage(String tag, String keys, MessageExt messageExt) {String message = new String(messageExt.getBody(), StandardCharsets.UTF_8);log.info("B message:{}, {}, {}, {}", message, Thread.currentThread().getName(), LocalDateTime.now(), keys);return true;}
}
模拟先写入A类消息5000条,再写入B类消息100条。
@GetMapping(value = "/send")
public void reportList() throws Exception {for (int i = 0; i<5000; i++) {processor.aysncSend("A", "*", "你好" + i);}Thread.sleep(10000);for (int i = 0; i<100; i++) {processor.aysncSend("B", "*", "hello" + i);}
}
16:20左右A类消息发送完毕,开始消费。
16:21左右B类消息发送完毕,此时待消费线程池队列中任务数在持续增加,可以看到在持续拉取A类消息,放入队列中待消费。
16:31分B类消息被消费,因为消费者线程是批量消费消息的,所以此时B类的10条消息都是由 ConsumeMessageThread_2 执行的。
后面A、B类消息会持续消费,最终在 16:46分所有消息消费完成。
总结
本文分析的版本是4.x,假如是最新版本,结论可能不一样,RockeMQ 5.0有了全新的消费模式-POP,对原有的消费模型的更新。
5.0之前的客户端架构中,拉取到消息之后会先将消息缓存到 ProcessQueue 中,当需要消费时,会从 ProcessQueue 中取出对应的消息进行消费,当消费成功之后再将消息从 ProcessQueue 中 remove 走。其中重试消息的发送,位点的更新在这个过程中穿插。
新版本在ProcessQueue 中维护了2个队列,有兴趣的同学可以去了解下,这里就不展开了。
参考资料
-
https://xie.infoq.cn/article/68d0ef479b65ae4431f10f67e
这篇关于RocketMQ中单消费者订阅多个Topic,会阻塞消费吗?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!