Kafka的Broker运行机制

2024-08-25 23:28
文章标签 kafka broker 运行机制

本文主要是介绍Kafka的Broker运行机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.Zookeeper整体数据

2.Controller Broker选举机制

3.Leader Partition选举机制

4.Leader Partition自平衡机制

5.Partition故障恢复机制

6.HW一致性保障-Epoch更新机制

总结


Kafka依赖很多的存储数据,但是,总体上是有划分的。Kafka将状态信息保存在Zookeeper中,通过Zookeeper中的数据,指导每个Kafka与其他Kafka节点不同的业务逻辑。而将状态信息抽离后,剩下的数据,就可以直接存到Kafka本地,所有Kafka服务都以相同的逻辑运行。这种状态信息分离的设计,让Kafka有非常好的集群扩展性。

本章主要讲解如何保证Partition的消息一致性。

1.Zookeeper整体数据

Zookeeper的Watcher机制也可以很好的减少Broker读取Zookeeper的次数。

Kafka的集群中,最为主要的状态信息有两个。一个是在多个Broker中,需要选举出一个Broker,担任Controller角色由Controller角色来管理整个集群中的分区和副本状态。另一个是在同一个Topic下的多个Partition中,需要选举出一个Leader角色由Leader角色的Partition来负责与客户端进行数据交互

这些状态信息都被Kafka集群注册到了Zookeeper中。Zookeeper数据整体如下图:

brokers/ids下,会记录集群中的所有BrokerId;/topics目录下,会记录当前Kafka的Topic相关的Partition分区等信息。

例如集群中每个Broker启动后,都会往Zookeeper注册一个临时节点/broker/ids/{BrokerId}。如果启动了Zookeeper和Kafka后,服务器非正常关机,这时Zookeeper上的这个临时节点就不会注销下次重新启动Kafka时,就有可能因为无法注册上这个临时节点而报错

2.Controller Broker选举机制

选举Controller的过程就是通过抢占Zookeeper的/controller节点来实现的。Zookeeper会保证在一个集群中,只会有一个broker能够成功创建这个节点,这个注册成功的broker就成了集群当中的Controller节点。

当一个应用在Zookeeper上创建了一个临时节点后,Zookeeper需要这个应用一直保持连接状态。如果Zookeeper长时间检测不到应用的心跳信息,就会删除临时节点。同时,Zookeeper还允许应用监听节点的状态,当应用状态有变化时,会向该节点对应的所有监听器广播节点变化事件

这样,如果集群中的Controller节点服务宕机了,Zookeeper就会删除/controller节点。而其他未注册成功的Broker节点,就会感知到这一事件,然后开始竞争,再次创建临时节点。这就是Kafka基于Zookeeper的Controller选举机制。

Controller Broker的作用

  • 监听Zookeeper中的/brokers/ids节点,感知Broker增减变化
  • 监听/brokers/topics,感知topic以及对应的partition的增减变化
  • 监听/admin/delete_topic节点,处理删除topic的动作
  • Controller还需要负责将元数据推送给其他Broker。

3.Leader Partition选举机制

在Kafka中,一个Topic下的所有消息,是分开存储在不同的Partition中的。在使用kafka-topics.sh脚本创建Topic时,可以通过--partitions 参数指定Topic下包含多少个Partition,还可以通过--replication-factors参数指定每个Partition有几个备份。而在一个Partition的众多备份中,需要选举出一个Leader Partition,负责对接所有的客户端请求,并将消息优先保存,然后再通知其他Follower Partition来同步消息。

几个重要的概念:

  • AR: Assigned Repllicas。 表示Kafka分区中的所有副本(存活的和不存活的)
  • ISR: 表示在所有AR中,服务正常,保持与Leader同步的Follower集合。如果Follower长时间没有向Leader发送通信请求(超时时间由replica.lag.time.max.ms参数设定,默认30S),那么这个Follower就会被提出ISR中。(在老版本的Kafka中,还会考虑Partition与Leader Partition之间同步的消息差值,大于参数replica.lag.max.messages条就会被移除ISR。现在版本已经移除了这个参数。)
  • OSR:表示从ISR中踢出的节点。记录的是那些服务有问题,延迟过多的副本。

他们都被记录在Zookeeper中。

Leader选举机制:在选举Leader Partition时,会按照AR中的排名顺序,靠前的优先选举只要当前Partition在ISR列表中,也就是是存活的,那么这个节点就会被选举成为Leader Partition。

Leader Partitoin选举机制能够保证每一个Partition同一时刻有且仅有一个Leader Partition。但是,是不是只要分配好了Leader Partition就够了呢?当然不是。重新选举Leader后,可能会导致过多的Leader集中在同一Broker上,从而导致这个Broker的压力过大

 

4.Leader Partition自平衡机制

默认情况下,Kafka会尽量将Leader Partition分配到不同的Broker节点上,用以保证整个集群的性能压力能够比较平均。

但是,经过Leader Partition选举后,这种平衡就有可能会被打破,让Leader Partition过多的集中到同一个Broker上。这样,这个Broker的压力就会明显高于其他Broker,从而影响到集群的整体性能。

Kafka的自平衡机制

Kafka会认为AR当中的第一个节点就应该是Leader节点这种选举结果成为preferred election理想选举结果。Controller会定期检测集群的Partition平衡情况,在开始检测时,Controller会依次检查所有的Broker。当发现这个Broker上的不平衡的Partition比例高于leader.imbalance.per.broker.percentage阈值时,就会触发一次Leader Partiton的自平衡。

注意:Leader Partition自平衡的过程是一个非常重要的操作,因为要涉及到大量消息的转移与同步。并且,在这个过程中,会有丢消息的可能。所以在很多对性能要求比较高的线上环境,会选择将参数auto.leader.rebalance.enable设置为false,关闭Kafka的Leader Partition自平衡操作,而用其他运维的方式,在业务不繁忙的时间段,手动进行Leader Partiton自平衡,尽量减少自平衡过程对业务的影响。

5.Partition故障恢复机制

Kafka为了保证消息能够在多个Parititon中保持数据同步,内部记录了两个关键的数据:

  • LEO(Log End Offset): 每个Partition的最后一个Offset

每个Partition都会记录自己保存的消息偏移量。leader partition收到并记录了生产者发送的一条消息,就将LEO加1。而接下来,follower partition需要从leader partition同步消息,每同步到一个消息,自己的LEO就加1。通过LEO值,就知道各个follower partition与leader partition之间的消息差距。

  • HW(High Watermark): 一组Partiton中最小的LEO

follower partition每次往leader partition同步消息时,都会同步自己的LEO给leader partition。这样leader partition就可以计算出这个HW值,并最终会同步给各个follower partition。leader partition认为这个HW值以前的消息,都是在所有follower partition之间完成了同步的,是安全的。这些安全的消息就可以被消费者拉取过去了。而HW值之后的消息,就是不安全的,是可能丢失的。这些消息如果被消费者拉取过去消费了,就有可能造成数据不一致。

在所有服务都正常的情况下,当一个消息写入到Leader Partition后,并不会立即让消费者感知。而是会等待其他Follower Partition同步。这个过程中就会推进HW。当HW超过当前消息时,才会让消费者感知。比如在上图中,4号往后的消息,虽然写入了Leader Partition,但是消费者是消费不到的。

当服务出现故障时,如果是Follower发生故障,这不会影响消息写入,只不过是少了一个备份而已。处理相对简单一点。

  1. 将故障的Follower节点临时剔除ISR集合。而其他Leader和Follower继续正常接收消息。
  2. 出现故障的Follower节点恢复后,不会立即加入ISR集合。该Follower节点会读取本地记录的上一次的HW,将自己的日志中高于HW的部分信息全部删除掉,然后从HW开始,向Leader进行消息同步。
  3. 等到该Follower的LEO大于等于整个Partiton的HW后,就重新加入到ISR集合中。这也就是说这个
    Follower的消息进度追上了Leader。 

如果是Leader节点出现故障,Kafka为了保证消息的一致性,处理就会相对复杂一点。

  1. Leader发生故障,会从ISR中进行选举,将一个原本是Follower的Partition提升为新的Leader。这时,消息有可能没有完成同步,所以新的Leader的LEO会低于之前Leader的LEO。
  2. Kafka中的消息都只能以Leader中的备份为准。其他Follower会将各自的Log文件中高于HW的部分全部清理掉,然后从新的Leader中同步数据
  3. 旧的Leader恢复后,将作为Follower节点,进行数据恢复。

Kafka注重的是保护多个副本之间的数据一致性。但是这样,消息的安全性就得不到保障。例如在下图中,原本Partition0中的4,5,6,7号消息就被丢失掉了。

这个机制中有一个很重要的前提,就是各个Broker中记录的HW是一致的。但是HW和LEO同样是一个分布式的值,怎么保证HW在多个Broker中是一致的呢?答案是,没法保证,所以引入了Epoch机制。 

6.HW一致性保障-Epoch更新机制

实际上,HW值在一组Partition里并不是总是一致的

Follower Partition的HW更新的流程:

对于Follower Partition,他需要先将消息从Leader Partition拉取到本地,才能向Leader Partition上报LEO值所有Follower Partition上报后,Leader Partition才能更新HW的值,然后Follower Partition在下次拉取消息时,才能更新HW值

