本文主要是介绍Kafka-消费者-KafkaConsumer分析-ConsumerCoordinator,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在前面介绍了Kafka中Rebalance操作的相关方案和原理。
在KafkaConsumer中通过ConsumerCoordinator组件实现与服务端的GroupCoordinator的交互,ConsumerCoordinator继承了AbstractCoordinator抽象类。
下面我们先来介绍AbstractCoordinator的核心字段,如图所示。
- heartbeat:心跳任务的辅助类,其中记录了两次发送心跳消息的间隔(interval字段)、最近发送心跳的时间(lastHeartbeatSend字段)、最后收到心跳响应的时间(lastHeartbeatReceive字段)、过期时间(timeout字段)、心跳任务重置时间(lastSessionReset字段),同时还提供了计算下次发送心跳的时间(timeToNextHeartbeat()方法)、检测是否过期的方法(sessionTimeoutExpired()方法)。
- heartbeatTask:HeartbeatTask是一个定时任务,负责定时发送心跳请求和心跳响应的处理,会被添加到前面介绍的ConsumerNetworkClient.delayedTasks定时任务队列中。
- groupld:当前消费者所属的Consumer Group的Id。
- client:ConsumerNetworkClient对象,负责网络通信和执行定时任务。
- needsJoinPrepare:标记是否需要执行发送JoinGroupRequest请求前的准备操作。
- rejoinNeeded:此字段是否重新发送JoinGroupRequest请求的条件之一。
下面先简单了解修改其值的地方和含义,如图所示。
上图①处是收到正常的JoinGroupResponse响应,将rejoinNeeded设置为false,防止重复发送JoinGroupRequest请求。
②、③、④三处分别是收到异常的SyncGroupResponse或HeartbeatResponse或消费者离开Consumer Group时执行的操作,它们都会将rejoinNeeded设置为true,表示可以重新发送JoinGroupRequest。
- coordinator:Node类型,记录服务端GroupCoordinator所在的Node节点。
- memberld:服务端GroupCoordinator返回的分配给消费者的唯一Id。
- generation:服务端GroupCoordinator返回的年代信息,用来区分两次Rebalance操作。由于网络延迟等问题,在执行Rebalance操作时可能收到上次Rebalance过程的请求,避免这种干扰,每次Rebalance操作都会递增generation的值。
下面是ConsumerCoordinator的核心字段。
-
assignors:PartitionAssignor列表。在消费者发送的JoinGroupRequest请求中包含了消费者自身支持的PartitionAssignor信息,GroupCoordinator从所有消费者都支持的分配策略中选择一个,通知Leader使用此分配策略进行分区分配。此字段的值通过partition.assignment.strategy参数配置,可以配置多个。
-
metadata:记录了Kafka集群的元数据。
-
subscriptions:SubscriptionState对象,参考SubscriptionState小节。
-
autoCommitEnabled:是否开启了自动提交offset。
-
autoCommitTask:自动提交offset的定时任务。
-
interceptors:ConsumerInterceptor集合。
-
excludeInternalTopics:标识是否排除内部Topic。
-
metadataSnapshot:用来存储Metadata的快照信息,主要用来检测Topic是否发生了分区数量的变化。在ConsumerCoordinator的构造方法中,会为Metadata添加一个监听器,当Metadata更新时会做下面几件事。
-
如果是AUTO_PATTERN模式,则使用用户自定义的正则表达式过滤Topic,得到需要订阅的Topic集合后,设置到SubscriptionState的subscription集合和groupSubscription集合中。
-
如果是AUTO_PATTERN或AUTO_TOPICS模式,为当前Metadata做一个快照,这个快照底层是使用HashMap记录每个Topic中Partition的个数。将新旧快照进行比较,发生变化的话,则表示消费者订阅的Topic发生分区数量变化,则将SubscriptionState的needsPartitionAssignment字段置为true,需要重新进行分区分配。
-
使用metadataSnapshot字段记录变化后的新快照。
-
-
assignmentSnapshot:也是用来存储Metadata的快照信息,不过是用来检测Partition分配的过程中有没有发生分区数量变化。具体是在Leader消费者开始分区分配操作前,使用此字段记录Metadata快照;收到SyncGroupResponse后,会比较此字段记录的快照与当前Metadata是否发生变化。如果发生变化,则要重新进行分区分配。
这篇关于Kafka-消费者-KafkaConsumer分析-ConsumerCoordinator的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!