kafka consumer 总结及配置详解学习

2024-05-06 20:32

本文主要是介绍kafka consumer 总结及配置详解学习,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1、Consumer Group 与 topic 订阅

1.1 Consumer 与 partition

1.2 Consumer 与Consumer Group

1.3 Coordinator

1.4 Consumer Group Management

2、Consumer Fetch Message

2.1 poll 方法

2.2 commit offset

3、Consumer的线程安全性

4、Consumer Configuration

5、总结:发生relebance的三种情况


1、Consumer Group 与 topic 订阅

每个Consumer 进程都会划归到一个逻辑的Consumer Group中,逻辑的订阅者是Consumer Group。所以一条message可以被多个订阅message 所在的topic的每一个Consumer Group,也就好像是这条message被广播到每个Consumer Group一样。而每个Consumer Group中,类似于一个Queue(JMS中的Queue)的概念差不多,即一条消息只会被Consumer Group中的一个Consumer消费。

1.1 Consumer 与 partition

    其实上面所说的订阅关系还不够明确,其实topic中的partition被分配到某个consumer上,也就是某个consumer订阅了某个partition。 再重复一下:consumer订阅的是partition,而不是message。所以在同一时间点上,订阅到同一个partition的consumer必然属于不同的Consumer Group。

    在官方网站上,给出了这样一张图:

一个kafka cluster中的某个topic,有4个partition。有两个consumer group (A and B)订阅了该topic。 Consumer Group A有2个partition:p0、p1,Consumer Group B有4个partition:c3,c4,c5,c6。经过分区分配后,consumer与partition的订阅关系如下:

Topic 中的4个partition在Consumer Group A中的分配情况如下:
C1 订阅p0,p3
C2 订阅p1,p2
Topic 中的4个partition在Consumer Group B中的分配情况如下:
C3 订阅p0
C4 订阅p3
C5 订阅p1
C6 订阅p2
 另外要知道的是,partition分配的工作其实是在consumer leader中完成的。

1.2 Consumer 与Consumer Group

Consumer Group与Consumer的关系是动态维护的:

当一个Consumer 进程挂掉 或者是卡住时,该consumer所订阅的partition会被重新分配到该group内的其它的consumer上。当一个consumer加入到一个consumer group中时,同样会从其它的consumer中分配出一个或者多个partition 到这个新加入的consumer。

    当启动一个Consumer时,会指定它要加入的group,使用的是配置项:group.id。

为了维持Consumer 与 Consumer Group的关系,需要Consumer周期性的发送heartbeat到coordinator(协调者,在早期版本,以zookeeper作为协调者。后期版本则以某个broker作为协调者)。当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。而这个过程,被称为rebalance。

那么现在有这样一个问题:如果一个consumer 进程一直在周期性的发送heartbeat,但是它就是不消费消息,这种状态称为livelock状态。那么在这种状态下,它所订阅的partition不消息是否就一直不能被消费呢?

1.3 Coordinator

    Coordinator 协调者,协调consumer、broker。早期版本中Coordinator,使用zookeeper实现,但是这样做,rebalance的负担太重。为了解决scalable的问题,不再使用zookeeper,而是让每个broker来负责一些group的管理,这样consumer就完全不再依赖zookeeper了。

1.3.1 Consumer连接到coordinator

    从Consumer的实现来看,在执行poll或者是join group之前,都要保证已连接到Coordinator。连接到coordinator的过程是:

    1)连接到最后一次连接的broker(如果是刚启动的consumer,则要根据配置中的borker)。它会响应一个包含coordinator信息(host, port等)的response。

    2)连接到coordinator。

1.4 Consumer Group Management

    Consumer Group 管理中,也是需要coordinator的参与。一个Consumer要join到一个group中,或者一个consumer退出时,都要进行rebalance。进行rebalance的流程是:

1)会给一个coordinator发起Join请求(请求中要包括自己的一些元数据,例如自己感兴趣的topics)

2)Coordinator 根据这些consumer的join请求,选择出一个leader,并通知给各个consumer。这里的leader是consumer group 内的leader,是由某个consumer担任,不要与partition的leader混淆。

3)Consumer leader 根据这些consumer的metadata,重新为每个consumer member重新分配partition。分配完毕通过coordinator把最新分配情况同步给每个consumer。

4)Consumer拿到最新的分配后,继续工作。

2、Consumer Fetch Message

在Kafka partition中,每个消息有一个唯一标识,即partition内的offset。每个consumer group中的订阅到某个partition的consumer在从partition中读取数据时,是依次读取的。

    上图中,Consumer A、B分属于不用的Consumer Group。Consumer B读取到offset =11,Consumer A读取到offset=9 。这个值表示Consumer Group中的某个Consumer 在下次读取该partition时会从哪个offset的 message开始读取,即 Consumer Group A 中的Consumer下次会从offset = 9 的message 读取, Consumer Group B 中的Consumer下次会从offset = 11 的message 读取。

    这里并没有说是Consumer A 下次会从offset = 9 的message读取,原因是Consumer A可能会退出Group ,然后Group A 进行rebalance,即重新分配分区。

2.1 poll 方法

Consumer读取partition中的数据是通过调用发起一个fetch请求来执行的。而从KafkaConsumer来看,它有一个poll方法。但是这个poll方法只是可能会发起fetch请求。原因是:Consumer每次发起fetch请求时,读取到的数据是有限制的,通过配置项max.partition.fetch.bytes来限制的。而在执行poll方法时,会根据配置项个max.poll.records来限制一次最多pool多少个record。

那么就可能出现这样的情况: 在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。

    在consumer中,还有另外一个配置项:max.poll.interval.ms ,它表示最大的poll数据间隔,如果超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该consumer处于 livelock状态。就会将该consumer退出consumer group。所以为了不使Consumer 自己被退出,Consumer 应该不停的发起poll(timeout)操作。而这个动作 KafkaConsumer Client是不会帮我们做的,这就需要自己在程序中不停的调用poll方法了。

2.2 commit offset

    当一个consumer因某种原因退出Group时,进行重新分配partition后,同一group中的另一个consumer在读取该partition时,怎么能够知道上一个consumer该从哪个offset的message读取呢?也是是如何保证同一个group内的consumer不重复消费消息呢?上面说了一次走网络的fetch请求会拉取到一定量的数据,但是这些数据还没有被消息完毕,Consumer就挂掉了,下一次进行数据fetch时,是否会从上次读到的数据开始读取,而导致Consumer消费的数据丢失吗?

    为了做到这一点,当使用完poll从本地缓存拉取到数据之后,需要client调用commitSync方法(或者commitAsync方法)去commit 下一次该去读取 哪一个offset的message。

    而这个commit方法会通过走网络的commit请求将offset在coordinator中保留,这样就能够保证下一次读取(不论进行了rebalance)时,既不会重复消费消息,也不会遗漏消息。

    对于offset的commit,Kafka Consumer Java Client支持两种模式:由KafkaConsumer自动提交,或者是用户通过调用commitSync、commitAsync方法的方式完成offset的提交。

自动提交的例子:

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}

手动提交的例子: 

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "false");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));final int minBatchSize = 200;List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {insertIntoDb(buffer);consumer.commitSync();buffer.clear();}}

在手动提交时,需要注意的一点是:要提交的是下一次要读取的offset,例如: 

try {while(running) {// 取得消息ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);// 根据分区来遍历数据:for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);// 数据处理for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}// 取得当前读取到的最后一条记录的offsetlong lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();// 提交offset,记得要 + 1consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {consumer.close();}

3、Consumer的线程安全性

KafkaProducer是线程安全的,上一节已经了解到。但Consumer却没有设计成线程安全的。当用户想要在在多线程环境下使用kafkaConsumer时,需要自己来保证synchronized。如果没有这样的保证,就会抛出ConcurrentModificatinException的。

当你想要关闭Consumer或者为也其它的目的想要中断Consumer的处理时,可以调用consumer的wakeup方法。这个方法会抛出WakeupException。

例如:

public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(10000);// Handle new records}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}}

  

4、Consumer Configuration(适合2.+版本)

    在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置如下:

·bootstrap.servers

在启动consumer时配置的broker地址的。不需要将cluster中所有的broker都配置上,因为启动后会自动的发现cluster所有的broker。

    它配置的格式是:host1:port1;host2:port2…

·key.descrializervalue.descrializer

Message record 的key, value的反序列化类。

·group.id

用于表示该consumer想要加入到哪个group中。默认值是 “”。

·heartbeat.interval.ms

心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活,加入或者退出group的有效手段。

    这个值必须设置的小于session.timeout.ms,因为:

当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。通常设置的值要低于session.timeout.ms的1/3。其默认值是:3000 (3s)

·session.timeout.ms

Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。其默认值是:10000 (10 s)。

·enable.auto.commit

Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit,默认值是true。

·auto.commit.interval.ms

    自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

·auto.offset.reset

    这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:

1) earliest:自动重置到最早的offset。

2) latest:看上去重置到最晚的offset。

3) none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。

4) 如果不是上述3种,只抛出异常给consumer。

默认值是latest。

·connections.max.idle.ms

连接空闲超时时间。因为consumer只与broker有连接(coordinator也是一个broker),所以这个配置的是consumer到broker之间的。默认值是:5 * 60 * 1000ms(5 min)

·fetch.max.wait.ms

Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。默认值是:500ms

·fetch.min.bytes

当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。

取值范围是:[0, Integer.Max],默认值是1。

默认值设置为1的目的是:使得consumer的请求能够尽快的返回。

·fetch.max.bytes

一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。

    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。

取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)

·max.partition.fetch.bytes