所以,Leader Partiton的LEO更新和Follower Partition的LEO更新,在时间上是有延迟的。这也导致了Leader Partition上更新HW值的时刻与Follower Partition上更新HW值的时刻,是会出现延迟的。这样,如果有多个Follower Partition,这些Partition保存的HW的值是不统一的。当然,如果服务一切正常,最终Leader Partition还是会正常推进HW,能够保证HW的最终一致性。但是,当Leader Partition出现切换,所有的Follower Partition都按照自己的HW进行数据恢复,就会出现数据不一致的情况

Kafka还设计了Epoch机制,来保证HW的一致性

  1. Epoch是一个单调递增的版本号,每当Leader Partition发生变更时,该版本号就会更新。所以,当有多个Epoch时,只有最新的Epoch才是有效的,而其他Epoch对应的Leader Partition就是过期的,无用的Leader。
  2. 每个Leader Partition在上任之初,都会新增一个新的Epoch记录这个记录包含更新后的epoch版本号,以及当前Leader Partition写入的第一个消息的偏移量。例如(1,100)。表示epoch版本号是1,当前Leader Partition写入的第一条消息是100,Broker会将这个epoch数据保存到内存中,并且会持久化到本地一个leader-epoch-checkpoint文件当中。
  3. 这个leader-epoch-checkpoint会在所有Follower Partition中同步。当Leader Partition有变更时,新的Leader Partition就会读取这个Epoch记录,更新后添加自己的Epoch记录。
  4. 接下来其他Follower Partition要更新数据时,就可以不再依靠自己记录的HW值判断拉取消息的起点。而可以根据这个最新的epoch条目来判断。(从新Leader写的第一条数据开始更新)

总结

Kafka其实天生就是为了集群而生,即使单个节点运行Kafka,他其实也是作为一个集群运行的。而Kafka为了保证在各种网络抽风,服务器不稳定等复杂情况下,保证集群的高性能,高可用,高可扩展三高,做了非常多的设计。

 

这篇关于Kafka的Broker运行机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1106940

相关文章

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka (快速)安装部署

文章目录 1、软件下载&配置环境1_JDK安装2_Zookeeper安装3_Kafka安装 2、单机安装1_配置主机名和IP映射2_单机Kafka配置 3、集群安装1_配置主机名和IP的映射关系2_时钟同步3_Zookeeper配置信息4_集群Kafka配置 4、kafka的其他脚本命令 1、软件下载&配置环境 下面的操作无论是单机部署还是分布式集群环境下都是通用的。 准

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Oracle ADG Broker主备切换报错失败处理

Oracle ADG Broker主备切换报错失败处理 问题症状解决办法 ⭐️ Oracle数据库版本为19.18。 问题症状 通过DG Broker发起主备切换: DGMGRL> show configurationConfiguration - dg_msgdbProtection Mode: MaxPerformanceMembers:msgdb_1 - Prim

Kafka 实战演练:创建、配置与测试 Kafka全面教程

文章目录 1.配置文件2.消费者1.注解方式2.KafkaConsumer 3.依赖1.注解依赖2.KafkaConsumer依赖 本文档只是为了留档方便以后工作运维,或者给同事分享文档内容比较简陋命令也不是特别全,不适合小白观看,如有不懂可以私信,上班期间都是在得 1.配置文件 Yml配置 spring:kafka:bootstrap-servers: cons

MQTT broker搭建并用SSL加密

系统为centos,基于emqx搭建broker,流程参考官方。 安装好后,用ssl加密。 进入/etc/emqx/certs,可以看到 分别为 cacert.pem CA 文件cert.pem 服务端证书key.pem 服务端keyclient-cert.pem 客户端证书client-key.pem 客户端key 编辑emqx配置:vim /etc/emqx/emqx.conf,添加s

Kafka【十二】消费者拉取主题分区的分配策略

【1】消费者组、leader和follower 消费者想要拉取主题分区的数据,首先必须要加入到一个组中。 但是一个组中有多个消费者的话,那么每一个消费者该如何消费呢,是不是像图中一样的消费策略呢?如果是的话,那假设消费者组中只有2个消费者或有4个消费者,和分区的数量不匹配,怎么办? 所以这里,我们需要学习Kafka中基本的消费者组中的消费者和分区之间的分配规则: 同一个消费者组的消费者都订

Kafka【十三】消费者消费消息的偏移量

偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据。如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。 【1】起始偏移量 在消费者的配置中,我们可以增加偏移量相关参数auto.offset.re

Kafka的三高设计原理

1.生产者缓存机制--高性能 生产者缓存机制的主要目的是将消息打包,减少网络IO频率 kafka生产者端存在消息累加器RecordAccumulator,它会对每个Partition维护一个双端队列,队列中消息到达一定数量后 或者 到达一定时间后,通过sender线程批量的将消息发送给kafka服务端。(批量发送) 2.发送应答机制--高可用 发送应发机制保证了消息可以安全到达服务端!