kafka 消费者进行消费数据的各种场景的API(你值得一看)

2024-05-29 07:48

本文主要是介绍kafka 消费者进行消费数据的各种场景的API(你值得一看),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 kafka消费端的参数

 

二 实现案例

2.1 订阅某个主题

创建一个独立消费者,消费 kafka-ljf 主题中数据。
注意: 在消费者 API 代码中必须配置消费者组 id 。命令行启动消费者不填写消费者组
id 会被自动填写随机的消费者组 id
2.消费者代码
package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @ClassName: ConsumerTopicDemo* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 14:02:05* @Version: V1.0**/
public class ConsumerTopicDemo {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");// 创建消费者对象KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<String, String>(properties);// 注册要消费的主题(可以消费多个主题)ArrayList<String> topics = new ArrayList<>();topics.add("kafka-ljf");kafkaConsumer.subscribe(topics);// 拉取数据打印while (true) {// 设置 1s 中消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));// 打印消费到的数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}}

3.执行生产者产生数据

 4.消费数据,观察

2.2 订阅某个主题下的某个分区

需求:创建一个独立消费者,消费 kafka-ljf主题 0 号分区的数据。

2.代码

 

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @ClassName: ConsumerPartitionDemo* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 14:55:31* @Version: V1.0**/
public class ConsumerPartitionDemo {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(必须),名字可以任意起properties.put(ConsumerConfig.GROUP_ID_CONFIG,"beijing");KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 消费某个主题的某个分区数据,0号分区ArrayList<TopicPartition> topicPartitions = newArrayList<>();topicPartitions.add(new TopicPartition("kafka-ljf", 0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}
}

 3.生产者生产数据

 4.消费者消费

可见只消费了0号分区上的数据 

2.3  消费者组案例

测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

1.consumer代码复制一份,变为两个消费者

 

2. 消费者2:

 3.生产者:

 

4.查看消费者消费信息

 5.查看消费者2消费信息

 结论:即可看到两个消费者在消费不同 分区的数据。消费者一消费分区1的数据,消费者2消费分区2的数据。

2.4  指定offset消费

auto.offset.reset = earliest | latest | none   其中 默认是 latest
Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
1 earliest :自动将偏移量重置为最早的偏移量,相当于   --from-beginning
(2) latest (默认值) :自动将偏移量重置为最新偏移量。
(3) none :如果未找到消费者组的先前偏移量,则向消费者抛出异常。

 (4)任意指定 offset 位移开始消费

代码:

具体代码:

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;/*** @ClassName: ConsumerSeekDemo* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 16:08:01* @Version: V1.0**/
public class ConsumerSeekDemo {public static void main(String[] args) {// 0 配置信息Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// key value 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 2 订阅一个主题ArrayList<String> topics = new ArrayList<>();topics.add("kafka-ljf");kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment= new HashSet<>();while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();}// 遍历所有分区,并指定 offset 从 10 的位置开始消费for (TopicPartition tp: assignment) {kafkaConsumer.seek(tp, 10);}// 3 消费该主题数据while (true) {ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}}

 结果:

可以看到都是从0,1分区中,offset为10的位置开始查询的。 

注意:每次执行完,需要修改消费者组名;

2.4  指定offset设置为earliest

auto.offset.reset = earliest | latest | none   其中默认是 latest。本案例设置为earliest。

注意:每次执行完,需要修改消费者组名;每次执行要起一个不同的消费组的名字

代码

 

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @ClassName: ConsumerDefineOffset* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 16:30:01* @Version: V1.0**/
public class ConsumerDefineOffset {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//设置读取的offset的位置properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");// 配置消费者组(必须),名字可以任意起properties.put(ConsumerConfig.GROUP_ID_CONFIG,"shanghai");//注意:每次执行完,需要修改消费者组名;KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 消费某个主题的某个分区数据,0号分区ArrayList<TopicPartition> topicPartitions = newArrayList<>();topicPartitions.add(new TopicPartition("kafka-ljf", 0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}
}

3.执行结果

 2.5  指定时间消费数据

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。
例如要求按照时间消费前一天的数据,怎么处理?
package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/*** @ClassName: ConsumerRangeTime* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 16:53:36* @Version: V1.0**/
public class ConsumerRangeTime {public static void main(String[] args) {Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// key value 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-time");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 2 订阅一个主题ArrayList<String> topics = new ArrayList<>();topics.add("kafka-ljf");kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment = new HashSet<>();while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();}HashMap<TopicPartition, Long> timestampToSearch = newHashMap<>();// 封装集合存储,每个分区对应一天前的数据for (TopicPartition topicPartition : assignment) {timestampToSearch.put(topicPartition, System.currentTimeMillis() - 5 * 24 * 3600 * 1000);}// 获取从 1 天前开始消费的每个分区的 offsetMap<TopicPartition, OffsetAndTimestamp> offsets =kafkaConsumer.offsetsForTimes(timestampToSearch);// 遍历每个分区,对每个分区设置消费时间。for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp =offsets.get(topicPartition);// 根据时间指定开始消费的位置if (offsetAndTimestamp != null){kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());}}// 3 消费该主题数据while (true) {ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}}

结果:

 

这篇关于kafka 消费者进行消费数据的各种场景的API(你值得一看)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013063

相关文章

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

使用Jackson进行JSON生成与解析的新手指南

《使用Jackson进行JSON生成与解析的新手指南》这篇文章主要为大家详细介绍了如何使用Jackson进行JSON生成与解析处理,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 核心依赖2. 基础用法2.1 对象转 jsON(序列化)2.2 JSON 转对象(反序列化)3.

C#使用SQLite进行大数据量高效处理的代码示例

《C#使用SQLite进行大数据量高效处理的代码示例》在软件开发中,高效处理大数据量是一个常见且具有挑战性的任务,SQLite因其零配置、嵌入式、跨平台的特性,成为许多开发者的首选数据库,本文将深入探... 目录前言准备工作数据实体核心技术批量插入:从乌龟到猎豹的蜕变分页查询:加载百万数据异步处理:拒绝界面

Python使用自带的base64库进行base64编码和解码

《Python使用自带的base64库进行base64编码和解码》在Python中,处理数据的编码和解码是数据传输和存储中非常普遍的需求,其中,Base64是一种常用的编码方案,本文我将详细介绍如何使... 目录引言使用python的base64库进行编码和解码编码函数解码函数Base64编码的应用场景注意

Java进行文件格式校验的方案详解

《Java进行文件格式校验的方案详解》这篇文章主要为大家详细介绍了Java中进行文件格式校验的相关方案,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、背景异常现象原因排查用户的无心之过二、解决方案Magandroidic Number判断主流检测库对比Tika的使用区分zip

Java使用Curator进行ZooKeeper操作的详细教程

《Java使用Curator进行ZooKeeper操作的详细教程》ApacheCurator是一个基于ZooKeeper的Java客户端库,它极大地简化了使用ZooKeeper的开发工作,在分布式系统... 目录1、简述2、核心功能2.1 CuratorFramework2.2 Recipes3、示例实践3

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1