RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message

本文主要是介绍RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

场景

        想创建一个新的consumer去消费一个已经再使用的topic时,默认情况下会从topic中的第一条消息开始消费,大多数情况是需要从最新的消息开始。然后再使用CONSUME_FROM_LAST_OFFSET设置时并不会对新的consumer生效,它只是在停用consumer重新启用时,如果之前订阅OFFSET消息已经不存在了(默认rocketmq中存放的消息是72小时)就会从最后一条开始。所以代码层面无法实现新的consumer订阅topic最新消息开始消费的操作。

如何实现

        RocketMQ为我们提供了一个强大的消息控制台rocketmq-dashboard,其中就有消息相关的控制功能。操作步骤如下:

  1.创建新的consumer,已存在可以跳过这一步

Consumer>add

  • clusterName:选择需要订阅rocketmq集群名称
  • brokerName:选择需要订阅的broker名称,可能是多组集群
  • groupName:输入新建的Consumer名称
  • consumeEnable:是否开启Consumer消费,这里先将消费关闭(灰色),启动后再开启消费。
  2.启动消费组程序,已启动的跳过。

  • 启动消费程序后就可以看到Consumer消费节点数量,延时的消息数量。
  • 因为我们consumeEnable为false关闭状态,所有Consumer并没有消费消息。
  3.设置Consumer消费offset
  • Topic>SKIP_MESSAGE_ACCUMULATE

  • 在Topic栏目中选择需要跳过的topic,点击SKIP_MESSAGE_ACCUMULATE操作
  • 在SKIP_MESSAGE_ACCUMULATE弹出框SubscriptionGroup中选择需要操作的消费组
  • commit后生效,弹出设置的消费信息框如下

  • 如果新的Consumer订阅了多个topic,也需要将其他进行操作。
4.开启consumeEnable开关

Consumer>Config

  • 在Consumer的模块中,找到新增的消费组SubscriptionGroup ,打开后面Config窗口

  •  打开consumeEnable开关(红色),如果有过个broker集群需要都打开。
  • 设置完成后消费程序会开始消费新的消息

这篇关于RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/993515

相关文章

vue+elementui--$message提示框被dialog遮罩层挡住问题解决

最近碰到一个先执行this.$message提示内容,然后接着弹出dialog带遮罩层弹框。那么问题来了,message提示框会默认被dialog遮罩层挡住,现在就是要解决这个问题。 由于都是弹框,问题肯定是出在z-index比重问题。由于用$message方式是写在js中而不是写在html中所以不是很好直接去改样式。 不过好在message组件中提供了customClass 属性,我们可以利用

一二三应用开发平台应用开发示例(4)——视图类型介绍以及新增、修改、查看视图配置

调整上级属性类型 前面为了快速展示平台的低代码配置功能,将实体文件夹的数据模型上级属性的数据类型暂时配置为文本类型,现在我们调整下,将其数据类型调整为实体,如下图所示: 数据类型需要选择实体,并在实体选择框中选择自身“文件夹” 这时候,再点击生成代码,平台会报错,提示“实体【文件夹】未设置主参照视图”。这是因为文件夹选择的功能页面,同样是基于配置产生的,因为视图我们还没有配置,所以会报错。

rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费

业务描述 由于业务需要这样一种场景,将消息按照id(业务id)尾号发送到对应的queue中,并启动10个消费者(单jvm,10个消费者组),从对应的queue中集群消费,如下图1所示(假设有两个broker组成的集群):  producer如何实现 producer只需发送消息时调用如下方法即可 /*** 发送有序消息** @param messageMap 消息数据* @param

jmeter之Thread Group(线程组)

Thread Group(线程组) 1.线程组,或者可以叫用户组,进行性能测试时的用户资源池。 2.是任何一个测试计划执行的开始点。 3.上一篇提到的“控制器”和“HTTP请求”(采集器)必须在线程组内;监听器等其他组件,可以直接放在测试计划下。 线程组设置参数的意义 我们以下图为例,进行详细说明。见下图:  区域1(在取样器错误后要执行的动作) 这个区域的主要作用很明显,在线程内

USB - USB在消费领域的应用

Switching in USB Consumer Applications 通用串行总线(USB)已成为满足终端设备之间日益增长的快速数据传输需求的主流接口--例如,在个人电脑和便携式设备(如手机、数码相机和个人媒体播放器)之间下载和上传数据。 The universal serial bus (USB) has become a dominant

【C++11 之新增容器 array、foward_list、tuple、unordered_(multi)map/set】应知应会

C++11 标准中新增了多个容器,这些容器为 C++ 程序员提供了更多的选择,以满足不同的编程需求。以下是对这些新容器的介绍和使用案例: std::array 介绍: std::array 是一个固定大小的数组容器,它在栈上分配内存,并提供了类似于标准库容器的接口。它提供了更好的类型安全性和范围检查,同时保持了与原生数组相似的性能。std::array 的大小必须在编译时确定,并且不能更改。

VUE2 elementui 动态表单嵌套新增移除

VUE2 elementui 动态表单嵌套新增移除 代码 <template><div><el-button type="text" @click="dialogTableVisible = true">打开嵌套表格的 Dialog</el-button><el-dialog title="收货地址" :visible.sync="dialogTableVisible"><el-form r

mysql的GROUP_CONCAT

完整的语法如下:  group_concat([DISTINCT] 要连接的字段 [Order BY ASC/DESC 排序字段] [Separator '分隔符'])  如果GROUP_CONCAT  后面什么都没有      就是单纯的吧查出的数据分到一个组里面

微服务——重复消费(幂等解决方案)

目录 一、唯一ID机制二、幂等性设计三、状态检查机制四、利用缓存和消息队列五、分布式锁总结 在微服务中,防止重复消费的核心思想是通过设计使得操作一次与多次产生相同的效果,并为每次操作生成唯一的ID。这样,即使在消息被重复发送的情况下,系统也能通过检查ID来避免重复处理。 一、唯一ID机制 在微服务架构中,为每次操作生成唯一ID是防止重复消费的关键步骤。通过为每个操作分配一

SpringCloud Alibaba微服务实战(二) - Nacos服务注册与restTemplate消费

说在前面 基础环境搭建,理论,请看上一篇,在这就不扯理论了,直接上代码。 项目结构 代码实现 第一步:在父pom的项目中引入dependencyManagement 在引入父pom之前咱们先来回顾下dependencyManagement与使用他的原因 什么是dependencyManagement:他可以统一管理项目的版本号,确保应用的各个子项目的依赖和版本一致,当需要变更版本号