kafka的offset存储位置以及offset的提交方式

2024-05-29 07:48

本文主要是介绍kafka的offset存储位置以及offset的提交方式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 offset的存储位置

1.1 存储位置

1.从 0.9 版本开始, consumer 默认将 offset 保存在 Kafka 一个内置的topic 中,该 topic __consumer_offsets
2. Kafka0.9 版本之前, consumer 默认将 offset 保存在 Zookeeper 中。

 

__consumer_offsets 主题里面采 用 key 和 value 的方式存储数据 key 是 group.id+topic+
分区号 value 就是当前 offset 的值。每隔一段时间, kafka 内部会对这个 topic 进行
compact ,也就是每个 group.id+topic+分区号就保留最新数据

1.2 消费offset案例

1.首先在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false

默认是 true ,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
2. 采用命令行方式,创建一个新的 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --
replication-factor 2
3.启动生产者往 atguigu 生产数据
bin/kafka-console-producer.sh --topic  atguigu --bootstrap-server hadoop102:9092
4.消费数据
bin/kafka-console-consumer.sh  -- bootstrap-server hadoop102:9092 --topic atguigu --group test
注意:指定消费者组名称,更好观察数据存储位置( key group.id+topic+ 分区号)。
5. 查看消费者消费主题 __consumer_offsets

 二  offset的提交方式

2.1 自动提交方式

为了使我们能够专注于自己的业务逻辑, Kafka提供了自动提交offset的功能。
自动提交 offset 的相关参数:
enable.auto.commit 是否开启自动提交 offset 功能,默认是 true
auto.commit.interval.ms 自动提交 offset 的时间间隔,默认是 5s

 

 2.1.1 代码部分设置

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
// 提交 offset 的时间周期 1000ms ,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
1000);

2.2 手动提交方式

虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因 此Kafka 还提供了手动提交 offset API
手动提交 offset 的方法有两种:分别是 commitSync (同步提交) commitAsync (异步提交) 。两者的相 同点是,都会将 本次提交的一批数据最高的偏移量提交 ;不同点是, 同步提交阻塞当前线程 ,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 异步提交则没有失败重试机制,故 有可能提交失败。
commitSync (同步提交):必须等待 offset提交完毕,再去消费下一批数据 。并且会自动失败重试
commitAsync (异步提交) :发送完提交 offset请求后,就开始消费下一批数据了。没有失败重试机制

2.2.1 同步提交

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
交的效率比较低。以下为同步提交 offset 的示例

 

 2.3.2 异步提交

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此
吞吐量会受到很大的影响。 因此更多的情况下,会选用异步提交 offset 的方式。

 

这篇关于kafka的offset存储位置以及offset的提交方式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013065

相关文章

SpringBoot中@Value注入静态变量方式

《SpringBoot中@Value注入静态变量方式》SpringBoot中静态变量无法直接用@Value注入,需通过setter方法,@Value(${})从属性文件获取值,@Value(#{})用... 目录项目场景解决方案注解说明1、@Value("${}")使用示例2、@Value("#{}"php

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

HTTP 与 SpringBoot 参数提交与接收协议方式

《HTTP与SpringBoot参数提交与接收协议方式》HTTP参数提交方式包括URL查询、表单、JSON/XML、路径变量、头部、Cookie、GraphQL、WebSocket和SSE,依据... 目录HTTP 协议支持多种参数提交方式,主要取决于请求方法(Method)和内容类型(Content-Ty

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

使用shardingsphere实现mysql数据库分片方式

《使用shardingsphere实现mysql数据库分片方式》本文介绍如何使用ShardingSphere-JDBC在SpringBoot中实现MySQL水平分库,涵盖分片策略、路由算法及零侵入配置... 目录一、ShardingSphere 简介1.1 对比1.2 核心概念1.3 Sharding-Sp

Spring创建Bean的八种主要方式详解

《Spring创建Bean的八种主要方式详解》Spring(尤其是SpringBoot)提供了多种方式来让容器创建和管理Bean,@Component、@Configuration+@Bean、@En... 目录引言一、Spring 创建 Bean 的 8 种主要方式1. @Component 及其衍生注解

python中的显式声明类型参数使用方式

《python中的显式声明类型参数使用方式》文章探讨了Python3.10+版本中类型注解的使用,指出FastAPI官方示例强调显式声明参数类型,通过|操作符替代Union/Optional,可提升代... 目录背景python函数显式声明的类型汇总基本类型集合类型Optional and Union(py

Linux系统管理与进程任务管理方式

《Linux系统管理与进程任务管理方式》本文系统讲解Linux管理核心技能,涵盖引导流程、服务控制(Systemd与GRUB2)、进程管理(前台/后台运行、工具使用)、计划任务(at/cron)及常用... 目录引言一、linux系统引导过程与服务控制1.1 系统引导的五个关键阶段1.2 GRUB2的进化优

使用SpringBoot+InfluxDB实现高效数据存储与查询

《使用SpringBoot+InfluxDB实现高效数据存储与查询》InfluxDB是一个开源的时间序列数据库,特别适合处理带有时间戳的监控数据、指标数据等,下面详细介绍如何在SpringBoot项目... 目录1、项目介绍2、 InfluxDB 介绍3、Spring Boot 配置 InfluxDB4、I