KafkaConsumer一些概念解释(从官方文档整理而来)

2023-12-13 14:38

本文主要是介绍KafkaConsumer一些概念解释(从官方文档整理而来),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

阅读之前假设您已经对kafka有了一定的了解

详情和API请参考http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


KafkaConsumer类简介

public class KafkaConsumer<K,V>extends Object implements Consumer<K,V> 
(从官方文档copy而来)

KafkaConsumer是kafka服务的一个java版本的客户端。它会自动处理kafka集群中出现的错误,自动适应kafka集群中数据分区的迁移。它还可以以消费群组(consumer groups)的方式同服务器交互,从而实现消息处理负载均衡( load balance consumption)。 
消费者(consumer )跟代理服务器(broker)之间保持着一个TCP连接来获取数据。如果在使用完consumer之后没有调用close()方法来关闭的话,就会导致连接泄露。KafkaConsumer不是线程安全的,切记。(Producer类是线程安全的,通常我们将Producer以单例的模式实现)


偏移量(Offsets )和消费者位置(Consumer Position)

简短解说,偏移量就相当于数据库中的ID,是一个唯一标识符,代表着一条消息在一个主题(topic)的某个分区(partition)中的位置,消费者位置就是某个订阅该主题的消费者在它所使用的那个主题分区中的位置,也就是它目前处理消息的位置。数据库游标知道吧,消息偏移量就相当于数据库中的数据ID,而消费者位置就相当于数据库游标位置。如果消费者不小心挂掉了,再重启还是会从当前消费者位置来读取数据,这就是Consumer Position的作用。


消费者群组(Consumer Groups)和主题订阅(Topic Subscriptions)

kafka使用消费者群组(Consumer Groups)的概念来分割处理消息的工作。一个consumer group中可以有多个消费者,这多个消费者可以在同一台机器上运行,也可以在不同的机器上运行。这样,就有了一定的扩展性和容错功能。 
每个kafka的consumer都可以配置一个群组名(consumer group),并且可以通过subscribe()方法动态设置想要订阅的topic列表。kafka服务器将把每个topic中的每条消息都发送给订阅它的群组,每个群组中只会有一个consumer来处理这条消息。具体的实现机制就是kafka服务器将每个topic的每个partition都分配给订阅它的群组中的一个consumer,从而实现并发处理和负载均衡。简短解说,我们将概念抽象出来,把topic抽象成一包糖,partition相当于一个一个的糖豆,一个consumer group抽象成一堆熊孩子,那一个consumer就是这堆熊孩子中的一个熊孩子了。假设现在这包糖有90个,这堆熊孩子有30个,那每个熊孩子能飞到3个糖果。假设这包糖有89个,熊孩子还是30个,你该说分不匀了,这就不是你管的事了,kafka服务器会去决定哪个熊孩子倒霉,少分一个。这是在一对一的情况下,将这个概念延伸开来,在多对一,一对多,多对多的情况下也是适用的。你只需要将每包糖果看成彼此独立,每堆熊孩子彼此独立,每包糖果对每堆熊孩子之间都相互独立,互不影响应该就能理解。


kafka服务器检测consumer的失效(Detecting Consumer Failures)

当一个consumer订阅一个topic,在该consumer调用poll(long)方法后就会自动加入它所属于的那个群组。poll(long)方法在设计上可以维持该consumer的活性,只要该consumer持续调用poll(方法)方法。表象之下,poll(long)方法在每次被调用的时候都会向kafka服务器发送一个心跳(heartbeat)来告诉kafka服务器自己依然健在。如果你停止调用poll(long)方法(可能是因为异常导致程序挂掉了),那consumer就不会再向kafka服务器发送heartbeat,然后过一段时间,服务器就会认为该consumer挂掉了,然后就会被踢出consumer所属的群组,然后本来被分配给该consumer的partition就会被重新分配就其他的consumer(就相当于是某个熊孩子不小心把自己玩死了,然后他的糖果就会被拿回去,重新分给其他的熊孩子)。这样设计是为了防止某些consumer挂掉之后依旧握着partition不松手,导致某些消息无法被其他健在的consumer处理的情况发生。 
在单线程情况下,这种设计就要求consumer处理接受到的消息的时间要小于调用poll(long)的周期,从而保证heartbeat的正常发送,从而让服务器知道自己依然健在。这里有一个session timeout的概念,session timeout就是consumer发送两次有效heartbeat的最长时间间隔,严格来说就是在不超过多长的时间内,你让服务器接收到heartbeat,从而确定你的consumer的活性。如果你接收到了消息,然后用来处理这些消息的时间过长,从而导致无法调用poll(long)而无法发送heartbeat,那在session timeout之后,服务器就会认为你的consumer挂掉了。如果服务器认为你的consumer挂掉了,那你consumer相应的partition就会被kafka服务器的负载均衡机制给均衡掉,重新分配给其他的consumer。 
KafkaConsumer类有两项配置可以控制这种行为: 
1、session.timeout.ms:从名称就可以看出来,就是heartbeat超时时间,增加该设置值可以给consumer更多处理poll(long)返回的消息的时间。唯一的缺点是如果你的consumer不小心玩脱挂掉了,服务器可能不能及时检测到,这就会导致服务器不能及时根据你consumer的情况进行负载均衡。如果你调用close()方法来告知服务器你的consumer要退出了,服务器会及时进行负载均衡,这种情况不受该设置的影响。 
2、max.poll.records:意思也显而易见,就是每次调用poll(long)方法的时候,最多返回多少条消息记录。消息处理时间通常跟要处理的消息记录的条数是成比例的,所以通常人们希望在每次调用poll(long)的返回条数上做限制。默认情况下,该设置的值为无限制(no limit)。


