kafka Consumer high-level api 之白名单

2024-04-05 07:38

本文主要是介绍kafka Consumer high-level api 之白名单,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka提供了两套API给Consumer

  1. The high-level Consumer API
  2. The SimpleConsumer API     

第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么下面来介绍下第一种API:

使用白名单可以适配多个topic的情况。

示例代码:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.mysite.constant.Constants;
import com.mysite.util.PropertiesUtil;
import com.mysite.util.Utils;public class KafkaConsumer {private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);private ConsumerIterator<byte[], byte[]> iterator = null;private static ConsumerConfig consumerConfig;private ConsumerConnector connector = null;private List<KafkaStream<byte[], byte[]>> partitions = null;private Whitelist whitelist = null;private int threads = 0;private String[] topics;private String type;private String topic = null;private String message = null;private MessageAndMetadata<byte[], byte[]> next = null;public KafkaConsumer(Properties props) {String topicStr = props.getProperty("topics");if(topicStr==null||topicStr.trim().length()<=0){throw new NullPointerException("请正确填写TOPIC.");}		threads = Integer.parseInt(props.getProperty("threads", "1").trim());consumerConfig = createConsumerConfig(props);// topic的过滤器whitelist = new Whitelist("(" + topicStr + ")");init();}/*** 初始化参数* * @param props* @return*/private static ConsumerConfig createConsumerConfig(Properties props) {logger.info("---init kafka config...");props.put("zookeeper.session.timeout.ms", "30000");props.put("zookeeper.sync.time.ms", "6000");props.put("auto.commit.enable", "true");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "largest");return new ConsumerConfig(props);}private void init() {connector = Consumer.createJavaConsumerConnector(consumerConfig);partitions = connector.createMessageStreamsByFilter(whitelist,threads);if (CollectionUtils.isEmpty(partitions)) {logger.info("empty!");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}logger.info("---connect kafka success!");try{for (KafkaStream<byte[], byte[]> partition : partitions) {iterator = partition.iterator();while (iterator.hasNext()) {next = iterator.next();try {message = new String(next.message(), Constants.UTF8);} catch (UnsupportedEncodingException e) {e.printStackTrace();}logger.info(Thread.currentThread()+",partition:"+partition+",offset:" + next.offset() + ",message:" + message);}}}catch (Exception e) {logger.error("run time error:{}",e);close();try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e1) {e1.printStackTrace();}init();}}/*** 销毁资源 未使用* */private void close() {logger.info("close resource...");if (partitions != null)partitions.clear();partitions = null;if (iterator != null)iterator.clearCurrentChunk();iterator = null;if (connector != null)connector.shutdown();connector = null;}/*** 主方法入口* * @param args*/public static void main(String[] args) {FileInputStream fis = null;Properties props = new Properties();Properties kafkaProps = null;Properties syslogProps = null;try {String encode = System.getProperty(Constants.ENCODE, Constants.UTF8).trim();logger.info("encode:{}", encode);String path = System.getProperty(Constants.CONFIG_PATH);logger.info("path:{}", path);if(path==null||path.trim().length()<=0){throw new NullPointerException("请正确填写配置文件路径.");}fis = new FileInputStream(path);props.load(new InputStreamReader(fis, encode));kafkaProps = PropertiesUtil.getProperties(Constants.KAFKA_PREFIX, props);logger.info("kafkaProps:{}", kafkaProps);new KafkaConsumer(kafkaProps);} catch (Exception e) {logger.error("----Runtime error:", e);} finally {if (fis != null) {try {fis.close();} catch (IOException e) {e.printStackTrace();}}if (props != null)props.clear();if (kafkaProps != null)kafkaProps.clear();}}
}


使用到的配置:

zookeeper.connect=192.168.0.25:2181,192.168.0.26:2181
group.id=groupId1
topics=topic1,topic2
threads=2



这篇关于kafka Consumer high-level api 之白名单的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/878007

