RocketMQ(二)双主双从集群搭建及入门介绍

2023-11-03 21:10

本文主要是介绍RocketMQ(二)双主双从集群搭建及入门介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

目前这边的配置是双主(master)双从(slave),同步双写,同步刷盘的机制。配置在两盘服务器上。

温故而知新

首先上个从百度图片盗来的图,哦不,程序员的事怎么能说盗呢?
在这里插入图片描述

温故下:

  • Producer : 生产者(消息发送者);
  • Consumer : 消费者(消息接收者);
  • Broker : 负责暂存消息;(类似于邮局);
  • NameServer :管理Broker;(类似于邮政部门,各个邮局的管理机构);
  • ProducerGroup:生产者组;
  • ConsumerGroup:消费者组。值得注意的是,在其他大V博主中的关于多Consumer订阅同一个ConsumerGroup而每个Consumer订阅的Topic又有所不同,照成的数据消费混乱或者无法被消息。这边Rocket应用就是这么个场景的。后面也会加以验证。参考链接: link.
    https://blog.csdn.net/a417930422/article/details/50663639?locationNum=1;
  • Topic : 生产者Producer和消费者Consumer都是通过Topic找到具体某大类进 行消息的投递和消费,Producer可以发送消息给一个或多个Topic。Consumer可以订阅一个或多个Topic消息;
  • Message Queue : 相当于Topic的分区,用于并行发送和接收消息;

  一、由Producer 生产消息,Producer 通过询问NameServer (注:这里NameServer 实现了负载均衡)获得要将消息发送给那个Broker的具体地址,就是红色线流程。同理Consumer消息者携带者ConsumerGroup组别信息和Topic信息对NameServer 进行询问应该去哪个Broker获取消息。(值的注意的是RocketMQ对于消费者有主动推送消息DefaultMQPushConsumer和消费者主站拉取消息DefaultMQPullConsumer两种模式,但其主动推送消息模式底层实现上看也是由消费者主站拉取消息 )

  二、Broker
  1.Broker 分为主从节点,上图Broker Master1 和Broker Master2为主节点,Broker Slave1和Broker Slave2为从节点。主节点主要负责Producer 生产者消息的写入。从节点主要负责Consumer 消费者消息的消费。因为我们即将搭建的是双主双从,所以我们需要由brokerName参数去设置那个主那个从为一个broker组,然后通过brokerId去区分broker组中那个为主从区别,0为主节点,大于0为从节点(注一个master可以对应多个slave,而一个slave只能对应一个master)
  2.Broker 主从节点之间数据同步,分为两种模式:一种异步复制,一种同步双写。其一,异步就是当Producer生产者将消息写入Broker Master以后,Broker Master就将消息成功写入的状态返给Producer了,Broker Master再异步将数据复制给Broker Slave,至于什么时候Broker Master异步复制,是否将数据复制成功Producer生产者是不知道的。异步复制会出现少量的数据丢失。其二,同步呢,顾名思义就是Producer生产者将消息写入Broker Master以后,Broker Master就将消息同步写入Broker Slave,当完成这个 操作以后再返回状态给Producer。异步效率比较高,同步对于数据的一致性安全性比较好。

Linux上双主双从的搭建

这边两台服务器上的IP将用IP1 和IP2区分。
步骤1到步骤3在两台服务器上都要进行配置。
1.hosts中添加信息

vim /etc/hosts
//记得修改IP为服务器地址。
//nameserver
IP1 nameserver1
IP2 nameserver2
//broker
IP1 master1
IP1 master1-slave1
IP2 master2
IP2 master2-slave2
//保存退出
:wq!
//重启网卡
systemctl restart network

2.记得开放RocketMQ服务端口,默认端口分别为
nameserver:9876;
master:10911;
slave:11011;

3.创建消息存储路径


mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index

4.配置broker
 4.1配置服务器IP1上的broker文件broker-a.properties

//根据自己的路径,进入同步文件夹下
cd /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync
//修改配置文件
vim broker-a.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#(注意修改为自己服务器)强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP1

 4.2配置服务器IP1上的broker文件broker-a-s.properties

//这边是在IP1上配置broker-a.properties和broker-a-s.properties,当IP1宕机的话broker-a组也就宕掉了。所以你们也可以交叉配置,比如在IP1上配置broker-a.properties和broker-b-s.properties
vim broker-a-s.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10924
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
# 这里使用了同步刷盘,建议使用异步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP1

 4.3配置服务器IP2上的broker文件broker-b.properties

vim broker-b.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#(注意修改为自己服务器)强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP2

4.4配置服务器IP2上的broker文件broker-b-s.properties

//这边是在IP1上配置broker-a.properties和broker-a-s.properties,当IP1宕机的话broker-a组也就宕掉了。所以你们也可以交叉配置,比如在IP1上配置broker-a.properties和broker-b-s.properties
vim broker-b-s.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10924
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
# 这里使用了同步刷盘,建议使用异步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP2

