RocketMQ广播消费消息

2024-09-06 07:52
文章标签 广播 消息 rocketmq 消费

本文主要是介绍RocketMQ广播消费消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、 基础概念

RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。

集群消费模式(Cluster)
在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。

广播消费模式(Broadcast)
在广播消费模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。

怎么使用广播消费模式呢?其实很简单,通过在消费者的 @RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING,即可将消费者设置为广播模式。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,从而实现了消息的广播消费。

2、 实现

消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** 广播模式*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//根据情况修改消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式//此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费/**这里可以设置两种模式: 默认都是CLUSTERING("CLUSTERING")*     BROADCASTING("BROADCASTING") 广播模式*     CLUSTERING("CLUSTERING") 集群模式*/consumer.setMessageModel(MessageModel.BROADCASTING);//根据情况修改消费的topicconsumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");}
}

生产者

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("defaultGroup");//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定mq的地址producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {{Message msg = new Message("TopicTest", // 发送的topic"AAA",  //tags"BBB", // keys"CCC".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容);//同步传递消息,消息会发给集群中的一个Broker节点。//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的//不知道消息是否发送成功,反正Producer发送完了就不管了 .producer.sendOneway(msg);}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}

这篇关于RocketMQ广播消费消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1141435

相关文章

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

【Rocketmq入门-基本概念】

Rocketmq入门-基本概念 名词解释名称服务器(NameServer)消息队列(Message Queue)主题(Topic)标签(Tag)生产者(Producer)消费者(Consumer)拉取模式(Pull)推送模式(Push)消息模型(Message Model) 关键组件Broker消息存储工作流程 名词解释 名称服务器(NameServer) 定义: 名称服务器

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

centos7 安装rocketmq4.7.0以及RocketMQ-Console-Ng控制台

一、前置工作 1.1安装jdk8 https://blog.csdn.net/pang_ping/article/details/80570011 1.2安装maven https://www.cnblogs.com/116970u/p/11211963.html 1.3安装git https://blog.csdn.net/xwj1992930/article/details/964

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Android 友盟消息推送集成遇到的问题

友盟消息推送遇到的问题 集成友盟消息推送,步骤根据提供的技术文档接入便可。可是当你集成到项目中去的时候,可能并不是一帆风顺就搞定,因为你项目里面是可能集成了其他的sdk(比如支付宝,微信,七鱼等等三方的sdk)。那么这个时候,再加上友盟的消息推送sdk集成可能就会出现问题。 问题清单 友盟消息推送sdk和支付宝sdk冲突问题 后台配置了消息推送,也显示发送成功,但是手机没有收到消息通知

消息队列的理解和应用场景

知乎上的一个通俗理解的优秀答案 by 祁达方 小红是小明的姐姐。 小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走。久而久之,两人都觉得麻烦。 后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看。 书架就是一个消息队列,小红是生产者,小明是

RocketMQ 介绍

前言 消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360°的刁难。 作为一个在互联网公司面一次拿一次Offer的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。 于是在一个寂寞难耐的夜晚,我痛定思痛,决定开始写《吊打面试官》系列,希望能帮助各位读者以后面试势如破竹,

Linux多线程——POSIX信号量与环形队列版本之生产消费模型

文章目录 POSIX信号量POSIX的操作初始化销毁等待信号量(申请资源)发布信号量(放下资源) 环形队列之生产消费模型 POSIX信号量 POSIX信号量和System V信号量是不同的标准 但是实现的功能是一样的,都是为了解决同步的问题 我们说信号量指的就是资源的数量 在生产者与消费者模型里面,生产者与消费者所认为的资源是不同的 生产者认为空间是资源,因为每次都要