相关文章

通过高德api查询所有店铺地址信息

通过高德api查询所有店铺地址电话信息 需求:通过高德api查询所有店铺地址信息需求分析具体实现1、申请高德appkey2、下载types city 字典值3、具体代码调用 需求:通过高德api查询所有店铺地址信息 需求分析 查询现有高德api发现现有接口关键字搜索API服务地址: https://developer.amap.com/api/webservice/gui

DDei在线设计器-API-DDeiSheet

DDeiSheet   DDeiSheet是代表一个页签,一个页签含有一个DDeiStage用于显示图形。   DDeiSheet实例包含了一个页签的所有数据,在获取后可以通过它访问其他内容。DDeiFile中的sheets属性记录了当前文件的页签列表。   一个DDeiFile实例至少包含一个DDeiSheet实例。   本篇最后提供的示例可以在DDei文档直接预览 属性 属性名说明数

API-环境对象

学习目标: 掌握环境对象 学习内容: 环境对象作用 环境对象: 指的是函数内部特殊的变量this,它代表着当前函数运行时所处的环境。 作用: 弄清楚this的指向,可以让我们代码更简洁。 函数的调用方式不同,this指代的对象也不同。【谁调用,this就是谁】是判断this指向的粗略规则。直接调用函数,其实相当于是window.函数,所以this指代window。

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

SDAutoLayout/UIView+SDAutoLayout.h 的基本使用,记住这4个Api就足够了~

1.1 > leftSpaceToView(self.view, 10) 方法名中带有“SpaceToView”的方法表示到某个参照view的间距,需要传递2个参数:(UIView)参照view 和 (CGFloat)间距数值 1.2 > widthRatioToView(self.view, 1) 方法名中带有“RatioToView”的方法表示view的宽度或者高度等属性相对于参

Java compiler level does not match the version of the installed Java project facet. map解决方法

右键项目“Properties”,在弹出的“Properties”窗口左侧,单击“Project Facets”,打开“Project Facets”页面。 在页面中的“Java”下拉列表中,选择相应版本就OK了。

linux常用API接口

linux常用API接口 文章目录 linux常用API接口1.应用层内存映射mmap取消内存映射munmap终端打印可用方式1.puts 函数2.文件操作函数 fprintf3.字符输出函数 putchar4.fwrite 函数 2.内核层 1.应用层 内存映射mmap mmap 是一个用于内存映射的系统调用,它可以将一个文件或设备中的内容映射到进程的地址空间中,允许程

深入探索 Nuxt3 Composables:掌握目录架构与内置API的高效应用

title: 深入探索 Nuxt3 Composables:掌握目录架构与内置API的高效应用 date: 2024/6/23 updated: 2024/6/23 author: cmdragon excerpt: 摘要:“本文深入探讨了Nuxt3 Composables,重点介绍了其目录架构和内置API的高效应用。通过学习本文,读者将能够更好地理解和利用Nuxt3 Composabl

几何内核开发-实现自己的NURBS曲线生成API

我去年有一篇帖子,介绍了NURBS曲线生成与显示的实现代码。 https://blog.csdn.net/stonewu/article/details/133387469?spm=1001.2014.3001.5501文章浏览阅读323次,点赞4次,收藏2次。搞3D几何内核算法研究,必须学习NURBS样条曲线曲面。看《非均匀有理B样条 第2版》这本书,学习起来,事半功倍。在《插件化算法研究平台

使用ig507金融数据库的股票API接口经验有感:Java与Python

一、Java技术: 1. Java调用ig507金融数据库(ig507.com)股票API接口 引言: 随着金融科技的不断发展,数据驱动的投资策略变得越来越重要。本文将介绍如何使用Java语言调用ig507金融数据库的股票API接口,以获取实时股票数据,并展示基本的编程步骤和注意事项。 步骤一:引入依赖库 在Java项目中,首先需要引入用于处理HTTP请求和网络通信的库,如Apache