RocketMQ中单消费者订阅多个Topic,会阻塞消费吗?

2024-01-12 09:20

本文主要是介绍RocketMQ中单消费者订阅多个Topic,会阻塞消费吗?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RocketMQ

问题

背景是这样: 最近有个项目用MQ做消息流转,在RocketMQ集群模式下,一个消费者实例,订阅了两个Topic A、B

Topic A:存储的是批量业务消息。
Topic B:存储的是单个业务消息。

有个小伙伴问我:A 先存入了100万消息,消费端线程开始消费 Topic A 数据,消费速度较慢;之后 B 也存入了1万消息,A 的数据积压会阻塞 B 的消费吗?

先说结论:B 消息消费会有一定的延迟,但不会绝对阻塞。

分析

本文 RocketMQ 版本:4.4.0。

从一张类图入手。

项目中,生产者 DefaultMQProducer 和消费者 DefaultMQPushConsumer 启动的时候,都会调用 MQClientInstance#start 方法来启动 MQClientInstance 组件。

MQClientInstance#start 主要工作:

  1. 启动NettyRemotingClient客户端,用于与NameServer和Broker进行通信。

  2. 定时更新Topic所对应路由信息、清除离线broker、向所有在线broker发送心跳包等。

  3. 启动拉取消息服务线程(PullMessageService),异步拉取消息。

  4. 启动负载均衡服务线程(RebalanceService),用于实现消息队列(Message Queue)的负载均衡。

在RocketMQ中,有ProcessQueue和MessageQueue两个概念。

  1. MessageQueue表示一个消息队列,它包含主题、队列ID、Broker地址信息。

  2. ProcessQueue 表示消费者正在消费的消息队列,是一个消费进度的抽象概念。在消费者消费消息时,将消息从MessageQueue中取出,并将消息保存在ProcessQueue中进行消费。包含了消费者消费的消息列表、消息偏移量以及一些状态信息等。

消费端 ProcessQueue 与 Broker 中的 MessageQueue 是一一对应的。

PullMessageService#run

在 PullMessageService 类中定义了 pullRequestQueue 属性,用于存储拉取消息请求(PullRequest)的队列。

PullRequest 拉取消息请求的数据结构定义了:

  • 消费者组。

  • 消费主题。

  • 消费队列。

  • 消费偏移量等。

PullMessageService#run 方法会从队列中取出 PullRequest 并执行,通过不断的发送 PullRequest来持续的拉取消息。

public class PullMessageService extends ServiceThread {private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();@Overridepublic void run() {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);}
}

PullMessageService#pullMessage 方法找到消费组里面的消费实现类,执行对应的逻辑。

private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}

DefaultMQPushConsumerImpl#pullMessage主要功能:

  1. 消费者服务状态校验,判断消费者是否是运行态;

  2. 流控校验,判断ProcessMessage 队列缓存消息数是否超过阀值。

  3. 并发消费和顺序消费相关的校验,并发消费限制最大跨度偏移量(offset),顺序消费则判断是否锁定,未锁定设置消费点位。

  4. 创建匿名内部类PullCallback,后续拉取消息返回响应后会回调onSuccess完成消息的消费;

  5. 调用PullAPIWrapper#pullKernelImpl拉取消息。

public class DefaultMQPushConsumerImpl implements MQConsumerInner {public void pullMessage(final PullRequest pullRequest) {//校验//判断...//创建匿名内部类PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {switch (pullResult.getPullStatus()) {case FOUND:有新消息,封装请求提交到线程池,自定义消费者消费数据DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);case NO_NEW_MSG:case NO_MATCHED_MSG:没有新消息或未匹配到消息时,继续往队列放入拉取消息请求(PullRequest),循环调用。...}}}
}

PullStatus状态定义值,消息的中间状态,枚举:

  1. FOUND:拉取到消息。

  2. NO_NEW_MSG:没有新消息。

  3. NO_MATCHED_MSG:没有匹配的消息。

  4. OFFSET_ILLEGAL:非法的偏移量,可能设置的太大或大小。

ConsumeMessageConcurrentlyService#submitConsumeRequest 消费线程入口是这里,封装一个consumerRequest对象来执行业务 初始化消费者线程池,会根据配置来创建消费线程数。

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {//线程池,用于执行自定义消费逻辑private final ThreadPoolExecutor consumeExecutor;@Overridepublic void submitConsumeRequest(提交任务,如定义了消费消息最大值时,会根据设置的值进行分割,然后提交到线程池待执行。)class ConsumeRequest implements Runnable {@Overridepublic void run() {//找到自定义消息监听类MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;...//执行对应的实现类,消费数据status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);...}}
}

回到PullAPIWrapper#pullKernelImpl拉取消息方法中,最终会往Broker发送请求。

NettyRemotingAbstract#invokeAsyncImpl

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback){//获取信号量,最大为65535个boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);//缓存正在执行的请求this.responseTable.put(opaque, responseFuture);channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {//设置响应状态responseFuture.setSendRequestOK(true);return;}requestFail(opaque);log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));}});}
}

