rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费

本文主要是介绍rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  1. 业务描述

    由于业务需要这样一种场景,将消息按照id(业务id)尾号发送到对应的queue中,并启动10个消费者(单jvm,10个消费者组),从对应的queue中集群消费,如下图1所示(假设有两个broker组成的集群): 
    图1

  2. producer如何实现

    producer只需发送消息时调用如下方法即可

    /*** 发送有序消息** @param messageMap 消息数据* @param selector   队列选择器,发送时会回调* @param order      回调队列选择器时,此参数会传入队列选择方法,提供配需规则* @return 发送结果*/
    public Result<SendResult> send(Message msg, MessageQueueSelector selector, Object arg)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    关键是如何实现MessageQueueSelector:

    class IDHashMessageQueueSelector implements MessageQueueSelector{public MessageQueue select(List<MessageQueue> mqs, Message msg,Object arg) {int id = Integer.parseInt(arg.toString());int size = mqs.size();int index = id%size;return mqs.get(index);}
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这样,所有的消息会根据消息的尾号,轮询的落到相应的queue上。参考图2,假设id=10001231,由于一共有20个queue,所以10001231%20=11,故消息会落到broker-b queue-1上。 
    图2

  3. consumer端如何实现

    针对consumer由于没有限制是顺序消费,故可以采用集群消费模式的DefaultMQPushConsumer,由于一个消费者消费一类queue,故需要10个consumer group,比如consumer group0需要消费的queue为broker-a queue-0和broker-b queue-0,如下图的概示: 
    这里写图片描述
    那么需要自己实现一个AllocateMessageQueueStrategy进行queue的分配,我们假设consumer group的名字格式需要提前定好,如xxx{queueid}ConsumerGroup,那么实现如下:

    public class AllocateMessageQueueByHashAveragely extends AllocateMessageQueueAveragely{
    private final Logger log = ClientLogger.getLog();
    @Override
    public String getName() {return super.getName()+"ByIDHash";
    }@Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID,List<MessageQueue> mqAll, List<String> cidAll) {//解析queue idchar idChar = consumerGroup.charAt(consumerGroup.length() - "ConsumerGroup".length() - 1);int id = Integer.parseInt(idChar+"");List<MessageQueue> submq = new ArrayList<MessageQueue>();//根据queue id分配相应的MessageQueuefor(MessageQueue mq : mqAll) {if(mq.getQueueId() == idChar || mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {submq.add(mq);}}if(submq.size() == 0) {log.warn("allocate err:"+consumerGroup+","+currentCID+","+cidAll+","+mqAll);}return super.allocate(consumerGroup, currentCID, submq, cidAll);
    }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    借助AllocateMessageQueueAveragely来实现,以便有多个jvm的消费者时,能够进行集群消费,但是针对上面这个例子,消费者jvm实例不能超过2个,至于为什么,参照下图: 
    这里写图片描述

这篇关于rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1089600

相关文章

linux生产者,消费者问题

pthread_cond_wait() :用于阻塞当前线程,等待别的线程使用pthread_cond_signal()或pthread_cond_broadcast来唤醒它。 pthread_cond_wait() 必须与pthread_mutex 配套使用。pthread_cond_wait()函数一进入wait状态就会自动release mutex。当其他线程通过pthread

问题:第一次世界大战的起止时间是 #其他#学习方法#微信

问题:第一次世界大战的起止时间是 A.1913 ~1918 年 B.1913 ~1918 年 C.1914 ~1918 年 D.1914 ~1919 年 参考答案如图所示

C++工程编译链接错误汇总VisualStudio

目录 一些小的知识点 make工具 可以使用windows下的事件查看器崩溃的地方 dumpbin工具查看dll是32位还是64位的 _MSC_VER .cc 和.cpp 【VC++目录中的包含目录】 vs 【C/C++常规中的附加包含目录】——头文件所在目录如何怎么添加,添加了以后搜索头文件就会到这些个路径下搜索了 include<> 和 include"" WinMain 和

2024.6.24 IDEA中文乱码问题(服务器 控制台 TOMcat)实测已解决

1.问题产生原因: 1.文件编码不一致:如果文件的编码方式与IDEA设置的编码方式不一致,就会产生乱码。确保文件和IDEA使用相同的编码,通常是UTF-8。2.IDEA设置问题:检查IDEA的全局编码设置和项目编码设置是否正确。3.终端或控制台编码问题:如果你在终端或控制台看到乱码,可能是终端的编码设置问题。确保终端使用的是支持你的文件的编码方式。 2.解决方案: 1.File -> S

vcpkg安装opencv中的特殊问题记录(无法找到opencv_corexd.dll)

我是按照网上的vcpkg安装opencv方法进行的(比如这篇:从0开始在visual studio上安装opencv(超详细,针对小白)),但是中间出现了一些别人没有遇到的问题,虽然原因没有找到,但是本人给出一些暂时的解决办法: 问题1: 我在安装库命令行使用的是 .\vcpkg.exe install opencv 我的电脑是x64,vcpkg在这条命令后默认下载的也是opencv2:x6

问题-windows-VPN不正确关闭导致网页打不开

为什么会发生这类事情呢? 主要原因是关机之前vpn没有关掉导致的。 至于为什么没关掉vpn会导致网页打不开,我猜测是因为vpn建立的链接没被更改。 正确关掉vpn的时候,会把ip链接断掉,如果你不正确关掉,ip链接没有断掉,此时你vpn又是没启动的,没有域名解析,所以就打不开网站。 你可以在打不开网页的时候,把vpn打开,你会发现网络又可以登录了。 方法一 注意:方法一虽然方便,但是可能会有

vue同页面多路由懒加载-及可能存在问题的解决方式

先上图,再解释 图一是多路由页面,图二是路由文件。从图一可以看出每个router-view对应的name都不一样。从图二可以看出层路由对应的组件加载方式要跟图一中的name相对应,并且图二的路由层在跟图一对应的页面中要加上components层,多一个s结尾,里面的的方法名就是图一路由的name值,里面还可以照样用懒加载的方式。 页面上其他的路由在路由文件中也跟图二是一样的写法。 附送可能存在

vue+elementui--$message提示框被dialog遮罩层挡住问题解决

最近碰到一个先执行this.$message提示内容,然后接着弹出dialog带遮罩层弹框。那么问题来了,message提示框会默认被dialog遮罩层挡住,现在就是要解决这个问题。 由于都是弹框,问题肯定是出在z-index比重问题。由于用$message方式是写在js中而不是写在html中所以不是很好直接去改样式。 不过好在message组件中提供了customClass 属性,我们可以利用

Visual Studio中,MSBUild版本问题

假如项目规定了MSBUild版本,那么在安装完Visual Studio后,假如带的MSBUild版本与项目要求的版本不符合要求,那么可以把需要的MSBUild添加到系统中,然后即可使用。步骤如下:            假如项目需要使用V12的MSBUild,而安装的Visual Studio带的MSBUild版本为V14。 ①到MSDN下载V12 MSBUild包,把V12包解压到目录(

YOLO v3 训练速度慢的问题

一天一夜出了两个模型,仅仅迭代了200次   原因:编译之前没有将Makefile 文件里的GPU设置为1,编译的是CPU版本,必须训练慢   解决方案: make clean  vim Makefile make   再次训练 速度快了,5分钟迭代了500次