RocketMQ(二)双主双从集群搭建及入门介绍

2023-11-03 21:10

本文主要是介绍RocketMQ(二)双主双从集群搭建及入门介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

目前这边的配置是双主(master)双从(slave),同步双写,同步刷盘的机制。配置在两盘服务器上。

温故而知新

首先上个从百度图片盗来的图,哦不,程序员的事怎么能说盗呢?
在这里插入图片描述

温故下:

  • Producer : 生产者(消息发送者);
  • Consumer : 消费者(消息接收者);
  • Broker : 负责暂存消息;(类似于邮局);
  • NameServer :管理Broker;(类似于邮政部门,各个邮局的管理机构);
  • ProducerGroup:生产者组;
  • ConsumerGroup:消费者组。值得注意的是,在其他大V博主中的关于多Consumer订阅同一个ConsumerGroup而每个Consumer订阅的Topic又有所不同,照成的数据消费混乱或者无法被消息。这边Rocket应用就是这么个场景的。后面也会加以验证。参考链接: link.
    https://blog.csdn.net/a417930422/article/details/50663639?locationNum=1;
  • Topic : 生产者Producer和消费者Consumer都是通过Topic找到具体某大类进 行消息的投递和消费,Producer可以发送消息给一个或多个Topic。Consumer可以订阅一个或多个Topic消息;
  • Message Queue : 相当于Topic的分区,用于并行发送和接收消息;

  一、由Producer 生产消息,Producer 通过询问NameServer (注:这里NameServer 实现了负载均衡)获得要将消息发送给那个Broker的具体地址,就是红色线流程。同理Consumer消息者携带者ConsumerGroup组别信息和Topic信息对NameServer 进行询问应该去哪个Broker获取消息。(值的注意的是RocketMQ对于消费者有主动推送消息DefaultMQPushConsumer和消费者主站拉取消息DefaultMQPullConsumer两种模式,但其主动推送消息模式底层实现上看也是由消费者主站拉取消息 )

  二、Broker
  1.Broker 分为主从节点,上图Broker Master1 和Broker Master2为主节点,Broker Slave1和Broker Slave2为从节点。主节点主要负责Producer 生产者消息的写入。从节点主要负责Consumer 消费者消息的消费。因为我们即将搭建的是双主双从,所以我们需要由brokerName参数去设置那个主那个从为一个broker组,然后通过brokerId去区分broker组中那个为主从区别,0为主节点,大于0为从节点(注一个master可以对应多个slave,而一个slave只能对应一个master)
  2.Broker 主从节点之间数据同步,分为两种模式:一种异步复制,一种同步双写。其一,异步就是当Producer生产者将消息写入Broker Master以后,Broker Master就将消息成功写入的状态返给Producer了,Broker Master再异步将数据复制给Broker Slave,至于什么时候Broker Master异步复制,是否将数据复制成功Producer生产者是不知道的。异步复制会出现少量的数据丢失。其二,同步呢,顾名思义就是Producer生产者将消息写入Broker Master以后,Broker Master就将消息同步写入Broker Slave,当完成这个 操作以后再返回状态给Producer。异步效率比较高,同步对于数据的一致性安全性比较好。

Linux上双主双从的搭建

这边两台服务器上的IP将用IP1 和IP2区分。
步骤1到步骤3在两台服务器上都要进行配置。
1.hosts中添加信息

vim /etc/hosts
//记得修改IP为服务器地址。
//nameserver
IP1 nameserver1
IP2 nameserver2
//broker
IP1 master1
IP1 master1-slave1
IP2 master2
IP2 master2-slave2
//保存退出
:wq!
//重启网卡
systemctl restart network

2.记得开放RocketMQ服务端口,默认端口分别为
nameserver:9876;
master:10911;
slave:11011;

3.创建消息存储路径


mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index

4.配置broker
 4.1配置服务器IP1上的broker文件broker-a.properties

//根据自己的路径,进入同步文件夹下
cd /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync
//修改配置文件
vim broker-a.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#(注意修改为自己服务器)强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP1

 4.2配置服务器IP1上的broker文件broker-a-s.properties

