本文主要是介绍RocketMQ(二)双主双从集群搭建及入门介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前言
目前这边的配置是双主(master)双从(slave),同步双写,同步刷盘的机制。配置在两盘服务器上。
温故而知新
首先上个从百度图片盗来的图,哦不,程序员的事怎么能说盗呢?
温故下:
- Producer : 生产者(消息发送者);
- Consumer : 消费者(消息接收者);
- Broker : 负责暂存消息;(类似于邮局);
- NameServer :管理Broker;(类似于邮政部门,各个邮局的管理机构);
- ProducerGroup:生产者组;
- ConsumerGroup:消费者组。值得注意的是,在其他大V博主中的关于多Consumer订阅同一个ConsumerGroup而每个Consumer订阅的Topic又有所不同,照成的数据消费混乱或者无法被消息。这边Rocket应用就是这么个场景的。后面也会加以验证。参考链接: link.
https://blog.csdn.net/a417930422/article/details/50663639?locationNum=1; - Topic : 生产者Producer和消费者Consumer都是通过Topic找到具体某大类进 行消息的投递和消费,Producer可以发送消息给一个或多个Topic。Consumer可以订阅一个或多个Topic消息;
- Message Queue : 相当于Topic的分区,用于并行发送和接收消息;
一、由Producer 生产消息,Producer 通过询问NameServer (注:这里NameServer 实现了负载均衡)获得要将消息发送给那个Broker的具体地址,就是红色线流程。同理Consumer消息者携带者ConsumerGroup组别信息和Topic信息对NameServer 进行询问应该去哪个Broker获取消息。(值的注意的是RocketMQ对于消费者有主动推送消息DefaultMQPushConsumer和消费者主站拉取消息DefaultMQPullConsumer两种模式,但其主动推送消息模式底层实现上看也是由消费者主站拉取消息 )
二、Broker
1.Broker 分为主从节点,上图Broker Master1 和Broker Master2为主节点,Broker Slave1和Broker Slave2为从节点。主节点主要负责Producer 生产者消息的写入。从节点主要负责Consumer 消费者消息的消费。因为我们即将搭建的是双主双从,所以我们需要由brokerName参数去设置那个主那个从为一个broker组,然后通过brokerId去区分broker组中那个为主从区别,0为主节点,大于0为从节点(注一个master可以对应多个slave,而一个slave只能对应一个master)
2.Broker 主从节点之间数据同步,分为两种模式:一种异步复制,一种同步双写。其一,异步就是当Producer生产者将消息写入Broker Master以后,Broker Master就将消息成功写入的状态返给Producer了,Broker Master再异步将数据复制给Broker Slave,至于什么时候Broker Master异步复制,是否将数据复制成功Producer生产者是不知道的。异步复制会出现少量的数据丢失。其二,同步呢,顾名思义就是Producer生产者将消息写入Broker Master以后,Broker Master就将消息同步写入Broker Slave,当完成这个 操作以后再返回状态给Producer。异步效率比较高,同步对于数据的一致性安全性比较好。
Linux上双主双从的搭建
这边两台服务器上的IP将用IP1 和IP2区分。
步骤1到步骤3在两台服务器上都要进行配置。
1.hosts中添加信息
vim /etc/hosts
//记得修改IP为服务器地址。
//nameserver
IP1 nameserver1
IP2 nameserver2
//broker
IP1 master1
IP1 master1-slave1
IP2 master2
IP2 master2-slave2
//保存退出
:wq!
//重启网卡
systemctl restart network
2.记得开放RocketMQ服务端口,默认端口分别为
nameserver:9876;
master:10911;
slave:11011;
3.创建消息存储路径
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index
4.配置broker
4.1配置服务器IP1上的broker文件broker-a.properties
//根据自己的路径,进入同步文件夹下
cd /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync
//修改配置文件
vim broker-a.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#(注意修改为自己服务器)强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP1
4.2配置服务器IP1上的broker文件broker-a-s.properties
//这边是在IP1上配置broker-a.properties和broker-a-s.properties,当IP1宕机的话broker-a组也就宕掉了。所以你们也可以交叉配置,比如在IP1上配置broker-a.properties和broker-b-s.properties
vim broker-a-s.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10924
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
# 这里使用了同步刷盘,建议使用异步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP1
4.3配置服务器IP2上的broker文件broker-b.properties
vim broker-b.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#(注意修改为自己服务器)强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP2
4.4配置服务器IP2上的broker文件broker-b-s.properties
//这边是在IP1上配置broker-a.properties和broker-a-s.properties,当IP1宕机的话broker-a组也就宕掉了。所以你们也可以交叉配置,比如在IP1上配置broker-a.properties和broker-b-s.properties
vim broker-b-s.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10924
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
# 这里使用了同步刷盘,建议使用异步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP2
以上broker配置完毕,上篇文章我们只修改了一个服务器的启动内存,另外一个服务器rocketMQ的启动内存也记得修改。
5.启动服务
//进入到服务器IP1下的bin目录
cd /usr/local/rocketmq/rocketmq-4.7.0/bin
//启动nameserver集群
nohup sh mqnamesrv &//启动master1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-a.properties &
//启动slave1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-a-s.properties &
//jps查看
jps46688 NamesrvStartup
47106 BrokerStartup
46815 BrokerStartup
//进入到服务器IP2下的bin目录
cd /usr/local/rocketmq/rocketmq-4.7.0/bin
//启动nameserver集群
nohup sh mqnamesrv &//启动master2
nohup sh mqbroker /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-b.properties &
//启动slave2
nohup sh mqbroker /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-b-s.properties &
//jps查看
jps
本章结束
这篇关于RocketMQ(二)双主双从集群搭建及入门介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!