source - kafka 高级API和低级API

2024-04-06 18:58
文章标签 api 高级 kafka source 低级

本文主要是介绍source - kafka 高级API和低级API,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

高级 API  produce

 
package com.sinoiov.kafka.test;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;/*** Created by caoyu on 16/4/21.* By 中交兴路 大数据中心-基础平台部*/
public class Kafka_produce extends Thread{private String topic;private SimpleDateFormat sdf = new SimpleDateFormat("MM-dd hh:mm:ss");public Kafka_produce(String topic){super();this.topic = topic;}@Overridepublic void run() {Producer<String, String> producer = createProducer();long i = 0;while(true){i++;long now = System.currentTimeMillis();KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic,sdf.format(new Date(now))+"_"+i+"");producer.send(message);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}private Producer<String,String> createProducer(){Properties properties = new Properties();properties.put("metadata.broker.list","192.168.110.81:9092,192.168.110.82:9092,192.168.110.83:9092");properties.put("serializer.class", StringEncoder.class.getName());properties.put("zookeeper.connect", "nnn1:2181,nnn2:2181,nslave1:2181");return new Producer<String, String>(new ProducerConfig(properties));}
}

consumer

 
package com.sinoiov.kafka.test;import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;/*** Created by caoyu on 16/4/21.* By 中交兴路 大数据中心-基础平台部*/
public class Kafka_consumer extends Thread {private String topic;private ConsumerConnector consumer;public Kafka_consumer(String topic){super();this.topic = topic;consumer = createConsumer();}public void shutDown(){if(consumer != null)consumer.shutdown();}@Overridepublic void run() {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, 1);Map<String, List<KafkaStream<byte[], byte[]>>> messageSteam = consumer.createMessageStreams(topicCountMap);KafkaStream<byte[], byte[]> steam = messageSteam.get(topic).get(0);ConsumerIterator<byte[], byte[]> iterator = steam.iterator();while(iterator.hasNext()){String message = new String(iterator.next().message());System.out.println(message);}}private ConsumerConnector createConsumer(){Properties properties = new Properties();properties.put("zookeeper.connect","nnn1:2181,nnn2:2181,nslave1:2181");properties.put("group.id", "testsecond");return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));}
}

