KafkaConsumer一些概念解释(从官方文档整理而来)

2023-12-13 14:38

本文主要是介绍KafkaConsumer一些概念解释(从官方文档整理而来),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

阅读之前假设您已经对kafka有了一定的了解

详情和API请参考http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


KafkaConsumer类简介

public class KafkaConsumer<K,V>extends Object implements Consumer<K,V> 
(从官方文档copy而来)

KafkaConsumer是kafka服务的一个java版本的客户端。它会自动处理kafka集群中出现的错误,自动适应kafka集群中数据分区的迁移。它还可以以消费群组(consumer groups)的方式同服务器交互,从而实现消息处理负载均衡( load balance consumption)。 
消费者(consumer )跟代理服务器(broker)之间保持着一个TCP连接来获取数据。如果在使用完consumer之后没有调用close()方法来关闭的话,就会导致连接泄露。KafkaConsumer不是线程安全的,切记。(Producer类是线程安全的,通常我们将Producer以单例的模式实现)


偏移量(Offsets )和消费者位置(Consumer Position)

简短解说,偏移量就相当于数据库中的ID,是一个唯一标识符,代表着一条消息在一个主题(topic)的某个分区(partition)中的位置,消费者位置就是某个订阅该主题的消费者在它所使用的那个主题分区中的位置,也就是它目前处理消息的位置。数据库游标知道吧,消息偏移量就相当于数据库中的数据ID,而消费者位置就相当于数据库游标位置。如果消费者不小心挂掉了,再重启还是会从当前消费者位置来读取数据,这就是Consumer Position的作用。


消费者群组(Consumer Groups)和主题订阅(Topic Subscriptions)

kafka使用消费者群组(Consumer Groups)的概念来分割处理消息的工作。一个consumer group中可以有多个消费者,这多个消费者可以在同一台机器上运行,也可以在不同的机器上运行。这样,就有了一定的扩展性和容错功能。 
每个kafka的consumer都可以配置一个群组名(consumer group),并且可以通过subscribe()方法动态设置想要订阅的topic列表。kafka服务器将把每个topic中的每条消息都发送给订阅它的群组,每个群组中只会有一个consumer来处理这条消息。具体的实现机制就是kafka服务器将每个topic的每个partition都分配给订阅它的群组中的一个consumer,从而实现并发处理和负载均衡。简短解说,我们将概念抽象出来,把topic抽象成一包糖,partition相当于一个一个的糖豆,一个consumer group抽象成一堆熊孩子,那一个consumer就是这堆熊孩子中的一个熊孩子了。假设现在这包糖有90个,这堆熊孩子有30个,那每个熊孩子能飞到3个糖果。假设这包糖有89个,熊孩子还是30个,你该说分不匀了,这就不是你管的事了,kafka服务器会去决定哪个熊孩子倒霉,少分一个。这是在一对一的情况下,将这个概念延伸开来,在多对一,一对多,多对多的情况下也是适用的。你只需要将每包糖果看成彼此独立,每堆熊孩子彼此独立,每包糖果对每堆熊孩子之间都相互独立,互不影响应该就能理解。


kafka服务器检测consumer的失效(Detecting Consumer Failures)

当一个consumer订阅一个topic,在该consumer调用poll(long)方法后就会自动加入它所属于的那个群组。poll(long)方法在设计上可以维持该consumer的活性,只要该consumer持续调用poll(方法)方法。表象之下,poll(long)方法在每次被调用的时候都会向kafka服务器发送一个心跳(heartbeat)来告诉kafka服务器自己依然健在。如果你停止调用poll(long)方法(可能是因为异常导致程序挂掉了),那consumer就不会再向kafka服务器发送heartbeat,然后过一段时间,服务器就会认为该consumer挂掉了,然后就会被踢出consumer所属的群组,然后本来被分配给该consumer的partition就会被重新分配就其他的consumer(就相当于是某个熊孩子不小心把自己玩死了,然后他的糖果就会被拿回去,重新分给其他的熊孩子)。这样设计是为了防止某些consumer挂掉之后依旧握着partition不松手,导致某些消息无法被其他健在的consumer处理的情况发生。 
在单线程情况下,这种设计就要求consumer处理接受到的消息的时间要小于调用poll(long)的周期,从而保证heartbeat的正常发送,从而让服务器知道自己依然健在。这里有一个session timeout的概念,session timeout就是consumer发送两次有效heartbeat的最长时间间隔,严格来说就是在不超过多长的时间内,你让服务器接收到heartbeat,从而确定你的consumer的活性。如果你接收到了消息,然后用来处理这些消息的时间过长,从而导致无法调用poll(long)而无法发送heartbeat,那在session timeout之后,服务器就会认为你的consumer挂掉了。如果服务器认为你的consumer挂掉了,那你consumer相应的partition就会被kafka服务器的负载均衡机制给均衡掉,重新分配给其他的consumer。 
KafkaConsumer类有两项配置可以控制这种行为: 
1、session.timeout.ms:从名称就可以看出来,就是heartbeat超时时间,增加该设置值可以给consumer更多处理poll(long)返回的消息的时间。唯一的缺点是如果你的consumer不小心玩脱挂掉了,服务器可能不能及时检测到,这就会导致服务器不能及时根据你consumer的情况进行负载均衡。如果你调用close()方法来告知服务器你的consumer要退出了,服务器会及时进行负载均衡,这种情况不受该设置的影响。 
2、max.poll.records:意思也显而易见,就是每次调用poll(long)方法的时候,最多返回多少条消息记录。消息处理时间通常跟要处理的消息记录的条数是成比例的,所以通常人们希望在每次调用poll(long)的返回条数上做限制。默认情况下,该设置的值为无限制(no limit)。


