【RocketMQ】RocketMQ自动容灾切换DLedger集群实践

2023-12-17 20:32

本文主要是介绍【RocketMQ】RocketMQ自动容灾切换DLedger集群实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • DLedger配置介绍
  • 新建DLedger集群
    • 机器规划
      • Broker
      • nameserver
      • console
    • Broker配置
      • broker-a
      • broker-a-s1
      • broker-a-s2
      • broker-b
      • broker-b-s1
      • broker-b-s2
      • broker-c
      • broker-c-s1
      • broker-c-s2
    • 验证
  • 旧集群升级
    • 杀掉旧的 Broker
    • 检查旧的 Commitlog
    • 修改配置
    • 重新启动 Broker
  • Tips

本文主要演示如何搭建一个多主多从的自动容灾切换的RocketMQ集群,以及如何把现有普通的Master/Slave集群升级为自动容灾切换的集群。

DLedger配置介绍

​ 部署自动容灾的RocketMQ集群和普通的Master/Slave集群没太多差别,只需要在Broker.conf开启DLedger相关配置即可,官方称之为自动容灾切换的RocketMQ-on-DLedger Group,DLedger是分布式一致性协议Raft的一个实现,是一个轻量级的Java类库,RocketMQ引入其来实现Leader选举等一致性操作。

  • RocketMQ-on-DLedger Group是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在Leader和Follower之间复制数据以保证高可用。
  • RocketMQ-on-DLedger Group能自动容灾切换,并保证数据一致。
  • RocketMQ-on-DLedger Group是可以水平扩展的,也即可以部署任意多个RocketMQ-on-DLedger Group同时对外提供服务。

相关配置

name含义举例
enableDLegerCommitLog是否启动 DLedger,即是否启用RocketMQ主从切换,默认为falsetrue
dLegerGroupDLedger Raft Group的名字,建议和brokerName保持一致RaftNode00
dLegerPeersDLedger Group 内各节点的端口信息,同一个Group 内的各个节点配置必须要保证一致n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId节点 id, 必须属于 dLegerPeers 中的一个;同Group 内各个节点要唯一n0
sendMessageThreadPoolNums发送线程个数,建议配置成 Cpu 核数16

新建DLedger集群

机器规划

Broker

节点ID机器端口dLegerPeer
broker-a0172.24.29.21320911n0-172.24.29.213:10911
broker-a-s11172.24.29.21420911n1-172.24.29.214:10911
broker-a-s22172.24.29.21520911n2-172.24.29.215:10911
broker-b0172.24.29.21420915n0-172.24.29.214:10915
broker-b-s11172.24.29.21320915n1-172.24.29.213:10915
broker-b-s22172.24.29.21520915n2-172.24.29.215:10915
broker-c0172.24.29.21520919n0-172.24.29.215:10919
broker-c-s11172.24.29.21320919n1-172.24.29.213:10919
broker-c-s22172.24.29.21420919n2-172.24.29.214:10919

nameserver

节点机器端口
nameserver172.24.29.2159976

console

节点机器端口
console172.24.29.2158585

Broker配置

broker-a

listenPort = 20911
rocketmqHome = /neworiental/rocketmq-test/rocketmq-a/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-a
#对外提供服务地址
brokerIP1 = 172.24.29.213
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.213
brokerId = 0
storePathRootDir = /neworiental/rocketmq-test/rocketmq-a/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-a/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-a/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-a
dLegerSelfId = n0
dLegerPeers = n0-172.24.29.213:10911;n1-172.24.29.214:10911;n2-172.24.29.215:10911

broker-a-s1

listenPort = 20911
rocketmqHome = /neworiental/rocketmq-test/rocketmq-a-s1/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-a
#对外提供服务地址
brokerIP1 = 172.24.29.214
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.214
brokerId = 1
storePathRootDir = /neworiental/rocketmq-test/rocketmq-a-s1/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-a-s1/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-a-s1/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-a
dLegerSelfId = n1
dLegerPeers = n0-172.24.29.213:10911;n1-172.24.29.214:10911;n2-172.24.29.215:10911

broker-a-s2

listenPort = 20911
rocketmqHome = /neworiental/rocketmq-test/rocketmq-a-s2/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-a
#对外提供服务地址
brokerIP1 = 172.24.29.215
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.215
brokerId = 2
storePathRootDir = /neworiental/rocketmq-test/rocketmq-a-s2/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-a-s2/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-a-s2/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-a
dLegerSelfId = n2
dLegerPeers = n0-172.24.29.213:10911;n1-172.24.29.214:10911;n2-172.24.29.215:10911

broker-b