低级 API 示例代码

 
package com.sinoiov.kafka.test;import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;/*** Created by caoyu on 16/4/26.* By 中交兴路 大数据中心-基础平台部*/
public class SimpleExample {private List<String> m_replicaBrokers = new ArrayList<String>();public SimpleExample() {m_replicaBrokers = new ArrayList<String>();}public static void main(String args[]) {SimpleExample example = new SimpleExample();// 最大读取消息数量long maxReads = Long.parseLong("3");// 要订阅的topicString topic = "test1";// 要查找的分区int partition = Integer.parseInt("0");// broker节点的ipList<String> seeds = new ArrayList<String>();seeds.add("192.168.110.81");seeds.add("192.168.110.82");seeds.add("192.168.110.83");// 端口int port = Integer.parseInt("9092");try {example.run(maxReads, topic, partition, seeds, port);} catch (Exception e) {System.out.println("Oops:" + e);e.printStackTrace();}}public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {// 获取指定Topic partition的元数据PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);if (metadata == null) {System.out.println("Can't find metadata for Topic and Partition. Exiting");return;}if (metadata.leader() == null) {System.out.println("Can't find Leader for Topic and Partition. Exiting");return;}String leadBroker = metadata.leader().host();String clientName = "Client_" + a_topic + "_" + a_partition;SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);int numErrors = 0;while (a_maxReads > 0) {if (consumer == null) {consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);}FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();FetchResponse fetchResponse = consumer.fetch(req);if (fetchResponse.hasError()) {numErrors++;// Something went wrong!short code = fetchResponse.errorCode(a_topic, a_partition);System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);if (numErrors > 5)break;if (code == ErrorMapping.OffsetOutOfRangeCode()) {// We asked for an invalid offset. For simple case ask for// the last element to resetreadOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);continue;}consumer.close();consumer = null;leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);continue;}numErrors = 0;long numRead = 0;for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {long currentOffset = messageAndOffset.offset();if (currentOffset < readOffset) {System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);continue;}readOffset = messageAndOffset.nextOffset();ByteBuffer payload = messageAndOffset.message().payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));numRead++;a_maxReads--;}if (numRead == 0) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}if (consumer != null)consumer.close();}public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);OffsetResponse response = consumer.getOffsetsBefore(request);if (response.hasError()) {System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));return 0;}long[] offsets = response.offsets(topic, partition);return offsets[0];}/*** @param a_oldLeader* @param a_topic* @param a_partition* @param a_port* @return String* @throws Exception*             找一个leader broker*/private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {for (int i = 0; i < 3; i++) {boolean goToSleep = false;PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);if (metadata == null) {goToSleep = true;} else if (metadata.leader() == null) {goToSleep = true;} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {// first time through if the leader hasn't changed give// ZooKeeper a second to recover// second time, assume the broker did recover before failover,// or it was a non-Broker issue//goToSleep = true;} else {return metadata.leader().host();}if (goToSleep) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}System.out.println("Unable to find new leader after Broker failure. Exiting");throw new Exception("Unable to find new leader after Broker failure. Exiting");}private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {PartitionMetadata returnMetaData = null;loop: for (String seed : a_seedBrokers) {SimpleConsumer consumer = null;try {consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");List<String> topics = Collections.singletonList(a_topic);TopicMetadataRequest req = new TopicMetadataRequest(topics);kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);List<TopicMetadata> metaData = resp.topicsMetadata();for (TopicMetadata item : metaData) {for (PartitionMetadata part : item.partitionsMetadata()) {if (part.partitionId() == a_partition) {returnMetaData = part;break loop;}}}} catch (Exception e) {System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);} finally {if (consumer != null)consumer.close();}}if (returnMetaData != null) {m_replicaBrokers.clear();for (BrokerEndPoint replica : returnMetaData.replicas()) {m_replicaBrokers.add(replica.host());}}return returnMetaData;}
}

 

这篇关于source - kafka 高级API和低级API的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/880583

相关文章

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

【LabVIEW学习篇 - 21】:DLL与API的调用

文章目录 DLL与API调用DLLAPIDLL的调用 DLL与API调用 LabVIEW虽然已经足够强大,但不同的语言在不同领域都有着自己的优势,为了强强联合,LabVIEW提供了强大的外部程序接口能力,包括DLL、CIN(C语言接口)、ActiveX、.NET、MATLAB等等。通过DLL可以使用户很方便地调用C、C++、C#、VB等编程语言写的程序以及windows自带的大

如何更优雅地对接第三方API

如何更优雅地对接第三方API 本文所有示例完整代码地址:https://github.com/yu-linfeng/BlogRepositories/tree/master/repositories/third 我们在日常开发过程中,有不少场景会对接第三方的API,例如第三方账号登录,第三方服务等等。第三方服务会提供API或者SDK,我依稀记得早些年Maven还没那么广泛使用,通常要对接第三方

Java基础回顾系列-第七天-高级编程之IO

Java基础回顾系列-第七天-高级编程之IO 文件操作字节流与字符流OutputStream字节输出流FileOutputStream InputStream字节输入流FileInputStream Writer字符输出流FileWriter Reader字符输入流字节流与字符流的区别转换流InputStreamReaderOutputStreamWriter 文件复制 字符编码内存操作流(

Java基础回顾系列-第五天-高级编程之API类库

Java基础回顾系列-第五天-高级编程之API类库 Java基础类库StringBufferStringBuilderStringCharSequence接口AutoCloseable接口RuntimeSystemCleaner对象克隆 数字操作类Math数学计算类Random随机数生成类BigInteger/BigDecimal大数字操作类 日期操作类DateSimpleDateForma

Restful API 原理以及实现

先说说API 再说啥是RESRFUL API之前,咱先说说啥是API吧。API大家应该都知道吧,简称接口嘛。随着现在移动互联网的火爆,手机软件,也就是APP几乎快爆棚了。几乎任何一个网站或者应用都会出一款iOS或者Android APP,相比网页版的体验,APP确实各方面性能要好很多。 那么现在问题来了。比如QQ空间网站,如果我想获取一个用户发的说说列表。 QQ空间网站里面需要这个功能。

京东物流查询|开发者调用API接口实现

快递聚合查询的优势 1、高效整合多种快递信息。2、实时动态更新。3、自动化管理流程。 聚合国内外1500家快递公司的物流信息查询服务,使用API接口查询京东物流的便捷步骤,首先选择专业的数据平台的快递API接口:物流快递查询API接口-单号查询API - 探数数据 以下示例是参考的示例代码: import requestsurl = "http://api.tanshuapi.com/a

Mysql高级篇(中)——索引介绍

Mysql高级篇(中)——索引介绍 一、索引本质二、索引优缺点三、索引分类(1)按数据结构分类(2)按功能分类(3) 按存储引擎分类(4) 按存储方式分类(5) 按使用方式分类 四、 索引基本语法(1)创建索引(2)查看索引(3)删除索引(4)ALTER 关键字创建/删除索引 五、适合创建索引的情况思考题 六、不适合创建索引的情况 一、索引本质 索引本质 是 一种数据结构,它用

WordPress开发中常用的工具或api文档

http://php.net/ http://httpd.apache.org/ https://wordpress.org/ https://cn.wordpress.org/ https://core.svn.wordpress.org/ zh-cn:开发者文档: https://codex.wordpress.org/zh-cn:%E5%BC%80%E5%8F%91%E8%80%