本文主要是介绍Kafka 新的消费组默认的偏移量设置和消费行为,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
个人名片
🎓作者简介:java领域优质创作者
🌐个人主页:码农阿豪
📞工作室:新空间代码工作室(提供各种软件服务)
💌个人邮箱:[2435024119@qq.com]
📱个人微信:15279484656
🌐个人导航网站:www.forff.top
💡座右铭:总有人要赢。为什么不能是我呢?
- 专栏导航:
码农阿豪系列专栏导航
面试专栏:收集了java相关高频面试题,面试实战总结🍻🎉🖥️
Spring5系列专栏:整理了Spring5重要知识点与实战演练,有案例可直接使用🚀🔧💻
Redis专栏:Redis从零到一学习分享,经验总结,案例实战💐📝💡
全栈系列专栏:海纳百川有容乃大,可能你想要的东西里面都有🤸🌱🚀
Kafka 新的消费组默认的偏移量设置和消费行为由 auto-offset-reset
配置项决定。以下是详细说明:
目录
- 默认消费行为
- 是否需要设置偏移量
- 不设置偏移量是否会重复消费
- 1. 新的消费者组
- 2. 现有的消费者组
- 3. 配置 `enable-auto-commit`
- 避免重复消费的建议
- 例外情况
- 小结
默认消费行为
当一个新的消费者组第一次订阅一个主题时,它会根据 auto-offset-reset
的配置来决定从哪里开始消费消息。auto-offset-reset
有三个选项:
- earliest:如果消费者组没有已提交的偏移量(即新的消费者组),则从主题的最早消息开始消费。
- latest:如果消费者组没有已提交的偏移量,则从最新的消息开始消费(即从消费者启动之后生成的消息)。
- none:如果消费者组没有已提交的偏移量,则抛出异常。
例如,默认配置可以是:
kafka:bootstrap-servers: 10.206.*.*:9092,10.206.*.*:9092,10.206.*.*:9092consumer:value-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: new-consumer-group # 新的消费者组IDauto-offset-reset: earliest # 从最早的消息开始消费enable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:partition:assignment:strategy: org.apache.kafka.clients.consumer.RoundRobinAssignorfetch-min-size: 100000
是否需要设置偏移量
-
默认情况下:如果你使用
auto-offset-reset: earliest
或auto-offset-reset: latest
,并且enable-auto-commit: true
,新的消费者组会自动从最早或最新的偏移量开始消费,不需要手动设置偏移量。 -
手动设置偏移量:如果你有特定的需求,需要从某个特定的位置(比如某个标签消息)开始消费,则需要手动设置偏移量。手动设置偏移量的步骤如下:
-
禁用自动提交偏移量:设置
enable-auto-commit: false
。 -
在代码中手动查找并设置偏移量:
例如,在 Java 中:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList("your-topic"));// 查找特定偏移量 while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {if (record.value().contains("your-tag")) {consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());break;}}break; }// 从设定的偏移量开始消费 while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}consumer.commitSync(); }
-
不设置偏移量是否会重复消费
是否会重复消费取决于消费者组的配置和消息处理的具体场景。以下是几种可能的情况及其影响:
1. 新的消费者组
-
第一次消费:如果一个新的消费者组第一次订阅一个主题,Kafka 会根据
auto-offset-reset
配置决定从哪里开始消费:earliest
:从最早的消息开始消费。latest
:从最新的消息开始消费(即从消费者启动之后生成的消息)。none
:如果没有已提交的偏移量,则抛出异常。
在这种情况下,不会出现重复消费的情况,因为没有先前的消费记录。
2. 现有的消费者组
- 已有偏移量:如果消费者组已经有已提交的偏移量,Kafka 将从最后提交的偏移量继续消费,不会出现重复消费。
- 未提交偏移量:如果消费者实例崩溃且未能提交偏移量,重启后可能会从上次提交的偏移量开始重新消费,从而导致部分消息被重复消费。
3. 配置 enable-auto-commit
- 启用自动提交(
enable-auto-commit: true
):偏移量会自动提交,通常不会重复消费消息,除非在自动提交间隔内发生消费者崩溃。 - 禁用自动提交(
enable-auto-commit: false
):需要手动提交偏移量,如果在消费完成后未能及时提交偏移量,可能会导致重启后从最后提交的偏移量开始重复消费。
避免重复消费的建议
-
定期提交偏移量:确保在消费完成后及时提交偏移量。可以使用
consumer.commitSync()
或consumer.commitAsync()
方法。while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 同步提交偏移量consumer.commitSync(); }
-
使用幂等性操作:确保消费者对消息的处理是幂等的,即多次处理同一条消息不会产生副作用。这样即使发生重复消费,也不会影响系统的正确性。
-
监控和日志记录:在日志中记录偏移量信息,便于在出现问题时进行调试和修复。
-
适当的自动提交间隔:如果启用了自动提交,设置合适的自动提交间隔(
auto-commit-interval
),确保偏移量能及时提交。
例外情况
在某些高可用或低延迟要求的场景下,可以考虑启用 Kafka 的事务性生产者和消费者,以确保消息消费和处理的准确性和一致性。
总结来说,不设置偏移量本身并不会直接导致重复消费,但需要确保合理的偏移量提交机制和幂等性操作来避免可能的重复消费问题。
小结
- 默认情况下:新的消费者组根据
auto-offset-reset
配置自动决定从哪里开始消费,不需要手动设置偏移量。 - 特殊需求:如果需要从特定的消息位置开始消费,则需要手动管理偏移量,包括禁用自动提交和手动设置偏移量。
根据你的需求,配置和管理消费者组的偏移量以确保消息的正确消费。
这篇关于Kafka 新的消费组默认的偏移量设置和消费行为的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!