本文主要是介绍kafka的offset存储位置以及offset的提交方式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一 offset的存储位置
1.1 存储位置
1.从 0.9 版本开始, consumer 默认将 offset 保存在 Kafka 一个内置的topic 中,该 topic 为 __consumer_offsets
2. Kafka0.9 版本之前, consumer 默认将 offset 保存在 Zookeeper 中。
__consumer_offsets 主题里面采 用 key 和 value 的方式存储数据 。 key 是 group.id+topic+
分区号 , value 就是当前 offset 的值。每隔一段时间, kafka 内部会对这个 topic 进行
compact ,也就是每个 group.id+topic+分区号就保留最新数据 。
1.2 消费offset案例
1.首先在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,
默认是 true ,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
2. 采用命令行方式,创建一个新的 topic 。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --
replication-factor 2
3.启动生产者往 atguigu 生产数据
bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092
4.消费数据
bin/kafka-console-consumer.sh -- bootstrap-server hadoop102:9092 --topic atguigu --group test
注意:指定消费者组名称,更好观察数据存储位置( key 是 group.id+topic+ 分区号)。
5. 查看消费者消费主题 __consumer_offsets
二 offset的提交方式
2.1 自动提交方式
为了使我们能够专注于自己的业务逻辑, Kafka提供了自动提交offset的功能。
自动提交 offset 的相关参数:
⚫ enable.auto.commit : 是否开启自动提交 offset 功能,默认是 true
⚫ auto.commit.interval.ms : 自动提交 offset 的时间间隔,默认是 5s
2.1.1 代码部分设置
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
// 提交 offset 的时间周期 1000ms ,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
1000);
2.2 手动提交方式
虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因 此Kafka 还提供了手动提交 offset 的 API 。
手动提交 offset 的方法有两种:分别是 commitSync (同步提交) 和 commitAsync (异步提交) 。两者的相 同点是,都会将 本次提交的一批数据最高的偏移量提交 ;不同点是, 同步提交阻塞当前线程 ,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 异步提交则没有失败重试机制,故 有可能提交失败。
•
commitSync (同步提交):必须等待 offset提交完毕,再去消费下一批数据 。并且会自动失败重试
•
commitAsync (异步提交) :发送完提交 offset请求后,就开始消费下一批数据了。没有失败重试机制
2.2.1 同步提交
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
交的效率比较低。以下为同步提交 offset 的示例
2.3.2 异步提交
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此
吞吐量会受到很大的影响。 因此更多的情况下,会选用异步提交 offset 的方式。
这篇关于kafka的offset存储位置以及offset的提交方式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!