kafka Consumer high-level api 之白名单

2024-04-05 07:38

本文主要是介绍kafka Consumer high-level api 之白名单,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka提供了两套API给Consumer

  1. The high-level Consumer API
  2. The SimpleConsumer API     

第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么下面来介绍下第一种API:

使用白名单可以适配多个topic的情况。

示例代码:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.mysite.constant.Constants;
import com.mysite.util.PropertiesUtil;
import com.mysite.util.Utils;public class KafkaConsumer {private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);private ConsumerIterator<byte[], byte[]> iterator = null;private static ConsumerConfig consumerConfig;private ConsumerConnector connector = null;private List<KafkaStream<byte[], byte[]>> partitions = null;private Whitelist whitelist = null;private int threads = 0;private String[] topics;private String type;private String topic = null;private String message = null;private MessageAndMetadata<byte[], byte[]> next = null;public KafkaConsumer(Properties props) {String topicStr = props.getProperty("topics");if(topicStr==null||topicStr.trim().length()<=0){throw new NullPointerException("请正确填写TOPIC.");}		threads = Integer.parseInt(props.getProperty("threads", "1").trim());consumerConfig = createConsumerConfig(props);// topic的过滤器whitelist = new Whitelist("(" + topicStr + ")");init();}/*** 初始化参数* * @param props* @return*/private static ConsumerConfig createConsumerConfig(Properties props) {logger.info("---init kafka config...");props.put("zookeeper.session.timeout.ms", "30000");props.put("zookeeper.sync.time.ms", "6000");props.put("auto.commit.enable", "true");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "largest");return new ConsumerConfig(props);}private void init() {connector = Consumer.createJavaConsumerConnector(consumerConfig);partitions = connector.createMessageStreamsByFilter(whitelist,threads);if (CollectionUtils.isEmpty(partitions)) {logger.info("empty!");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}logger.info("---connect kafka success!");try{for (KafkaStream<byte[], byte[]> partition : partitions) {iterator = partition.iterator();while (iterator.hasNext()) {next = iterator.next();try {message = new String(next.message(), Constants.UTF8);} catch (UnsupportedEncodingException e) {e.printStackTrace();}logger.info(Thread.currentThread()+",partition:"+partition+",offset:" + next.offset() + ",message:" + message);}}}catch (Exception e) {logger.error("run time error:{}",e);close();try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e1) {e1.printStackTrace();}init();}}/*** 销毁资源 未使用* */private void close() {logger.info("close resource...");if (partitions != null)partitions.clear();partitions = null;if (iterator != null)iterator.clearCurrentChunk();iterator = null;if (connector != null)connector.shutdown();connector = null;}/*** 主方法入口* * @param args*/public static void main(String[] args) {FileInputStream fis = null;Properties props = new Properties();Properties kafkaProps = null;Properties syslogProps = null;try {String encode = System.getProperty(Constants.ENCODE, Constants.UTF8).trim();logger.info("encode:{}", encode);String path = System.getProperty(Constants.CONFIG_PATH);logger.info("path:{}", path);if(path==null||path.trim().length()<=0){throw new NullPointerException("请正确填写配置文件路径.");}fis = new FileInputStream(path);props.load(new InputStreamReader(fis, encode));kafkaProps = PropertiesUtil.getProperties(Constants.KAFKA_PREFIX, props);logger.info("kafkaProps:{}", kafkaProps);new KafkaConsumer(kafkaProps);} catch (Exception e) {logger.error("----Runtime error:", e);} finally {if (fis != null) {try {fis.close();} catch (IOException e) {e.printStackTrace();}}if (props != null)props.clear();if (kafkaProps != null)kafkaProps.clear();}}
}


使用到的配置:

zookeeper.connect=192.168.0.25:2181,192.168.0.26:2181
group.id=groupId1
topics=topic1,topic2
threads=2



这篇关于kafka Consumer high-level api 之白名单的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/878007

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll

浅析如何使用Swagger生成带权限控制的API文档

《浅析如何使用Swagger生成带权限控制的API文档》当涉及到权限控制时,如何生成既安全又详细的API文档就成了一个关键问题,所以这篇文章小编就来和大家好好聊聊如何用Swagger来生成带有... 目录准备工作配置 Swagger权限控制给 API 加上权限注解查看文档注意事项在咱们的开发工作里,API

一分钟带你上手Python调用DeepSeek的API

《一分钟带你上手Python调用DeepSeek的API》最近DeepSeek非常火,作为一枚对前言技术非常关注的程序员来说,自然都想对接DeepSeek的API来体验一把,下面小编就来为大家介绍一下... 目录前言免费体验API-Key申请首次调用API基本概念最小单元推理模型智能体自定义界面总结前言最

JAVA调用Deepseek的api完成基本对话简单代码示例

《JAVA调用Deepseek的api完成基本对话简单代码示例》:本文主要介绍JAVA调用Deepseek的api完成基本对话的相关资料,文中详细讲解了如何获取DeepSeekAPI密钥、添加H... 获取API密钥首先,从DeepSeek平台获取API密钥,用于身份验证。添加HTTP客户端依赖使用Jav

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

5分钟获取deepseek api并搭建简易问答应用

《5分钟获取deepseekapi并搭建简易问答应用》本文主要介绍了5分钟获取deepseekapi并搭建简易问答应用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1、获取api2、获取base_url和chat_model3、配置模型参数方法一:终端中临时将加

使用DeepSeek API 结合VSCode提升开发效率

《使用DeepSeekAPI结合VSCode提升开发效率》:本文主要介绍DeepSeekAPI与VisualStudioCode(VSCode)结合使用,以提升软件开发效率,具有一定的参考价值... 目录引言准备工作安装必要的 VSCode 扩展配置 DeepSeek API1. 创建 API 请求文件2.

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf