RocketMQ高级特性三-消费者分类

2024-09-04 07:04

本文主要是介绍RocketMQ高级特性三-消费者分类,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

前言

概述

区别

PullConsumer

定义与概述

原理机制

使用场景

优缺点

Java 代码示例

SimpleConsumer

定义与概述

原理机制

使用场景

优缺点

Java 代码示例

PushConsumer

定义与概述

原理机制

使用场景

优缺点

Java 代码示例

总结


前言

RocketMQ中的消费者分类主要包括三种类型:PullConsumerSimpleConsumer、和PushConsumer。每种消费者类型都有其特定的使用场景、原理机制以及优缺点。

注:生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。

概述

如上图所示, Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取--->消息处理--->消费状态提交。

针对以上几个阶段,Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:

在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。

若您的业务场景发生变更,或您当前使用的消费者类型不适合当前业务,您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理。

区别

对比项PushConsumerSimpleConsumerPullConsumer
接口方式使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。业务方自行实现消息处理,并主动调用接口返回消费结果。业务方自行按队列拉取消息,并可选择性地提交消费结果
消费并发度管理由SDK管理消费并发度。由业务方消费逻辑自行管理消费线程。由业务方消费逻辑自行管理消费线程。
负载均衡粒度5.0 SDK是消息粒度,更均衡,早期版本是队列维度消息粒度,更均衡队列粒度,吞吐攒批性能更好,但容易不均衡
接口灵活度高度封装,不够灵活。原子接口,可灵活自定义。原子接口,可灵活自定义。
适用场景适用于无自定义流程的业务消息开发场景。适用于需要高度自定义业务流程的业务开发场景。仅推荐在流处理框架场景下集成使用

PullConsumer

定义与概述

PullConsumer是一种传统的消息拉取模式,在这种模式下,消费者需要主动从Broker中拉取消息,而不是由Broker主动推送消息。这种模式提供了更大的灵活性,允许消费者根据自身的处理能力和业务逻辑,自主控制消息的拉取和消费节奏。

原理机制

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

  • 消息拉取:消费者调用pull方法,从Broker的消息队列中主动拉取消息。拉取的过程通常是循环进行的,消费者可以在每次拉取时指定要拉取的消息数量和位置。
  • 偏移量管理:消费者需要自行管理消息消费的偏移量(offset),确保每次拉取的消息都是未消费过的,防止消息重复消费。
  • 流控管理:消费者可以根据自身的处理能力和当前系统的负载情况,动态调整消息的拉取频率和数量,避免处理能力不足导致的消息积压。
使用场景
  • 批处理:适合需要批量处理消息的场景,例如数据分析、日志处理等。
  • 自定义处理逻辑:在需要对消息进行复杂的过滤、分组或排序处理时,PullConsumer提供了更大的灵活性。

PushConsumer的消费监听器执行结果分为以下三种情况:

  • 返回消费成功:以Java SDK为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。

  • 返回消费失败:以Java SDK为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。

  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。

PushConsumer 消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。消息超时,请参见PushConsumer消费重试策略。

注:出现消费超时情况时,SDK虽然提交消费失败结果,但是当前消费线程可能仍然无法响应中断,还会继续处理消息。

优缺点
  • 优点
    • 灵活性高:消费者可以完全控制消息拉取的时机和频率,适合对消费策略要求较高的场景。
    • 流控能力强:通过手动控制拉取频率,消费者能够避免消息积压和系统过载。
  • 缺点
    • 实现复杂:消费者需要手动管理偏移量、处理消息的幂等性问题,并实现流控逻辑。
    • 实时性较低:由于消息是被动拉取的,实时性相对较低。