listenPort = 20915
rocketmqHome = /neworiental/rocketmq-test/rocketmq-b/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-b
#对外提供服务地址
brokerIP1 = 172.24.29.214
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.214
brokerId = 0
storePathRootDir = /neworiental/rocketmq-test/rocketmq-b/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-b/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-b/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-b
dLegerSelfId = n0
dLegerPeers = n0-172.24.29.214:10915;n1-172.24.29.213:10915;n2-172.24.29.215:10915

broker-b-s1

listenPort = 20915
rocketmqHome = /neworiental/rocketmq-test/rocketmq-b-s1/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-b
#对外提供服务地址
brokerIP1 = 172.24.29.213
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.213
brokerId = 1
storePathRootDir = /neworiental/rocketmq-test/rocketmq-b-s1/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-b-s1/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-b-s1/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-b
dLegerSelfId = n1
dLegerPeers = n0-172.24.29.214:10915;n1-172.24.29.213:10915;n2-172.24.29.215:10915

broker-b-s2

listenPort = 20915
rocketmqHome = /neworiental/rocketmq-test/rocketmq-b-s2/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-b
#对外提供服务地址
brokerIP1 = 172.24.29.215
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.215
brokerId = 2
storePathRootDir = /neworiental/rocketmq-test/rocketmq-b-s2/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-b-s2/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-b-s2/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-b
dLegerSelfId = n2
dLegerPeers = n0-172.24.29.214:10915;n1-172.24.29.213:10915;n2-172.24.29.215:10915

broker-c

listenPort = 20919
rocketmqHome = /neworiental/rocketmq-test/rocketmq-c/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-c
#对外提供服务地址
brokerIP1 = 172.24.29.215
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.215
brokerId = 0
storePathRootDir = /neworiental/rocketmq-test/rocketmq-c/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-c/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-c/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-c
dLegerSelfId = n0
dLegerPeers = n0-172.24.29.215:10919;n1-172.24.29.213:10919;n2-172.24.29.214:10919

broker-c-s1

listenPort = 20919
rocketmqHome = /neworiental/rocketmq-test/rocketmq-c-s1/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-c
#对外提供服务地址
brokerIP1 = 172.24.29.213
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.213
brokerId = 1
storePathRootDir = /neworiental/rocketmq-test/rocketmq-c-s1/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-c-s1/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-c-s1/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-c
dLegerSelfId = n1
dLegerPeers = n0-172.24.29.215:10919;n1-172.24.29.213:10919;n2-172.24.29.214:10919

broker-c-s2

listenPort = 20919
rocketmqHome = /neworiental/rocketmq-test/rocketmq-c-s2/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-c
#对外提供服务地址
brokerIP1 = 172.24.29.214
#Broker HAIP地址,供slave同步消息的地址,不配此项则不会进行主从复制
brokerIP2= 172.24.29.214
brokerId = 2
storePathRootDir = /neworiental/rocketmq-test/rocketmq-c-s2/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-c-s2/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-c-s2/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-c
dLegerSelfId = n2
dLegerPeers = n0-172.24.29.215:10919;n1-172.24.29.213:10919;n2-172.24.29.214:10919

验证

通过管理命令可以看到集群信息,如下图,每个组有一个Leader,两个Follower,集群搭建成功,日志也正常

Leader日志:

Follower日志:

下面手动将broker-b的Leader杀掉

操作之前:

Leader是在215上

操作之后:

Leader成功选举为214了

再次将215恢复:

可以看到215成为了新的Follower

旧集群升级

如果旧集群采用 Master 方式部署,则每个 Master 都需要转换成一个 RocketMQ-on-DLedger Group。
如果旧集群采用 Master-Slave 方式部署,则每个 Master-Slave 组都需要转换成一个 RocketMQ-on-DLedger Group。

杀掉旧的 Broker

可以通过 kill 命令来完成,也可以调用 bin/mqshutdown broker

检查旧的 Commitlog

​ RocketMQ-on-DLedger 组中的每个节点,可以兼容旧的 Commitlog ,但其 Raft 复制过程,只能针对新增加的消息。因此,为了避免出现异常,需要保证 旧的 Commitlog 是一致的。
​ 如果旧的集群是采用 Master-Slave 方式部署,有可能在shutdown时,其数据并不是一致的,建议通过md5sum 的方式,检查最近的最少 2 个 Commmitlog 文件,如果发现不一致,则通过拷贝的方式进行对齐。

​ 虽然 RocketMQ-on-DLedger Group 也可以以 2 节点方式部署,但其会丧失容灾切换能力(2n + 1 原则,至少需要3个节点才能容忍其中 1 个宕机)。
​ 所以在对齐了 Master 和 Slave 的 Commitlog 之后,还需要准备第 3 台机器,并把旧的 Commitlog 从 Master 拷贝到 第 3 台机器(记得同时拷贝一下 config 文件夹)。