使用示例

自动提交位置
     /*从官方文档copy而来*/Properties props = new Properties();/*配置broker*/props.put("bootstrap.servers", "localhost:9092");/*配置group id*/props.put("group.id", "test");/*配置自动提交位置*/props.put("enable.auto.commit", "true");/*配置自动提交的时间,以毫秒为单位*/props.put("auto.commit.interval.ms", "1000");/*配置session timeout时间,以毫秒为单位*/props.put("session.timeout.ms", "30000");/*这两个deserializer一般不要动,直接拿来用就行了*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*创建consumer*/KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);/*配置consumer订阅的主题,这里用 foo 和 bar 做为例子*/consumer.subscribe(Arrays.asList("foo", "bar"));/*一般我们在一个死循环里调用poll(long)和处理消息*/while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
手动提交位置
     /*从官方文档copy而来*/Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");/*这里关掉自动提交*/props.put("enable.auto.commit", "false");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));/*配置一个限制,当消息数量达到这个限制,我们处理消息*/final int minBatchSize = 200;/*消息缓存链表*/List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {/*调用poll(long)来获取消息数据*/ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {/*达到限制,开始处理消息*/insertIntoDb(buffer);/*处理消息后,用同步方法提交consumer position,也就是消费者位置*/consumer.commitSync();buffer.clear();}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

上面的例子是在所有的消息都成功处理完之后一次性提交所有consumer所关联的分区位置,我们还可以更进一步,更细化的控制位置提交的时机,比如我们可以一个分区一个分区的来处理消息,然后每处理完一个分区的消息,我们就提交一下consumer在当前分区的位置。代码如下:

/*从官方文档copy而来*/try {while(running) {/*获取消息数据*/ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);/*遍历消息数据中有关的分区*/for (TopicPartition partition : records.partitions()) {/*取出当前消息分区中的消息*/List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);/*处理消息*/for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}/*计算当前位置*/long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();/*用同步方法提交当前位置*/consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {/*最后别忘了关闭consumer*/consumer.close();}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

手动控制分区分配

在之前的例子中,我们只是订阅了我们感兴趣的topic,然后kafka服务器会自动为我们的consumer分配topic下的partition。然而某些情况下我们可能需要手动配置我们的consumer所要使用的partition,例如我们想要重新获取以前已经使用过的消息,我们就需要手动来配置partition。 
如果想要手动配置分区,就不能再调用subscribe()方法,需要调用assign(Collection)来配置,Collection表示所有想要配置的分区的集合。示例代码如下:

     /*代码是从官方文档copy而来*//*想要订阅的topic*/String topic = "foo";/*想要配置的分区们*/TopicPartition partition0 = new TopicPartition(topic, 0);TopicPartition partition1 = new TopicPartition(topic, 1);/*配置分区*/consumer.assign(Arrays.asList(partition0, partition1));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

配置完分区之后,具体的使用就跟前面介绍过的一样了。如果想要更换分区,只需重新调用assign()方法就行了。手动配置的分区是没有consumer group的自动负载均衡功能的,所以如果你的consumer挂掉了,并不会引起群组的负载均衡,也就没有其他的consumer自动接管你的consumer的作用,那么消息就不能被该群组处理了。同时,如果一个群组中有多个consumer分配了同一个topic下的同一个分区,那么可能会导致consumer position的commit问题,可能一个consumer提交了一个靠前的位置,而两一个consumer随后提交了一个靠后的位置,从而导致消息重复。为了避免这种冲突,你应该确保使用手动分配partition的群组只有一个consumer,同时这个consumer要分配它所订阅的topic下的所有partition来接受所有的消息。这样,consumer就可以安全的读取任意partition的任意位置的消息了。

