RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程

本文主要是介绍RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

      • NameServer 路由注册机制
      • 生产者的发送消息流程

NameServer 路由注册机制

在 Broker 启动时,通过 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); 向 NameServer 中注册自己

那么 NameServer 中,注册 Broker 信息的入口在: DefaultRequestProcessor # processRequest

  • 判断请求码,如果是 Broker 注册,则进行注册 Broker 信息

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}switch (request.getCode()) {// ... 省略// 如果是 Broker 注册case RequestCode.REGISTER_BROKER:return this.registerBroker(ctx, request);// ... 省略}
    }
    

  • this.registerBroker 真正开始注册 Broker 信息

    在注册信息之前,会先使用 crc32 来检验消息的正确性(安全检查)

    在这里插入图片描述

    之后会调用 this.namesrvController.getRouteInfoManager().registerBroker() 来注册 Broker 的信息,这个 Broker 的信息是 BrokerController 启动时通过 Netty 发送过来的

    通过 getRouteInfoManager 获取 RouteInfoManager,在该类中注册 Broker 信息,那么 RouteInfoManager 肯定是管理了 Broker 的信息

    可以点进去 RouteInfoManager,可以发现其中管理了很多路由的信息

    在这里插入图片描述

    其中 brokerLiveTable 存储的是存活的 Broker 列表,那么可以查看该变量的引用链,来判断 Nameserver 在哪里进行心跳扫描

    在这里插入图片描述

    可以看到在 scanNotActiveBroker 方法中,会将 brokerLiveTable 中不活跃的 Broker 给剔除掉

    在这里插入图片描述

生产者的发送消息流程

下面会将整体的一个发送消息的流程图片先展示出来,再通过代码进行一步一步梳理:

在这里插入图片描述

既然要看生产者的发送消息流程,就先通过方法的调用作为入口,一步一步探究流程:

在这里插入图片描述

那么通过这个 send 方法点进去,入口为:DefaultMQProducer # send(Message msg) 方法,从该方法点击进入,调用链如下:

在这里插入图片描述

如果你在看源码的话,可以从上边的调用链一步一步点击,最后发送消息的逻辑就在 this.sendDefaultImpl 方法中展开

  1. 首先,会先根据 Topic 获取对应的路由信息,表示该 Topic 需要向哪个 MessageQueue 中进行发送,这个路由信息会先从本地缓存中取,如果没有取到,会向 NameServer 发送请求来获取 Topic 的路由信息
  2. 设置消息发送失败的 重试次数 ,同步情况下重试次数为预设次数 +1,异步情况下默认重试次数为 1
  3. 接下来就根据 重试次数 循环发送消息,为 Topic 选择要发送的队列 MessageQueue 进行消息发送

选择队列之后,就进入到发送消息的核心逻辑:this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

  1. 在该方法中,先通过队列 MessageQueue 找到对应的 brokerAddr
  2. 之后,会尝试对消息进行压缩
  3. 判断是否存在一些需要对消息进行 禁止发送前置拦截 的钩子函数,进行一些消息的拦截处理
  4. 判断通信模式:ASYNC、ONEWAY、SYNC,将消息以对应的方式发送出去,这里以同步 SYNC 为例

如果是同步的话,会通过 this.mQClientFactory.getMQClientAPIImpl().sendMessage() 方法将消息发送出去,接下来又是层层的调用,最后真正通过 Netty 将消息发送出去的地方在 NettyRemotingClient # invokeSync() 的方法中

在这个方法中,还会对消息进行前置拦截和后置拦截,为开发者的使用提供了很多的扩展点,在这里就 真正通过 Netty 将消息发送出去了

在这里插入图片描述

这篇关于RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/511876

相关文章

C#提取PDF表单数据的实现流程

《C#提取PDF表单数据的实现流程》PDF表单是一种常见的数据收集工具,广泛应用于调查问卷、业务合同等场景,凭借出色的跨平台兼容性和标准化特点,PDF表单在各行各业中得到了广泛应用,本文将探讨如何使用... 目录引言使用工具C# 提取多个PDF表单域的数据C# 提取特定PDF表单域的数据引言PDF表单是一

PyCharm接入DeepSeek实现AI编程的操作流程

《PyCharm接入DeepSeek实现AI编程的操作流程》DeepSeek是一家专注于人工智能技术研发的公司,致力于开发高性能、低成本的AI模型,接下来,我们把DeepSeek接入到PyCharm中... 目录引言效果演示创建API key在PyCharm中下载Continue插件配置Continue引言

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Python手搓邮件发送客户端

《Python手搓邮件发送客户端》这篇文章主要为大家详细介绍了如何使用Python手搓邮件发送客户端,支持发送邮件,附件,定时发送以及个性化邮件正文,感兴趣的可以了解下... 目录1. 简介2.主要功能2.1.邮件发送功能2.2.个性签名功能2.3.定时发送功能2. 4.附件管理2.5.配置加载功能2.6.

一文带你理解Python中import机制与importlib的妙用

《一文带你理解Python中import机制与importlib的妙用》在Python编程的世界里,import语句是开发者最常用的工具之一,它就像一把钥匙,打开了通往各种功能和库的大门,下面就跟随小... 目录一、python import机制概述1.1 import语句的基本用法1.2 模块缓存机制1.

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis缓存问题与缓存更新机制详解

《Redis缓存问题与缓存更新机制详解》本文主要介绍了缓存问题及其解决方案,包括缓存穿透、缓存击穿、缓存雪崩等问题的成因以及相应的预防和解决方法,同时,还详细探讨了缓存更新机制,包括不同情况下的缓存更... 目录一、缓存问题1.1 缓存穿透1.1.1 问题来源1.1.2 解决方案1.2 缓存击穿1.2.1

Java如何通过反射机制获取数据类对象的属性及方法

《Java如何通过反射机制获取数据类对象的属性及方法》文章介绍了如何使用Java反射机制获取类对象的所有属性及其对应的get、set方法,以及如何通过反射机制实现类对象的实例化,感兴趣的朋友跟随小编一... 目录一、通过反射机制获取类对象的所有属性以及相应的get、set方法1.遍历类对象的所有属性2.获取

MySQL中的锁和MVCC机制解读

《MySQL中的锁和MVCC机制解读》MySQL事务、锁和MVCC机制是确保数据库操作原子性、一致性和隔离性的关键,事务必须遵循ACID原则,锁的类型包括表级锁、行级锁和意向锁,MVCC通过非锁定读和... 目录mysql的锁和MVCC机制事务的概念与ACID特性锁的类型及其工作机制锁的粒度与性能影响多版本