​ 在 3 台机器准备好了之后,旧 Commitlog 文件也保证一致之后,就可以开始走下一步修改配置了。

修改配置

参考新集群部署。

重新启动 Broker

参考新集群部署。

Tips

  • dLegerPeers:每个DLedger Group内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致,多个节点用英文分号隔开,单个条目遵循 legerSlefId-IP:端口,这里的端口用作dledge内部通信,注意不要和RocketMQ本身的端口冲突
  • dLegerSelfId:节点 id, 必须属于dLegerPeers中的一个;同 Group 内各个节点要唯一,并且此ID要和dLegerPeers配置的内容中对应上
  • Broker配置文件中的dLedger少了一个d,不知道是笔误还是有意为之

参考:

https://github.com/apache/rocketmq/blob/master/docs/cn/dledger/deploy_guide.md

这篇关于【RocketMQ】RocketMQ自动容灾切换DLedger集群实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/505753

相关文章

Java中实现订单超时自动取消功能(最新推荐)

《Java中实现订单超时自动取消功能(最新推荐)》本文介绍了Java中实现订单超时自动取消功能的几种方法,包括定时任务、JDK延迟队列、Redis过期监听、Redisson分布式延迟队列、Rocket... 目录1、定时任务2、JDK延迟队列 DelayQueue(1)定义实现Delayed接口的实体类 (

shell脚本自动删除30天以前的文件(最新推荐)

《shell脚本自动删除30天以前的文件(最新推荐)》该文章介绍了如何使用Shell脚本自动删除指定目录下30天以前的文件,并通过crontab设置定时任务,此外,还提供了如何使用Shell脚本删除E... 目录shell脚本自动删除30天以前的文件linux按照日期定时删除elasticsearch索引s

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

golang内存对齐的项目实践

《golang内存对齐的项目实践》本文主要介绍了golang内存对齐的项目实践,内存对齐不仅有助于提高内存访问效率,还确保了与硬件接口的兼容性,是Go语言编程中不可忽视的重要优化手段,下面就来介绍一下... 目录一、结构体中的字段顺序与内存对齐二、内存对齐的原理与规则三、调整结构体字段顺序优化内存对齐四、内

Go Mongox轻松实现MongoDB的时间字段自动填充

《GoMongox轻松实现MongoDB的时间字段自动填充》这篇文章主要为大家详细介绍了Go语言如何使用mongox库,在插入和更新数据时自动填充时间字段,从而提升开发效率并减少重复代码,需要的可以... 目录前言时间字段填充规则Mongox 的安装使用 Mongox 进行插入操作使用 Mongox 进行更

C语言中自动与强制转换全解析

《C语言中自动与强制转换全解析》在编写C程序时,类型转换是确保数据正确性和一致性的关键环节,无论是隐式转换还是显式转换,都各有特点和应用场景,本文将详细探讨C语言中的类型转换机制,帮助您更好地理解并在... 目录类型转换的重要性自动类型转换(隐式转换)强制类型转换(显式转换)常见错误与注意事项总结与建议类型

C++实现封装的顺序表的操作与实践

《C++实现封装的顺序表的操作与实践》在程序设计中,顺序表是一种常见的线性数据结构,通常用于存储具有固定顺序的元素,与链表不同,顺序表中的元素是连续存储的,因此访问速度较快,但插入和删除操作的效率可能... 目录一、顺序表的基本概念二、顺序表类的设计1. 顺序表类的成员变量2. 构造函数和析构函数三、顺序表

python实现简易SSL的项目实践

《python实现简易SSL的项目实践》本文主要介绍了python实现简易SSL的项目实践,包括CA.py、server.py和client.py三个模块,文中通过示例代码介绍的非常详细,对大家的学习... 目录运行环境运行前准备程序实现与流程说明运行截图代码CA.pyclient.pyserver.py参

使用C++实现单链表的操作与实践

《使用C++实现单链表的操作与实践》在程序设计中,链表是一种常见的数据结构,特别是在动态数据管理、频繁插入和删除元素的场景中,链表相比于数组,具有更高的灵活性和高效性,尤其是在需要频繁修改数据结构的应... 目录一、单链表的基本概念二、单链表类的设计1. 节点的定义2. 链表的类定义三、单链表的操作实现四、

Oracle数据库如何切换登录用户(system和sys)

《Oracle数据库如何切换登录用户(system和sys)》文章介绍了如何使用SQL*Plus工具登录Oracle数据库的system用户,包括打开登录入口、输入用户名和口令、以及切换到sys用户的... 目录打开登录入口登录system用户总结打开登录入口win+R打开运行对话框,输php入:sqlp