Kafka Consumer API 的使用

2024-04-06 18:58
文章标签 使用 api kafka consumer

本文主要是介绍Kafka Consumer API 的使用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

导读

Kafka具有两套消费者API:高级API、低级API。本文章将介绍两种API的区别以及使用时需要注意的地方。

低级API

1. 使用方法

  1. find leader broker
  2. build request
  3. fetch data
  4. identify leader change

2. 为什么要find leader

kafka在0.8版本后,引入replication机制。每个partition是有备份的,在某个broker出故障后,用户仍可以从其他备份中读取数据。消费者并不是并行从多个broker上获取同一个partition的数据,而是选举出一个leader broker,这个broker上的该partition将用于读写。其他的partition则复制leader broker上partition的数据,保持同步。

备份个数由创建topic时,replica-factory决定,当该参数为1时,表示不备份,大于1时,每个partition将有多个备份partition,且分布在不同的broker上。

3. 适合场合

  1. 消费起点需要设置
  2. 反复消费某一段数据

备注:笔者项目中有一个需求是,解析数据时,既可以支持从断点消费,又可以支持从当前位移消费。这个需求就使用了这个API实现功能。

高级API

1. 原理

高级API使用,围绕数据流工作,利用的低级API消费数据。用户不用关心leader broker、offset等问题。

2. 线程

使用高级API时,需要关注线程问题。

用户在使用高级API时,需要指定每个topic获取数据的线程数量。一个线程对应一个数据流。但是寻找主分区、创建流、设置offset这些过程中,高级API仍只有一个线程。只有当从partition中获取数据时,每个流才会产生一个fetchRunable的线程。

每个topic的线程数,最好设置为等于或者小于topic的partition个数。

3. Zookeeper

Kafka的使用需要Zookeeper有以下原因:

  1. 动态集群扩展。
  2. broker的注册,保存topic、partition元数据。
  3. consumer的注册。
  4. watcher的注册。

而高级API使用中,上述Zookeeper的作用全都用到了。首先,均衡(balance)partition和consumer时,需要两者信息。第二,kafka通过Watcher知道broker、topic、partition是否有变化。第三,kafka通过与zk通讯监控partition leader存活性.

另外,在笔者实验中发现,用kill指令杀死进程时,该进程中kafka消费者在zk中的注册信息可能并没有及时删除,如果马上拉起这个进程,将会可能出现消费者大于partition个数的情况。这种情况并不是必现,原因可能和kafka与zk的通讯时间有关系(会多查阅一点资料,验证猜想是否正确)

4. Rebalance

因为每个流,其实都需要指定数据来源的partition.每次创建线程,从partion中获取数据时,需要将同一个topic所有的partition和该group中消费该topic的所有线程合理分配,保证每一个partition只被一个线程消费。这个过程叫做balance,由高级API自动完成。

当发生以下三种情况的时候,会触发Kafka高级API的rebalance动作:

  1. 同一个group下,有新的消费者加入。
  2. 同一个group下,有topic的partition个数有变化。
  3. kafka API与zk的连接中断。

前两种情况比较好理解,重点讲第三种情况。我的理解是,zk超时或者断开后,kafka没有注册partition的信息,需要重新连接zk获取最新的注册信息,并根据新获取的信息进行线程、分区之间的分配和均衡。(个人理解,我会多查阅一些资料证实。)

zk超时后rebalance其实是很有可能不成功,并导致更多次的rebalance。原因是,如果kafka rebalance尝试的总时间(即尝试次数*每次尝试时间)小于zk超时时间,那么在zk连接失败重连之前,kafka的rebalance已经失败。这个原因可能会导致高级API不断的rebalance。而高级API默认设置参数,rebalance的尝试总时间是小于zk超时时间的,所以大家使用高级API时要根据实际情况处理这一点。



作者:君子月满楼
链接:https://www.jianshu.com/p/4d03ee74ad66
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

这篇关于Kafka Consumer API 的使用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/880579

相关文章

使用Python从PPT文档中提取图片和图片信息(如坐标、宽度和高度等)

《使用Python从PPT文档中提取图片和图片信息(如坐标、宽度和高度等)》PPT是一种高效的信息展示工具,广泛应用于教育、商务和设计等多个领域,PPT文档中常常包含丰富的图片内容,这些图片不仅提升了... 目录一、引言二、环境与工具三、python 提取PPT背景图片3.1 提取幻灯片背景图片3.2 提取

使用Python实现图像LBP特征提取的操作方法

《使用Python实现图像LBP特征提取的操作方法》LBP特征叫做局部二值模式,常用于纹理特征提取,并在纹理分类中具有较强的区分能力,本文给大家介绍了如何使用Python实现图像LBP特征提取的操作方... 目录一、LBP特征介绍二、LBP特征描述三、一些改进版本的LBP1.圆形LBP算子2.旋转不变的LB

Maven的使用和配置国内源的保姆级教程

《Maven的使用和配置国内源的保姆级教程》Maven是⼀个项目管理工具,基于POM(ProjectObjectModel,项目对象模型)的概念,Maven可以通过一小段描述信息来管理项目的构建,报告... 目录1. 什么是Maven?2.创建⼀个Maven项目3.Maven 核心功能4.使用Maven H

Python中__init__方法使用的深度解析

《Python中__init__方法使用的深度解析》在Python的面向对象编程(OOP)体系中,__init__方法如同建造房屋时的奠基仪式——它定义了对象诞生时的初始状态,下面我们就来深入了解下_... 目录一、__init__的基因图谱二、初始化过程的魔法时刻继承链中的初始化顺序self参数的奥秘默认

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

Spring Boot 集成 Quartz并使用Cron 表达式实现定时任务

《SpringBoot集成Quartz并使用Cron表达式实现定时任务》本篇文章介绍了如何在SpringBoot中集成Quartz进行定时任务调度,并通过Cron表达式控制任务... 目录前言1. 添加 Quartz 依赖2. 创建 Quartz 任务3. 配置 Quartz 任务调度4. 启动 Sprin

Linux下如何使用C++获取硬件信息

《Linux下如何使用C++获取硬件信息》这篇文章主要为大家详细介绍了如何使用C++实现获取CPU,主板,磁盘,BIOS信息等硬件信息,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录方法获取CPU信息:读取"/proc/cpuinfo"文件获取磁盘信息:读取"/proc/diskstats"文

Java使用SLF4J记录不同级别日志的示例详解

《Java使用SLF4J记录不同级别日志的示例详解》SLF4J是一个简单的日志门面,它允许在运行时选择不同的日志实现,这篇文章主要为大家详细介绍了如何使用SLF4J记录不同级别日志,感兴趣的可以了解下... 目录一、SLF4J简介二、添加依赖三、配置Logback四、记录不同级别的日志五、总结一、SLF4J

使用Python实现一个优雅的异步定时器

《使用Python实现一个优雅的异步定时器》在Python中实现定时器功能是一个常见需求,尤其是在需要周期性执行任务的场景下,本文给大家介绍了基于asyncio和threading模块,可扩展的异步定... 目录需求背景代码1. 单例事件循环的实现2. 事件循环的运行与关闭3. 定时器核心逻辑4. 启动与停

如何使用Nginx配置将80端口重定向到443端口

《如何使用Nginx配置将80端口重定向到443端口》这篇文章主要为大家详细介绍了如何将Nginx配置为将HTTP(80端口)请求重定向到HTTPS(443端口),文中的示例代码讲解详细,有需要的小伙... 目录1. 创建或编辑Nginx配置文件2. 配置HTTP重定向到HTTPS3. 配置HTTPS服务器