一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。默认值是:1 * 1024 * 1024byte(1 MB)

    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。

·max.poll.interval.ms

前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。默认值是:300000ms(5 min)

·max.poll.records

    Consumer每次调用poll()时取到的records的最大数。默认值是:500。

·receive.buffer.byte

Consumer receiver buffer (SO_RCVBUF)的大小。这个值在创建Socket连接时会用到。

取值范围是:[-1, Integer.MAX]。默认值是:65536 (64 KB)

如果值设置为-1,则会使用操作系统默认的值。

·request.timeout.ms

请求发起后,并不一定会很快接收到响应信息。这个配置就是来配置请求超时时间的。默认值是:30000ms(30s)。

·client.id

Consumer进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。

·interceptor.classes

    用户自定义interceptor。

·metadata.max.age.ms

Metadata数据的刷新间隔。即便没有任何的partition订阅关系变更也行执行。

范围是:[0, Integer.MAX],默认值是:300000 (5 min)

5、总结:发生relebance的三种情况
 

在以下三种情况下会发生relebance:订阅topic的数量发生变化、topic分区发生变化、消费端的消费者组成员变化。

  • 订阅主题数发生变化

这种是我们的业务调整才会,所以这种基本要么不发生要么就是不可避免。

  • 主题分区发生变化

这种情况发生会相对多一点,但是也有限,我们就需要考虑到该集群的容量,以便来确定好分区数。虽然不一定一步到位,但是调整的次数应该是极其有限的,一般也可以选择在半夜低峰的时候进行调整,影响不大。

  • 消费端的消费者组成员变化
  1. 心跳超时
    如果消费者在指定的session.timeout.ms时间内没有汇报心跳, 那么Kafka就会认为该消费已经dead了(session.timeout.ms默认值10000ms, 即10秒)。
  2. 消费者处理消息超时
    即如果消费者处理消费的消息的时间超过了Kafka集群配置的 max.poll.interval.ms 的值,那么该消费者将会自动离组(max.poll.interval.ms默认值300000ms, 即5分钟)。

kafka中文教程地址:https://www.orchome.com/kafka/index

这篇关于kafka consumer 总结及配置详解学习的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/965299

相关文章

详解Vue如何使用xlsx库导出Excel文件

《详解Vue如何使用xlsx库导出Excel文件》第三方库xlsx提供了强大的功能来处理Excel文件,它可以简化导出Excel文件这个过程,本文将为大家详细介绍一下它的具体使用,需要的小伙伴可以了解... 目录1. 安装依赖2. 创建vue组件3. 解释代码在Vue.js项目中导出Excel文件,使用第三

SQL注入漏洞扫描之sqlmap详解

《SQL注入漏洞扫描之sqlmap详解》SQLMap是一款自动执行SQL注入的审计工具,支持多种SQL注入技术,包括布尔型盲注、时间型盲注、报错型注入、联合查询注入和堆叠查询注入... 目录what支持类型how---less-1为例1.检测网站是否存在sql注入漏洞的注入点2.列举可用数据库3.列举数据库

Kubernetes常用命令大全近期总结

《Kubernetes常用命令大全近期总结》Kubernetes是用于大规模部署和管理这些容器的开源软件-在希腊语中,这个词还有“舵手”或“飞行员”的意思,使用Kubernetes(有时被称为“... 目录前言Kubernetes 的工作原理为什么要使用 Kubernetes?Kubernetes常用命令总

Linux之软件包管理器yum详解

《Linux之软件包管理器yum详解》文章介绍了现代类Unix操作系统中软件包管理和包存储库的工作原理,以及如何使用包管理器如yum来安装、更新和卸载软件,文章还介绍了如何配置yum源,更新系统软件包... 目录软件包yumyum语法yum常用命令yum源配置文件介绍更新yum源查看已经安装软件的方法总结软

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Java访问修饰符public、private、protected及默认访问权限详解

《Java访问修饰符public、private、protected及默认访问权限详解》:本文主要介绍Java访问修饰符public、private、protected及默认访问权限的相关资料,每... 目录前言1. public 访问修饰符特点:示例:适用场景:2. private 访问修饰符特点:示例:

python管理工具之conda安装部署及使用详解

《python管理工具之conda安装部署及使用详解》这篇文章详细介绍了如何安装和使用conda来管理Python环境,它涵盖了从安装部署、镜像源配置到具体的conda使用方法,包括创建、激活、安装包... 目录pytpshheraerUhon管理工具:conda部署+使用一、安装部署1、 下载2、 安装3

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

windos server2022的配置故障转移服务的图文教程

《windosserver2022的配置故障转移服务的图文教程》本文主要介绍了windosserver2022的配置故障转移服务的图文教程,以确保服务和应用程序的连续性和可用性,文中通过图文介绍的非... 目录准备环境:步骤故障转移群集是 Windows Server 2022 中提供的一种功能,用于在多个