Kafka-消费者-KafkaConsumer分析-ConsumerCoordinator

2024-01-18 07:04

本文主要是介绍Kafka-消费者-KafkaConsumer分析-ConsumerCoordinator,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在前面介绍了Kafka中Rebalance操作的相关方案和原理。

在KafkaConsumer中通过ConsumerCoordinator组件实现与服务端的GroupCoordinator的交互,ConsumerCoordinator继承了AbstractCoordinator抽象类。

下面我们先来介绍AbstractCoordinator的核心字段,如图所示。

在这里插入图片描述- heartbeat:心跳任务的辅助类,其中记录了两次发送心跳消息的间隔(interval字段)、最近发送心跳的时间(lastHeartbeatSend字段)、最后收到心跳响应的时间(lastHeartbeatReceive字段)、过期时间(timeout字段)、心跳任务重置时间(lastSessionReset字段),同时还提供了计算下次发送心跳的时间(timeToNextHeartbeat()方法)、检测是否过期的方法(sessionTimeoutExpired()方法)。

  • heartbeatTask:HeartbeatTask是一个定时任务,负责定时发送心跳请求和心跳响应的处理,会被添加到前面介绍的ConsumerNetworkClient.delayedTasks定时任务队列中。
  • groupld:当前消费者所属的Consumer Group的Id。
  • client:ConsumerNetworkClient对象,负责网络通信和执行定时任务。
  • needsJoinPrepare:标记是否需要执行发送JoinGroupRequest请求前的准备操作。
  • rejoinNeeded:此字段是否重新发送JoinGroupRequest请求的条件之一。

下面先简单了解修改其值的地方和含义,如图所示。

在这里插入图片描述
上图①处是收到正常的JoinGroupResponse响应,将rejoinNeeded设置为false,防止重复发送JoinGroupRequest请求。

②、③、④三处分别是收到异常的SyncGroupResponse或HeartbeatResponse或消费者离开Consumer Group时执行的操作,它们都会将rejoinNeeded设置为true,表示可以重新发送JoinGroupRequest。

  • coordinator:Node类型,记录服务端GroupCoordinator所在的Node节点。
  • memberld:服务端GroupCoordinator返回的分配给消费者的唯一Id。
  • generation:服务端GroupCoordinator返回的年代信息,用来区分两次Rebalance操作。由于网络延迟等问题,在执行Rebalance操作时可能收到上次Rebalance过程的请求,避免这种干扰,每次Rebalance操作都会递增generation的值。

下面是ConsumerCoordinator的核心字段。

  • assignors:PartitionAssignor列表。在消费者发送的JoinGroupRequest请求中包含了消费者自身支持的PartitionAssignor信息,GroupCoordinator从所有消费者都支持的分配策略中选择一个,通知Leader使用此分配策略进行分区分配。此字段的值通过partition.assignment.strategy参数配置,可以配置多个。

  • metadata:记录了Kafka集群的元数据。

  • subscriptions:SubscriptionState对象,参考SubscriptionState小节。

  • autoCommitEnabled:是否开启了自动提交offset。

  • autoCommitTask:自动提交offset的定时任务。

  • interceptors:ConsumerInterceptor集合。

  • excludeInternalTopics:标识是否排除内部Topic。

  • metadataSnapshot:用来存储Metadata的快照信息,主要用来检测Topic是否发生了分区数量的变化。在ConsumerCoordinator的构造方法中,会为Metadata添加一个监听器,当Metadata更新时会做下面几件事。

    • 如果是AUTO_PATTERN模式,则使用用户自定义的正则表达式过滤Topic,得到需要订阅的Topic集合后,设置到SubscriptionState的subscription集合和groupSubscription集合中。

    • 如果是AUTO_PATTERN或AUTO_TOPICS模式,为当前Metadata做一个快照,这个快照底层是使用HashMap记录每个Topic中Partition的个数。将新旧快照进行比较,发生变化的话,则表示消费者订阅的Topic发生分区数量变化,则将SubscriptionState的needsPartitionAssignment字段置为true,需要重新进行分区分配。

    • 使用metadataSnapshot字段记录变化后的新快照。

  • assignmentSnapshot:也是用来存储Metadata的快照信息,不过是用来检测Partition分配的过程中有没有发生分区数量变化。具体是在Leader消费者开始分区分配操作前,使用此字段记录Metadata快照;收到SyncGroupResponse后,会比较此字段记录的快照与当前Metadata是否发生变化。如果发生变化,则要重新进行分区分配。

这篇关于Kafka-消费者-KafkaConsumer分析-ConsumerCoordinator的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/618338

相关文章

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

Spring中Bean有关NullPointerException异常的原因分析

《Spring中Bean有关NullPointerException异常的原因分析》在Spring中使用@Autowired注解注入的bean不能在静态上下文中访问,否则会导致NullPointerE... 目录Spring中Bean有关NullPointerException异常的原因问题描述解决方案总结

python中的与时间相关的模块应用场景分析

《python中的与时间相关的模块应用场景分析》本文介绍了Python中与时间相关的几个重要模块:`time`、`datetime`、`calendar`、`timeit`、`pytz`和`dateu... 目录1. time 模块2. datetime 模块3. calendar 模块4. timeit

python-nmap实现python利用nmap进行扫描分析

《python-nmap实现python利用nmap进行扫描分析》Nmap是一个非常用的网络/端口扫描工具,如果想将nmap集成进你的工具里,可以使用python-nmap这个python库,它提供了... 目录前言python-nmap的基本使用PortScanner扫描PortScannerAsync异

Oracle数据库执行计划的查看与分析技巧

《Oracle数据库执行计划的查看与分析技巧》在Oracle数据库中,执行计划能够帮助我们深入了解SQL语句在数据库内部的执行细节,进而优化查询性能、提升系统效率,执行计划是Oracle数据库优化器为... 目录一、什么是执行计划二、查看执行计划的方法(一)使用 EXPLAIN PLAN 命令(二)通过 S

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置