注意:

kafka不支持将手动分区分配和自动动态分区分配混合使用,也就是说如果你的群组中有一个consumer是手动分配,则其他的都会成为手动分配,所以建议手动分配的consumer group只配置一个consumer。


控制consumer分配的partition

假设你使用了手动配置分区,且你的consumer group中只有一个consumer,这时你就可以调用seek(TopicPartition,long)方法来读取任意位置的消息了。

转载自  https://blog.csdn.net/lianjunzongsiling/article/details/52622864

这篇关于KafkaConsumer一些概念解释(从官方文档整理而来)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/488829

相关文章

Python 迭代器和生成器概念及场景分析

《Python迭代器和生成器概念及场景分析》yield是Python中实现惰性计算和协程的核心工具,结合send()、throw()、close()等方法,能够构建高效、灵活的数据流和控制流模型,这... 目录迭代器的介绍自定义迭代器省略的迭代器生产器的介绍yield的普通用法yield的高级用法yidle

Java利用docx4j+Freemarker生成word文档

《Java利用docx4j+Freemarker生成word文档》这篇文章主要为大家详细介绍了Java如何利用docx4j+Freemarker生成word文档,文中的示例代码讲解详细,感兴趣的小伙伴... 目录技术方案maven依赖创建模板文件实现代码技术方案Java 1.8 + docx4j + Fr

使用C#代码在PDF文档中添加、删除和替换图片

《使用C#代码在PDF文档中添加、删除和替换图片》在当今数字化文档处理场景中,动态操作PDF文档中的图像已成为企业级应用开发的核心需求之一,本文将介绍如何在.NET平台使用C#代码在PDF文档中添加、... 目录引言用C#添加图片到PDF文档用C#删除PDF文档中的图片用C#替换PDF文档中的图片引言在当

详解C#如何提取PDF文档中的图片

《详解C#如何提取PDF文档中的图片》提取图片可以将这些图像资源进行单独保存,方便后续在不同的项目中使用,下面我们就来看看如何使用C#通过代码从PDF文档中提取图片吧... 当 PDF 文件中包含有价值的图片,如艺术画作、设计素材、报告图表等,提取图片可以将这些图像资源进行单独保存,方便后续在不同的项目中使

Mysql中深分页的五种常用方法整理

《Mysql中深分页的五种常用方法整理》在数据量非常大的情况下,深分页查询则变得很常见,这篇文章为大家整理了5个常用的方法,文中的示例代码讲解详细,大家可以根据自己的需求进行选择... 目录方案一:延迟关联 (Deferred Join)方案二:有序唯一键分页 (Cursor-based Paginatio

Python实现合并与拆分多个PDF文档中的指定页

《Python实现合并与拆分多个PDF文档中的指定页》这篇文章主要为大家详细介绍了如何使用Python实现将多个PDF文档中的指定页合并生成新的PDF以及拆分PDF,感兴趣的小伙伴可以参考一下... 安装所需要的库pip install PyPDF2 -i https://pypi.tuna.tsingh

Python批量调整Word文档中的字体、段落间距及格式

《Python批量调整Word文档中的字体、段落间距及格式》这篇文章主要为大家详细介绍了如何使用Python的docx库来批量处理Word文档,包括设置首行缩进、字体、字号、行间距、段落对齐方式等,需... 目录关键代码一级标题设置  正文设置完整代码运行结果最近关于批处理格式的问题我查了很多资料,但是都没

Mysql中InnoDB与MyISAM索引差异详解(最新整理)

《Mysql中InnoDB与MyISAM索引差异详解(最新整理)》InnoDB和MyISAM在索引实现和特性上有差异,包括聚集索引、非聚集索引、事务支持、并发控制、覆盖索引、主键约束、外键支持和物理存... 目录1. 索引类型与数据存储方式InnoDBMyISAM2. 事务与并发控制InnoDBMyISAM

StarRocks索引详解(最新整理)

《StarRocks索引详解(最新整理)》StarRocks支持多种索引类型,包括主键索引、前缀索引、Bitmap索引和Bloomfilter索引,这些索引类型适用于不同场景,如唯一性约束、减少索引空... 目录1. 主键索引(Primary Key Index)2. 前缀索引(Prefix Index /

Python自动化Office文档处理全攻略

《Python自动化Office文档处理全攻略》在日常办公中,处理Word、Excel和PDF等Office文档是再常见不过的任务,手动操作这些文档不仅耗时耗力,还容易出错,幸运的是,Python提供... 目录一、自动化处理Word文档1. 安装python-docx库2. 读取Word文档内容3. 修改