本文主要是介绍RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- NameServer 路由注册机制
- 生产者的发送消息流程
NameServer 路由注册机制
在 Broker 启动时,通过 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
向 NameServer 中注册自己
那么 NameServer 中,注册 Broker 信息的入口在: DefaultRequestProcessor # processRequest
-
判断请求码,如果是 Broker 注册,则进行注册 Broker 信息
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}switch (request.getCode()) {// ... 省略// 如果是 Broker 注册case RequestCode.REGISTER_BROKER:return this.registerBroker(ctx, request);// ... 省略} }
-
this.registerBroker
真正开始注册 Broker 信息在注册信息之前,会先使用
crc32
来检验消息的正确性(安全检查)之后会调用
this.namesrvController.getRouteInfoManager().registerBroker()
来注册 Broker 的信息,这个 Broker 的信息是 BrokerController 启动时通过 Netty 发送过来的通过
getRouteInfoManager
获取 RouteInfoManager,在该类中注册 Broker 信息,那么 RouteInfoManager 肯定是管理了 Broker 的信息可以点进去 RouteInfoManager,可以发现其中管理了很多路由的信息
其中 brokerLiveTable 存储的是存活的 Broker 列表,那么可以查看该变量的引用链,来判断 Nameserver 在哪里进行心跳扫描
可以看到在 scanNotActiveBroker 方法中,会将 brokerLiveTable 中不活跃的 Broker 给剔除掉
生产者的发送消息流程
下面会将整体的一个发送消息的流程图片先展示出来,再通过代码进行一步一步梳理:
既然要看生产者的发送消息流程,就先通过方法的调用作为入口,一步一步探究流程:
那么通过这个 send 方法点进去,入口为:DefaultMQProducer # send(Message msg)
方法,从该方法点击进入,调用链如下:
如果你在看源码的话,可以从上边的调用链一步一步点击,最后发送消息的逻辑就在 this.sendDefaultImpl
方法中展开
- 首先,会先根据 Topic 获取对应的路由信息,表示该 Topic 需要向哪个 MessageQueue 中进行发送,这个路由信息会先从本地缓存中取,如果没有取到,会向 NameServer 发送请求来获取 Topic 的路由信息
- 设置消息发送失败的
重试次数
,同步情况下重试次数为预设次数 +1,异步情况下默认重试次数为 1 - 接下来就根据
重试次数
循环发送消息,为 Topic 选择要发送的队列 MessageQueue 进行消息发送
选择队列之后,就进入到发送消息的核心逻辑:this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
- 在该方法中,先通过队列 MessageQueue 找到对应的 brokerAddr
- 之后,会尝试对消息进行压缩
- 判断是否存在一些需要对消息进行
禁止发送
或前置拦截
的钩子函数,进行一些消息的拦截处理 - 判断通信模式:ASYNC、ONEWAY、SYNC,将消息以对应的方式发送出去,这里以同步
SYNC
为例
如果是同步的话,会通过 this.mQClientFactory.getMQClientAPIImpl().sendMessage()
方法将消息发送出去,接下来又是层层的调用,最后真正通过 Netty 将消息发送出去的地方在 NettyRemotingClient # invokeSync()
的方法中
在这个方法中,还会对消息进行前置拦截和后置拦截,为开发者的使用提供了很多的扩展点,在这里就 真正通过 Netty 将消息发送出去了
这篇关于RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!