Kafka Consumer API 的使用

2024-04-06 18:58
文章标签 使用 api kafka consumer

本文主要是介绍Kafka Consumer API 的使用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

导读

Kafka具有两套消费者API:高级API、低级API。本文章将介绍两种API的区别以及使用时需要注意的地方。

低级API

1. 使用方法

  1. find leader broker
  2. build request
  3. fetch data
  4. identify leader change

2. 为什么要find leader

kafka在0.8版本后,引入replication机制。每个partition是有备份的,在某个broker出故障后,用户仍可以从其他备份中读取数据。消费者并不是并行从多个broker上获取同一个partition的数据,而是选举出一个leader broker,这个broker上的该partition将用于读写。其他的partition则复制leader broker上partition的数据,保持同步。

备份个数由创建topic时,replica-factory决定,当该参数为1时,表示不备份,大于1时,每个partition将有多个备份partition,且分布在不同的broker上。

3. 适合场合

  1. 消费起点需要设置
  2. 反复消费某一段数据

备注:笔者项目中有一个需求是,解析数据时,既可以支持从断点消费,又可以支持从当前位移消费。这个需求就使用了这个API实现功能。

高级API

1. 原理

高级API使用,围绕数据流工作,利用的低级API消费数据。用户不用关心leader broker、offset等问题。

2. 线程

使用高级API时,需要关注线程问题。

用户在使用高级API时,需要指定每个topic获取数据的线程数量。一个线程对应一个数据流。但是寻找主分区、创建流、设置offset这些过程中,高级API仍只有一个线程。只有当从partition中获取数据时,每个流才会产生一个fetchRunable的线程。

每个topic的线程数,最好设置为等于或者小于topic的partition个数。

3. Zookeeper

Kafka的使用需要Zookeeper有以下原因:

  1. 动态集群扩展。
  2. broker的注册,保存topic、partition元数据。
  3. consumer的注册。
  4. watcher的注册。

而高级API使用中,上述Zookeeper的作用全都用到了。首先,均衡(balance)partition和consumer时,需要两者信息。第二,kafka通过Watcher知道broker、topic、partition是否有变化。第三,kafka通过与zk通讯监控partition leader存活性.

另外,在笔者实验中发现,用kill指令杀死进程时,该进程中kafka消费者在zk中的注册信息可能并没有及时删除,如果马上拉起这个进程,将会可能出现消费者大于partition个数的情况。这种情况并不是必现,原因可能和kafka与zk的通讯时间有关系(会多查阅一点资料,验证猜想是否正确)

4. Rebalance

因为每个流,其实都需要指定数据来源的partition.每次创建线程,从partion中获取数据时,需要将同一个topic所有的partition和该group中消费该topic的所有线程合理分配,保证每一个partition只被一个线程消费。这个过程叫做balance,由高级API自动完成。

当发生以下三种情况的时候,会触发Kafka高级API的rebalance动作:

  1. 同一个group下,有新的消费者加入。
  2. 同一个group下,有topic的partition个数有变化。
  3. kafka API与zk的连接中断。

前两种情况比较好理解,重点讲第三种情况。我的理解是,zk超时或者断开后,kafka没有注册partition的信息,需要重新连接zk获取最新的注册信息,并根据新获取的信息进行线程、分区之间的分配和均衡。(个人理解,我会多查阅一些资料证实。)

zk超时后rebalance其实是很有可能不成功,并导致更多次的rebalance。原因是,如果kafka rebalance尝试的总时间(即尝试次数*每次尝试时间)小于zk超时时间,那么在zk连接失败重连之前,kafka的rebalance已经失败。这个原因可能会导致高级API不断的rebalance。而高级API默认设置参数,rebalance的尝试总时间是小于zk超时时间的,所以大家使用高级API时要根据实际情况处理这一点。



作者:君子月满楼
链接:https://www.jianshu.com/p/4d03ee74ad66
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

这篇关于Kafka Consumer API 的使用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/880579

相关文章

如何使用celery进行异步处理和定时任务(django)

《如何使用celery进行异步处理和定时任务(django)》文章介绍了Celery的基本概念、安装方法、如何使用Celery进行异步任务处理以及如何设置定时任务,通过Celery,可以在Web应用中... 目录一、celery的作用二、安装celery三、使用celery 异步执行任务四、使用celery

使用Python绘制蛇年春节祝福艺术图

《使用Python绘制蛇年春节祝福艺术图》:本文主要介绍如何使用Python的Matplotlib库绘制一幅富有创意的“蛇年有福”艺术图,这幅图结合了数字,蛇形,花朵等装饰,需要的可以参考下... 目录1. 绘图的基本概念2. 准备工作3. 实现代码解析3.1 设置绘图画布3.2 绘制数字“2025”3.3

Jsoncpp的安装与使用方式

《Jsoncpp的安装与使用方式》JsonCpp是一个用于解析和生成JSON数据的C++库,它支持解析JSON文件或字符串到C++对象,以及将C++对象序列化回JSON格式,安装JsonCpp可以通过... 目录安装jsoncppJsoncpp的使用Value类构造函数检测保存的数据类型提取数据对json数

python使用watchdog实现文件资源监控

《python使用watchdog实现文件资源监控》watchdog支持跨平台文件资源监控,可以检测指定文件夹下文件及文件夹变动,下面我们来看看Python如何使用watchdog实现文件资源监控吧... python文件监控库watchdogs简介随着Python在各种应用领域中的广泛使用,其生态环境也

Python中构建终端应用界面利器Blessed模块的使用

《Python中构建终端应用界面利器Blessed模块的使用》Blessed库作为一个轻量级且功能强大的解决方案,开始在开发者中赢得口碑,今天,我们就一起来探索一下它是如何让终端UI开发变得轻松而高... 目录一、安装与配置:简单、快速、无障碍二、基本功能:从彩色文本到动态交互1. 显示基本内容2. 创建链

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

使用Nginx来共享文件的详细教程

《使用Nginx来共享文件的详细教程》有时我们想共享电脑上的某些文件,一个比较方便的做法是,开一个HTTP服务,指向文件所在的目录,这次我们用nginx来实现这个需求,本文将通过代码示例一步步教你使用... 在本教程中,我们将向您展示如何使用开源 Web 服务器 Nginx 设置文件共享服务器步骤 0 —

Java中switch-case结构的使用方法举例详解

《Java中switch-case结构的使用方法举例详解》:本文主要介绍Java中switch-case结构使用的相关资料,switch-case结构是Java中处理多个分支条件的一种有效方式,它... 目录前言一、switch-case结构的基本语法二、使用示例三、注意事项四、总结前言对于Java初学者

Golang使用minio替代文件系统的实战教程

《Golang使用minio替代文件系统的实战教程》本文讨论项目开发中直接文件系统的限制或不足,接着介绍Minio对象存储的优势,同时给出Golang的实际示例代码,包括初始化客户端、读取minio对... 目录文件系统 vs Minio文件系统不足:对象存储:miniogolang连接Minio配置Min

使用Python绘制可爱的招财猫

《使用Python绘制可爱的招财猫》招财猫,也被称为“幸运猫”,是一种象征财富和好运的吉祥物,经常出现在亚洲文化的商店、餐厅和家庭中,今天,我将带你用Python和matplotlib库从零开始绘制一... 目录1. 为什么选择用 python 绘制?2. 绘图的基本概念3. 实现代码解析3.1 设置绘图画