Java 代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.Set;public class PullConsumerExample {public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");for (MessageQueue mq : mqs) {long offset = getMessageQueueOffset(mq);PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32);processPullResult(pullResult);putMessageQueueOffset(mq, pullResult.getNextBeginOffset());}consumer.shutdown();}private static long getMessageQueueOffset(MessageQueue mq) {// 获取消费队列的偏移量return 0;}private static void putMessageQueueOffset(MessageQueue mq, long offset) {// 更新消费队列的偏移量}private static void processPullResult(PullResult pullResult) {// 处理拉取的消息switch (pullResult.getPullStatus()) {case FOUND:// 处理消息break;case NO_NEW_MSG:// 没有新消息break;default:break;}}
}

SimpleConsumer

定义与概述

SimpleConsumer是RocketMQ 4.8.0版本之后引入的一种消费模式,它是PullConsumer的简化版本,适用于需要拉取消息但不想管理复杂消费逻辑的场景。SimpleConsumer简化了消息拉取和流控的实现,提供了更为直观和易用的API。

原理机制
  • 简化的拉取模型:SimpleConsumer内置了部分PullConsumer的逻辑,如偏移量管理和拉取频率控制,使开发者可以更专注于业务逻辑的实现。
  • 异步拉取:SimpleConsumer支持异步拉取消息,进一步简化了使用流程,并减少了开发者在流控管理上的负担。
  • 偏移量自动管理:SimpleConsumer可以自动管理消息的偏移量,无需手动维护,降低了重复消费的风险。
使用场景
  • 轻量级消费:适合对消费量不大或对消费逻辑要求较简单的场景,例如小型任务处理和轻量级的数据收集。
  • 异步任务处理:适合需要异步处理消息的场景,通过异步拉取简化了业务逻辑的实现。
优缺点
  • 优点
    • 简单易用:相较于PullConsumer,SimpleConsumer大大简化了开发难度,降低了使用门槛。
    • 自动管理:自动偏移量管理和异步拉取降低了消息丢失和重复消费的风险。
  • 缺点
    • 灵活性不足:由于自动管理逻辑的加入,SimpleConsumer在一些复杂场景下可能不如PullConsumer灵活。
    • 适用场景有限:由于其简化的设计,SimpleConsumer更适合处理较为简单的消费逻辑。
Java 代码示例
import org.apache.rocketmq.client.consumer.SimpleConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.List;public class SimpleConsumerExample {public static void main(String[] args) throws MQClientException {SimpleConsumer consumer = new SimpleConsumer("ConsumerGroupName", "localhost:9876");MessageQueue mq = new MessageQueue("TopicTest", "BrokerName", 0);long offset = consumer.searchOffset(mq, System.currentTimeMillis());List<Message> messages = consumer.pull(mq, "*", offset, 10);for (Message message : messages) {System.out.printf("Received Message: %s%n", new String(message.getBody()));}consumer.shutdown();}
}

PushConsumer

定义与概述

PushConsumer是RocketMQ中最常用的消费模式,Broker主动将消息推送给消费者,消费者只需关注如何处理接收到的消息。PushConsumer是事件驱动的消息消费模式,适用于需要实时处理消息的场景。

原理机制
  • 消息推送:Broker会自动将新到达的消息推送到消费者,消费者只需实现相应的处理逻辑。
  • 并发消费:PushConsumer支持多线程并发消费,通过配置线程池可以提高消息处理的并发性。
  • 消费模式:支持两种消费模式:并发消费(ConsumeConcurrently)和顺序消费(ConsumeOrderly)。并发消费不保证消息的顺序性,而顺序消费保证同一个队列内的消息按顺序消费。
使用场景
  • 实时处理:适合需要实时处理消息的场景,如金融交易、监控告警、实时数据分析等。
  • 并发消费:适合需要高并发处理的场景,通过配置多线程提高消费吞吐量。
优缺点
  • 优点
    • 实时性高:消息被推送到消费者后可以立即处理,适用于需要实时响应的业务场景。
    • 易于实现:PushConsumer实现简单,开发者只需关注业务逻辑的实现,而无需关心消息拉取和偏移量管理。
  • 缺点
    • 流控难度大:推模式下流控难度较大,可能出现消息积压的情况,尤其是在消费处理速度跟不上消息推送速度时。
    • 并发限制:虽然支持多线程并发消费,但对于顺序消费模式,仍然可能存在并发处理的局限性。