使用示例

自动提交位置
     /*从官方文档copy而来*/Properties props = new Properties();/*配置broker*/props.put("bootstrap.servers", "localhost:9092");/*配置group id*/props.put("group.id", "test");/*配置自动提交位置*/props.put("enable.auto.commit", "true");/*配置自动提交的时间,以毫秒为单位*/props.put("auto.commit.interval.ms", "1000");/*配置session timeout时间,以毫秒为单位*/props.put("session.timeout.ms", "30000");/*这两个deserializer一般不要动,直接拿来用就行了*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*创建consumer*/KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);/*配置consumer订阅的主题,这里用 foo 和 bar 做为例子*/consumer.subscribe(Arrays.asList("foo", "bar"));/*一般我们在一个死循环里调用poll(long)和处理消息*/while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
手动提交位置
     /*从官方文档copy而来*/Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");/*这里关掉自动提交*/props.put("enable.auto.commit", "false");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));/*配置一个限制,当消息数量达到这个限制,我们处理消息*/final int minBatchSize = 200;/*消息缓存链表*/List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {/*调用poll(long)来获取消息数据*/ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {/*达到限制,开始处理消息*/insertIntoDb(buffer);/*处理消息后,用同步方法提交consumer position,也就是消费者位置*/consumer.commitSync();buffer.clear();}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

上面的例子是在所有的消息都成功处理完之后一次性提交所有consumer所关联的分区位置,我们还可以更进一步,更细化的控制位置提交的时机,比如我们可以一个分区一个分区的来处理消息,然后每处理完一个分区的消息,我们就提交一下consumer在当前分区的位置。代码如下:

/*从官方文档copy而来*/try {while(running) {/*获取消息数据*/ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);/*遍历消息数据中有关的分区*/for (TopicPartition partition : records.partitions()) {/*取出当前消息分区中的消息*/List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);/*处理消息*/for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}/*计算当前位置*/long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();/*用同步方法提交当前位置*/consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {/*最后别忘了关闭consumer*/consumer.close();}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

手动控制分区分配

在之前的例子中,我们只是订阅了我们感兴趣的topic,然后kafka服务器会自动为我们的consumer分配topic下的partition。然而某些情况下我们可能需要手动配置我们的consumer所要使用的partition,例如我们想要重新获取以前已经使用过的消息,我们就需要手动来配置partition。 
如果想要手动配置分区,就不能再调用subscribe()方法,需要调用assign(Collection)来配置,Collection表示所有想要配置的分区的集合。示例代码如下:

     /*代码是从官方文档copy而来*//*想要订阅的topic*/String topic = "foo";/*想要配置的分区们*/TopicPartition partition0 = new TopicPartition(topic, 0);TopicPartition partition1 = new TopicPartition(topic, 1);/*配置分区*/consumer.assign(Arrays.asList(partition0, partition1));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

配置完分区之后,具体的使用就跟前面介绍过的一样了。如果想要更换分区,只需重新调用assign()方法就行了。手动配置的分区是没有consumer group的自动负载均衡功能的,所以如果你的consumer挂掉了,并不会引起群组的负载均衡,也就没有其他的consumer自动接管你的consumer的作用,那么消息就不能被该群组处理了。同时,如果一个群组中有多个consumer分配了同一个topic下的同一个分区,那么可能会导致consumer position的commit问题,可能一个consumer提交了一个靠前的位置,而两一个consumer随后提交了一个靠后的位置,从而导致消息重复。为了避免这种冲突,你应该确保使用手动分配partition的群组只有一个consumer,同时这个consumer要分配它所订阅的topic下的所有partition来接受所有的消息。这样,consumer就可以安全的读取任意partition的任意位置的消息了。

