RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message

本文主要是介绍RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

场景

        想创建一个新的consumer去消费一个已经再使用的topic时,默认情况下会从topic中的第一条消息开始消费,大多数情况是需要从最新的消息开始。然后再使用CONSUME_FROM_LAST_OFFSET设置时并不会对新的consumer生效,它只是在停用consumer重新启用时,如果之前订阅OFFSET消息已经不存在了(默认rocketmq中存放的消息是72小时)就会从最后一条开始。所以代码层面无法实现新的consumer订阅topic最新消息开始消费的操作。

如何实现

        RocketMQ为我们提供了一个强大的消息控制台rocketmq-dashboard,其中就有消息相关的控制功能。操作步骤如下:

  1.创建新的consumer,已存在可以跳过这一步

Consumer>add

  • clusterName:选择需要订阅rocketmq集群名称
  • brokerName:选择需要订阅的broker名称,可能是多组集群
  • groupName:输入新建的Consumer名称
  • consumeEnable:是否开启Consumer消费,这里先将消费关闭(灰色),启动后再开启消费。
  2.启动消费组程序,已启动的跳过。

  • 启动消费程序后就可以看到Consumer消费节点数量,延时的消息数量。
  • 因为我们consumeEnable为false关闭状态,所有Consumer并没有消费消息。
  3.设置Consumer消费offset
  • Topic>SKIP_MESSAGE_ACCUMULATE

  • 在Topic栏目中选择需要跳过的topic,点击SKIP_MESSAGE_ACCUMULATE操作
  • 在SKIP_MESSAGE_ACCUMULATE弹出框SubscriptionGroup中选择需要操作的消费组
  • commit后生效,弹出设置的消费信息框如下

  • 如果新的Consumer订阅了多个topic,也需要将其他进行操作。
4.开启consumeEnable开关

Consumer>Config

  • 在Consumer的模块中,找到新增的消费组SubscriptionGroup ,打开后面Config窗口

  •  打开consumeEnable开关(红色),如果有过个broker集群需要都打开。
  • 设置完成后消费程序会开始消费新的消息

这篇关于RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/993515

相关文章

MySQL新增字段后Java实体未更新的潜在问题与解决方案

《MySQL新增字段后Java实体未更新的潜在问题与解决方案》在Java+MySQL的开发中,我们通常使用ORM框架来映射数据库表与Java对象,但有时候,数据库表结构变更(如新增字段)后,开发人员可... 目录引言1. 问题背景:数据库与 Java 实体不同步1.1 常见场景1.2 示例代码2. 不同操作

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

MySQL报错sql_mode=only_full_group_by的问题解决

《MySQL报错sql_mode=only_full_group_by的问题解决》本文主要介绍了MySQL报错sql_mode=only_full_group_by的问题解决,文中通过示例代码介绍的非... 目录报错信息DataGrip 报错还原Navicat 报错还原报错原因解决方案查看当前 sql mo

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

matlab读取NC文件(含group)

matlab读取NC文件(含group): NC文件数据结构: 代码: % 打开 NetCDF 文件filename = 'your_file.nc'; % 替换为你的文件名% 使用 netcdf.open 函数打开文件ncid = netcdf.open(filename, 'NC_NOWRITE');% 查看文件中的组% 假设我们想读取名为 "group1" 的组groupName

Android13_SystemUI下拉框新增音量控制条

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 Android13_SystemUI下拉框新增音量控制条 一、必备知识二、源码分析对比1.brightness模块分析对比2.statusbar/phone 对应模块对比对比初始化类声明对比构造方法 三、源码修改四、相关资源 一、必备知识 在Android12 版本上面已经完成了功能的实现,目前是在And

详解Tomcat 7的七大新特性和新增功能(1)

http://developer.51cto.com/art/201009/228537.htm http://tomcat.apache.org/tomcat-7.0-doc/index.html  Apache发布首个Tomcat 7版本已经发布了有一段时间了,Tomcat 7引入了许多新功能,并对现有功能进行了增强。很多文章列出了Tomcat 7的新功能,但大多数并没有详细解释它们

【Rocketmq入门-基本概念】

Rocketmq入门-基本概念 名词解释名称服务器(NameServer)消息队列(Message Queue)主题(Topic)标签(Tag)生产者(Producer)消费者(Consumer)拉取模式(Pull)推送模式(Push)消息模型(Message Model) 关键组件Broker消息存储工作流程 名词解释 名称服务器(NameServer) 定义: 名称服务器

AI辅助编程里的 Atom Group 的概念和使用

背景 在我们实际的开发当中,一个需求往往会涉及到多个文件修改,而需求也往往有相似性。 举个例子,我经常需要在 auto-coder中需要添加命令行参数,通常是这样的: /coding 添加一个新的命令行参数 --chat_model 默认值为空 实际上这个需求涉及到以下文件列表: /Users/allwefantasy/projects/auto-coder/src/autocoder/auto

centos7 安装rocketmq4.7.0以及RocketMQ-Console-Ng控制台

一、前置工作 1.1安装jdk8 https://blog.csdn.net/pang_ping/article/details/80570011 1.2安装maven https://www.cnblogs.com/116970u/p/11211963.html 1.3安装git https://blog.csdn.net/xwj1992930/article/details/964