以上broker配置完毕,上篇文章我们只修改了一个服务器的启动内存,另外一个服务器rocketMQ的启动内存也记得修改。
5.启动服务

//进入到服务器IP1下的bin目录
cd /usr/local/rocketmq/rocketmq-4.7.0/bin
//启动nameserver集群
nohup sh mqnamesrv &//启动master1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-a.properties &
//启动slave1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-a-s.properties &
//jps查看
jps46688 NamesrvStartup
47106 BrokerStartup
46815 BrokerStartup
//进入到服务器IP2下的bin目录
cd /usr/local/rocketmq/rocketmq-4.7.0/bin
//启动nameserver集群
nohup sh mqnamesrv &//启动master2
nohup sh mqbroker /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-b.properties &
//启动slave2
nohup sh mqbroker /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-b-s.properties &
//jps查看
jps

本章结束

这篇关于RocketMQ(二)双主双从集群搭建及入门介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/340879

相关文章

MySQL MHA集群详解(数据库高可用)

《MySQLMHA集群详解(数据库高可用)》MHA(MasterHighAvailability)是开源MySQL高可用管理工具,用于自动故障检测与转移,支持异步或半同步复制的MySQL主从架构,本... 目录mysql 高可用方案:MHA 详解与实战1. MHA 简介2. MHA 的组件组成(1)MHA

SpringCloud Stream 快速入门实例教程

《SpringCloudStream快速入门实例教程》本文介绍了SpringCloudStream(SCS)组件在分布式系统中的作用,以及如何集成到SpringBoot项目中,通过SCS,可... 目录1.SCS 组件的出现的背景和作用2.SCS 集成srping Boot项目3.Yml 配置4.Sprin

MongoDB搭建过程及单机版部署方法

《MongoDB搭建过程及单机版部署方法》MongoDB是一个灵活、高性能的NoSQL数据库,特别适合快速开发和大规模分布式系统,本文给大家介绍MongoDB搭建过程及单机版部署方法,感兴趣的朋友跟随... 目录前言1️⃣ 核心特点1、文档存储2、无模式(Schema-less)3、高性能4、水平扩展(Sh

golang实现nacos获取配置和服务注册-支持集群详解

《golang实现nacos获取配置和服务注册-支持集群详解》文章介绍了如何在Go语言中使用Nacos获取配置和服务注册,支持集群初始化,客户端结构体中的IpAddresses可以配置多个地址,新客户... 目录golang nacos获取配置和服务注册-支持集群初始化客户端可选参数配置new一个客户端 支

Redis的安全机制详细介绍及配置方法

《Redis的安全机制详细介绍及配置方法》本文介绍Redis安全机制的配置方法,包括绑定IP地址、设置密码、保护模式、禁用危险命令、防火墙限制、TLS加密、客户端连接限制、最大内存使用和日志审计等,通... 目录1. 绑定 IP 地址2. 设置密码3. 保护模式4. 禁用危险命令5. 通过防火墙限制访问6.

MySQL集群高可用架构的两种使用小结

《MySQL集群高可用架构的两种使用小结》本文介绍了MySQL的两种高可用解决方案:组复制(MGR)和MasterHighAvailability(MHA),文中通过示例代码介绍的非常详细,对大家的学... 目录一、mysql高可用之组复制(MGR)1.1 组复制核心特性与优势1.2 组复制架构原理1.3

SpringMVC配置、映射与参数处理​入门案例详解

《SpringMVC配置、映射与参数处理​入门案例详解》文章介绍了SpringMVC框架的基本概念和使用方法,包括如何配置和编写Controller、设置请求映射规则、使用RestFul风格、获取请求... 目录1.SpringMVC概述2.入门案例①导入相关依赖②配置web.XML③配置SpringMVC

Docker + Redis 部署集群的实现步骤

《Docker+Redis部署集群的实现步骤》本文详细介绍了在三台服务器上部署高可用Redis集群的完整流程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录一、环境准备1. 服务器规划(3 台服务器)2. 防火墙配置(三台服务器均执行)3. 安装 docke

springBoot (springCloud2025)集成redisCluster 集群的操作方法

《springBoot(springCloud2025)集成redisCluster集群的操作方法》文章介绍了如何使用SpringBoot集成RedisCluster集群,并详细说明了pom.xm... 目录pom.XMLapplication.yamlcluster配置类其他配置类连接池配置类Redis

MySQL索引踩坑合集从入门到精通

《MySQL索引踩坑合集从入门到精通》本文详细介绍了MySQL索引的使用,包括索引的类型、创建、使用、优化技巧及最佳实践,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友... 目录mysql索引完整教程:从入门到入土(附实战踩坑指南)一、索引是什么?为什么需要它?1.1 什么