上面代码中创建了ResponseFuture对象,放在了responseTable Map对象中,后续会获取后回调对应的方法。

也就是回调: MQClientAPIImpl#pullMessageAsync

private void pullMessageAsync(final String addr,final RemotingCommand request,final long timeoutMillis,final PullCallback pullCallback) {this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {@Overridepublic void operationComplete(ResponseFuture responseFuture) {RemotingCommand response = responseFuture.getResponseCommand();if (response != null) {PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);assert pullResult != null;//最终会回调PullMessageService#pullMessage方法中定义的回调函数,完成消息的分发pullCallback.onSuccess(pullResult);}}});
}

拉取到的消息会调用PullMessageService#pullMessage方法中定义的回调函数,完成消息的分发。

RebalanceService#run

定时任务,重分配消息队列。

public class RebalanceService extends ServiceThread {
@Overridepublic void run() {while (!this.isStopped()) {//20s轮询this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}}
}

最终会调用RebalanceImpl#updateProcessQueueTableInRebalance 重新分配 topic 队列。

updateProcessQueueTableInRebalance方法主要功能:

  1. 获取当前消费者的订阅信息,包括订阅的主题、消费者组等。

  2. 获取当前消费者的消费进度,包括已消费的消息队列和消息偏移量等。

  3. 获取当前主题的所有消息队列。

  4. 根据消费者组和消息队列数量进行负载均衡,将消息队列分配给消费者。

  5. 更新消费者的消费进度,将已消费的消息队列和消息偏移量更新为新分配的消息队列。

  6. 往pullRequestQueue队列中写入PullRequest请求数据。

最终也会生成拉取消息请求,放入队列中,等待PullMessageService线程执行。

答案?

至此,我们简单分析了消息的拉取、消费流程,那么回到我们之前的问题来。

假设单消费者实例定义了4个消费线程,那么线程池会创建4个核心线程用来执行任务。

生产者往Topic A先写入100万消息后,拉取线程会从这8个队列(Topic A和Topic B各4个)拉取消息,此时拉取到A的数据会放入消费者线程池等待消费者线程消费。

pull线程会持续的拉取,所以会持续的往线程池队列尾部写入Msg任务,按照我们分析的这种场景,Topic B消息虽然后面写入了,但是Topic B消息拉取后的数据放在了队列的中部或者后尾部的位置。

假如Thread 拿到消息后处理较慢,则会导致线程池队列的数据出现积压,也就是会最终会对B数据的消费产生影响,但不是绝对阻塞。

结论:

1. 会存在一定的消费延迟,但不是绝对的阻塞。也不是必须等A消费完,才会消费B的数据。

上测试代码

Topic A的消费类,线程睡眠1s,模拟正常业务处理耗时。

@Slf4j
@MQConsumeService(topic = "A")
public class A extends AbstractMQMsgProcessor {@Overrideprotected boolean consumeMessage(String tag, String keys, MessageExt messageExt) {try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}String message = new String(messageExt.getBody(), StandardCharsets.UTF_8);log.info("A message:{}, {}, {}, {}", message, Thread.currentThread().getName(), LocalDateTime.now(), keys);return true;}
}


Topic B的消费类。

@Slf4j
@MQConsumeService(topic = "B")
public class B extends AbstractMQMsgProcessor {@Overrideprotected boolean consumeMessage(String tag, String keys, MessageExt messageExt) {String message = new String(messageExt.getBody(), StandardCharsets.UTF_8);log.info("B message:{}, {}, {}, {}", message, Thread.currentThread().getName(), LocalDateTime.now(), keys);return true;}
}

模拟先写入A类消息5000条,再写入B类消息100条。

@GetMapping(value = "/send")
public void reportList() throws Exception {for (int i = 0; i<5000; i++) {processor.aysncSend("A""*""你好" + i);}Thread.sleep(10000);for (int i = 0; i<100; i++) {processor.aysncSend("B""*""hello" + i);}
}

16:20左右A类消息发送完毕,开始消费。

16:21左右B类消息发送完毕,此时待消费线程池队列中任务数在持续增加,可以看到在持续拉取A类消息,放入队列中待消费。

16:31分B类消息被消费,因为消费者线程是批量消费消息的,所以此时B类的10条消息都是由 ConsumeMessageThread_2 执行的。

后面A、B类消息会持续消费,最终在 16:46分所有消息消费完成。

总结

本文分析的版本是4.x,假如是最新版本,结论可能不一样,RockeMQ 5.0有了全新的消费模式-POP,对原有的消费模型的更新。

5.0之前的客户端架构中,拉取到消息之后会先将消息缓存到 ProcessQueue 中,当需要消费时,会从 ProcessQueue 中取出对应的消息进行消费,当消费成功之后再将消息从 ProcessQueue 中 remove 走。其中重试消息的发送,位点的更新在这个过程中穿插。

新版本在ProcessQueue 中维护了2个队列,有兴趣的同学可以去了解下,这里就不展开了。

参考资料

  1. https://xie.infoq.cn/article/68d0ef479b65ae4431f10f67e

这篇关于RocketMQ中单消费者订阅多个Topic,会阻塞消费吗?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/597398

相关文章

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

ActiveMQ—Queue与Topic区别

Queue与Topic区别 转自:http://blog.csdn.net/qq_21033663/article/details/52458305 队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:         1、点对点(point-to-point,简称PTP)Queue消息传递模型:         通过该消息传递模型,一个应用程序(即消息生产者)可以

springboot体会BIO(阻塞式IO)

使用springboot体会阻塞式IO 大致的思路为: 创建一个socket服务端,监听socket通道,并打印出socket通道中的内容。 创建两个socket客户端,向socket服务端写入消息。 1.创建服务端 public class RedisServer {public static void main(String[] args) throws IOException {

多路转接之select(fd_set介绍,参数详细介绍),实现非阻塞式网络通信

目录 多路转接之select 引入 介绍 fd_set 函数原型 nfds readfds / writefds / exceptfds readfds  总结  fd_set操作接口  timeout timevalue 结构体 传入值 返回值 代码 注意点 -- 调用函数 select的参数填充  获取新连接 注意点 -- 通信时的调用函数 添加新fd到

java线程深度解析(五)——并发模型(生产者-消费者)

http://blog.csdn.net/Daybreak1209/article/details/51378055 三、生产者-消费者模式     在经典的多线程模式中,生产者-消费者为多线程间协作提供了良好的解决方案。基本原理是两类线程,即若干个生产者和若干个消费者,生产者负责提交用户请求任务(到内存缓冲区),消费者线程负责处理任务(从内存缓冲区中取任务进行处理),两类线程之

struts2中的json返回指定的多个参数

要返回指定的多个参数,就必须在struts.xml中的配置如下: <action name="goodsType_*" class="goodsTypeAction" method="{1}"> <!-- 查询商品类别信息==分页 --> <result type="json" name="goodsType_findPgae"> <!--在这一行进行指定,其中lis是一个List集合,但

【Rocketmq入门-基本概念】

Rocketmq入门-基本概念 名词解释名称服务器(NameServer)消息队列(Message Queue)主题(Topic)标签(Tag)生产者(Producer)消费者(Consumer)拉取模式(Pull)推送模式(Push)消息模型(Message Model) 关键组件Broker消息存储工作流程 名词解释 名称服务器(NameServer) 定义: 名称服务器

一款支持同一个屏幕界面同时播放多个视频的视频播放软件

GridPlayer 是一款基于 VLC 的免费开源跨平台多视频同步播放工具,支持在一块屏幕上同时播放多个视频。其主要功能包括: 多视频播放:用户可以在一个窗口中同时播放任意数量的视频,数量仅受硬件性能限制。支持多种格式和流媒体:GridPlayer 支持所有由 VLC 支持的视频格式以及流媒体 URL(如 m3u8 链接)。自定义网格布局:用户可以配置播放器的网格布局,以适应不同的观看需求。硬

多线程篇(阻塞队列- LinkedBlockingDeque)(持续更新迭代)

目录 一、LinkedBlockingDeque是什么 二、核心属性详解 三、核心方法详解 addFirst(E e) offerFirst(E e) putFirst(E e) removeFirst() pollFirst() takeFirst() 其他 四、总结 一、LinkedBlockingDeque是什么 首先queue是一种数据结构,一个集合中

多线程篇(阻塞队列- LinkedBlockingQueue)(持续更新迭代)

目录 一、基本概要 1. 构造函数 2. 内部成员 二、非阻塞式添加元素:add、offer方法原理 offer的实现 enqueue入队操作 signalNotEmpty唤醒 删除线程(如消费者线程) 为什么要判断if (c == 0)时才去唤醒消费线程呢? 三、阻塞式添加元素:put 方法原理 图解:put线程的阻塞过程 四、非阻塞式移除:poll方法原理 dequ