Java 代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class PushConsumerExample {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("Received Message: %s%n", new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

总结

在RocketMQ的消费者分类中:

  • PullConsumer提供了灵活的消息拉取机制,适用于需要自主控制消费节奏的场景。
  • SimpleConsumer在PullConsumer的基础上进行了简化,适用于简单的消费场景。
  • PushConsumer则是最常用的模式,适用于需要实时处理消息的场景。

通过对不同消费者类型的理解和选择,可以更好地满足不同业务场景的需求。

这篇关于RocketMQ高级特性三-消费者分类的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1135329

相关文章

新特性抢先看! Ubuntu 25.04 Beta 发布:Linux 6.14 内核

《新特性抢先看!Ubuntu25.04Beta发布:Linux6.14内核》Canonical公司近日发布了Ubuntu25.04Beta版,这一版本被赋予了一个活泼的代号——“Plu... Canonical 昨日(3 月 27 日)放出了 Beta 版 Ubuntu 25.04 系统镜像,代号“Pluc

kotlin中的行为组件及高级用法

《kotlin中的行为组件及高级用法》Jetpack中的四大行为组件:WorkManager、DataBinding、Coroutines和Lifecycle,分别解决了后台任务调度、数据驱动UI、异... 目录WorkManager工作原理最佳实践Data Binding工作原理进阶技巧Coroutine

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

Rust中的Drop特性之解读自动化资源清理的魔法

《Rust中的Drop特性之解读自动化资源清理的魔法》Rust通过Drop特性实现了自动清理机制,确保资源在对象超出作用域时自动释放,避免了手动管理资源时可能出现的内存泄漏或双重释放问题,智能指针如B... 目录自动清理机制:Rust 的析构函数提前释放资源:std::mem::drop android的妙

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

深入解析Spring TransactionTemplate 高级用法(示例代码)

《深入解析SpringTransactionTemplate高级用法(示例代码)》TransactionTemplate是Spring框架中一个强大的工具,它允许开发者以编程方式控制事务,通过... 目录1. TransactionTemplate 的核心概念2. 核心接口和类3. TransactionT

五大特性引领创新! 深度操作系统 deepin 25 Preview预览版发布

《五大特性引领创新!深度操作系统deepin25Preview预览版发布》今日,深度操作系统正式推出deepin25Preview版本,该版本集成了五大核心特性:磐石系统、全新DDE、Tr... 深度操作系统今日发布了 deepin 25 Preview,新版本囊括五大特性:磐石系统、全新 DDE、Tree

Python中列表的高级索引技巧分享

《Python中列表的高级索引技巧分享》列表是Python中最常用的数据结构之一,它允许你存储多个元素,并且可以通过索引来访问这些元素,本文将带你深入了解Python列表的高级索引技巧,希望对... 目录1.基本索引2.切片3.负数索引切片4.步长5.多维列表6.列表解析7.切片赋值8.删除元素9.反转列表

正则表达式高级应用与性能优化记录

《正则表达式高级应用与性能优化记录》本文介绍了正则表达式的高级应用和性能优化技巧,包括文本拆分、合并、XML/HTML解析、数据分析、以及性能优化方法,通过这些技巧,可以更高效地利用正则表达式进行复杂... 目录第6章:正则表达式的高级应用6.1 模式匹配与文本处理6.1.1 文本拆分6.1.2 文本合并6

基于人工智能的图像分类系统

目录 引言项目背景环境准备 硬件要求软件安装与配置系统设计 系统架构关键技术代码示例 数据预处理模型训练模型预测应用场景结论 1. 引言 图像分类是计算机视觉中的一个重要任务,目标是自动识别图像中的对象类别。通过卷积神经网络(CNN)等深度学习技术,我们可以构建高效的图像分类系统,广泛应用于自动驾驶、医疗影像诊断、监控分析等领域。本文将介绍如何构建一个基于人工智能的图像分类系统,包括环境