注意:

kafka不支持将手动分区分配和自动动态分区分配混合使用,也就是说如果你的群组中有一个consumer是手动分配,则其他的都会成为手动分配,所以建议手动分配的consumer group只配置一个consumer。


控制consumer分配的partition

假设你使用了手动配置分区,且你的consumer group中只有一个consumer,这时你就可以调用seek(TopicPartition,long)方法来读取任意位置的消息了。

转载自  https://blog.csdn.net/lianjunzongsiling/article/details/52622864

这篇关于KafkaConsumer一些概念解释(从官方文档整理而来)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/488829

相关文章

SpringBoot3集成swagger文档的使用方法

《SpringBoot3集成swagger文档的使用方法》本文介绍了Swagger的诞生背景、主要功能以及如何在SpringBoot3中集成Swagger文档,Swagger可以帮助自动生成API文档... 目录一、前言1. API 文档自动生成2. 交互式 API 测试3. API 设计和开发协作二、使用

基于C#实现将图片转换为PDF文档

《基于C#实现将图片转换为PDF文档》将图片(JPG、PNG)转换为PDF文件可以帮助我们更好地保存和分享图片,所以本文将介绍如何使用C#将JPG/PNG图片转换为PDF文档,需要的可以参考下... 目录介绍C# 将单张图片转换为PDF文档C# 将多张图片转换到一个PDF文档介绍将图片(JPG、PNG)转

wolfSSL参数设置或配置项解释

1. wolfCrypt Only 解释:wolfCrypt是一个开源的、轻量级的、可移植的加密库,支持多种加密算法和协议。选择“wolfCrypt Only”意味着系统或应用将仅使用wolfCrypt库进行加密操作,而不依赖其他加密库。 2. DTLS Support 解释:DTLS(Datagram Transport Layer Security)是一种基于UDP的安全协议,提供类似于

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

数论入门整理(updating)

一、gcd lcm 基础中的基础,一般用来处理计算第一步什么的,分数化简之类。 LL gcd(LL a, LL b) { return b ? gcd(b, a % b) : a; } <pre name="code" class="cpp">LL lcm(LL a, LL b){LL c = gcd(a, b);return a / c * b;} 例题:

计算机毕业设计 大学志愿填报系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点赞 👍 收藏 ⭐评论 📝 🍅 文末获取源码联系 👇🏻 精彩专栏推荐订阅 👇🏻 不然下次找不到哟~Java毕业设计项目~热门选题推荐《1000套》 目录 1.技术选型 2.开发工具 3.功能

【VUE】跨域问题的概念,以及解决方法。

目录 1.跨域概念 2.解决方法 2.1 配置网络请求代理 2.2 使用@CrossOrigin 注解 2.3 通过配置文件实现跨域 2.4 添加 CorsWebFilter 来解决跨域问题 1.跨域概念 跨域问题是由于浏览器实施了同源策略,该策略要求请求的域名、协议和端口必须与提供资源的服务相同。如果不相同,则需要服务器显式地允许这种跨域请求。一般在springbo

【MRI基础】TR 和 TE 时间概念

重复时间 (TR) 磁共振成像 (MRI) 中的 TR(重复时间,repetition time)是施加于同一切片的连续脉冲序列之间的时间间隔。具体而言,TR 是施加一个 RF(射频)脉冲与施加下一个 RF 脉冲之间的持续时间。TR 以毫秒 (ms) 为单位,主要控制后续脉冲之前的纵向弛豫程度(T1 弛豫),使其成为显著影响 MRI 中的图像对比度和信号特性的重要参数。 回声时间 (TE)

rtmp流媒体编程相关整理2013(crtmpserver,rtmpdump,x264,faac)

转自:http://blog.163.com/zhujiatc@126/blog/static/1834638201392335213119/ 相关资料在线版(不定时更新,其实也不会很多,也许一两个月也不会改) http://www.zhujiatc.esy.es/crtmpserver/index.htm 去年在这进行rtmp相关整理,其实内容早有了,只是整理一下看着方

笔记整理—内核!启动!—kernel部分(2)从汇编阶段到start_kernel

kernel起始与ENTRY(stext),和uboot一样,都是从汇编阶段开始的,因为对于kernel而言,还没进行栈的维护,所以无法使用c语言。_HEAD定义了后面代码属于段名为.head .text的段。         内核起始部分代码被解压代码调用,前面关于uboot的文章中有提到过(eg:zImage)。uboot启动是无条件的,只要代码的位置对,上电就工作,kern