//这边是在IP1上配置broker-a.properties和broker-a-s.properties,当IP1宕机的话broker-a组也就宕掉了。所以你们也可以交叉配置,比如在IP1上配置broker-a.properties和broker-b-s.properties
vim broker-a-s.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10924
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
# 这里使用了同步刷盘,建议使用异步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP1

 4.3配置服务器IP2上的broker文件broker-b.properties

vim broker-b.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#(注意修改为自己服务器)强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP2

4.4配置服务器IP2上的broker文件broker-b-s.properties

//这边是在IP1上配置broker-a.properties和broker-a-s.properties,当IP1宕机的话broker-a组也就宕掉了。所以你们也可以交叉配置,比如在IP1上配置broker-a.properties和broker-b-s.properties
vim broker-b-s.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10924
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
# 这里使用了同步刷盘,建议使用异步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP2

以上broker配置完毕,上篇文章我们只修改了一个服务器的启动内存,另外一个服务器rocketMQ的启动内存也记得修改。
5.启动服务

//进入到服务器IP1下的bin目录
cd /usr/local/rocketmq/rocketmq-4.7.0/bin
//启动nameserver集群
nohup sh mqnamesrv &//启动master1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-a.properties &
//启动slave1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-a-s.properties &
//jps查看
jps46688 NamesrvStartup
47106 BrokerStartup
46815 BrokerStartup
//进入到服务器IP2下的bin目录
cd /usr/local/rocketmq/rocketmq-4.7.0/bin
//启动nameserver集群
nohup sh mqnamesrv &//启动master2
nohup sh mqbroker /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-b.properties &
//启动slave2
nohup sh mqbroker /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-b-s.properties &
//jps查看
jps

本章结束

这篇关于RocketMQ(二)双主双从集群搭建及入门介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/340879

相关文章

Redis分片集群的实现

《Redis分片集群的实现》Redis分片集群是一种将Redis数据库分散到多个节点上的方式,以提供更高的性能和可伸缩性,本文主要介绍了Redis分片集群的实现,具有一定的参考价值,感兴趣的可以了解一... 目录1. Redis Cluster的核心概念哈希槽(Hash Slots)主从复制与故障转移2.

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

MySQL中慢SQL优化的不同方式介绍

《MySQL中慢SQL优化的不同方式介绍》慢SQL的优化,主要从两个方面考虑,SQL语句本身的优化,以及数据库设计的优化,下面小编就来给大家介绍一下有哪些方式可以优化慢SQL吧... 目录避免不必要的列分页优化索引优化JOIN 的优化排序优化UNION 优化慢 SQL 的优化,主要从两个方面考虑,SQL 语

C++中函数模板与类模板的简单使用及区别介绍

《C++中函数模板与类模板的简单使用及区别介绍》这篇文章介绍了C++中的模板机制,包括函数模板和类模板的概念、语法和实际应用,函数模板通过类型参数实现泛型操作,而类模板允许创建可处理多种数据类型的类,... 目录一、函数模板定义语法真实示例二、类模板三、关键区别四、注意事项 ‌在C++中,模板是实现泛型编程

Python实现html转png的完美方案介绍

《Python实现html转png的完美方案介绍》这篇文章主要为大家详细介绍了如何使用Python实现html转png功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 1.增强稳定性与错误处理建议使用三层异常捕获结构:try: with sync_playwright(

Java使用多线程处理未知任务数的方案介绍

《Java使用多线程处理未知任务数的方案介绍》这篇文章主要为大家详细介绍了Java如何使用多线程实现处理未知任务数,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 知道任务个数,你可以定义好线程数规则,生成线程数去跑代码说明:1.虚拟线程池:使用 Executors.newVir

使用DeepSeek搭建个人知识库(在笔记本电脑上)

《使用DeepSeek搭建个人知识库(在笔记本电脑上)》本文介绍了如何在笔记本电脑上使用DeepSeek和开源工具搭建个人知识库,通过安装DeepSeek和RAGFlow,并使用CherryStudi... 目录部署环境软件清单安装DeepSeek安装Cherry Studio安装RAGFlow设置知识库总

Linux搭建Mysql主从同步的教程

《Linux搭建Mysql主从同步的教程》:本文主要介绍Linux搭建Mysql主从同步的教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux搭建mysql主从同步1.启动mysql服务2.修改Mysql主库配置文件/etc/my.cnf3.重启主库my