本文主要是介绍storm消费kafka数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
http://blog.csdn.net/tonylee0329/article/details/43016385
使用storm-kafka模块读取kafka中的数据,按照以下两步进行构建(我使用的版本是0.9.3)
1. 使用BrokerHosts接口来配置kafka broker host与partition的mapping信息;
2. 使用KafkaConfig来配置一些与kafka自身相关的选项,如fetchSizeBytes、socketTimeoutMs
下面分别介绍这两块的实现:
对于配置1,目前支持两种实现方式:zk配置、静态ip端口方式
第一种方式:Zk读取(比较常见)
[html] view plain copy
ZkHosts支持两种创建方式,
public ZkHosts(String brokerZkStr, String brokerZkPath)
//使用默认brokerZkPath:”/brokers”
public ZkHosts(String brokerZkStr)
通过这种方式访问的时候,经过60s会刷新一下host->partition的mapping
第二步:构建KafkaConfig对象
目前提供两种构造函数,
[html] view plain copy
public KafkaConfig(BrokerHosts hosts, String topic)
//clientId如果不想每次随机生成的话,就自己设置一个
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
代码参考:
[html] view plain copy
//这个地方其实就是kafka配置文件里边的zookeeper.connect这个参数,可以去那里拿过来。
String brokerZkStr = “10.100.90.201:2181/kafka_online_sample”;
String brokerZkPath = “/brokers”;
ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);
String topic = "mars-wap"; //以下:将offset汇报到哪个zk集群,相应配置
// String offsetZkServers = “10.199.203.169”;
String offsetZkServers = “10.100.90.201”;
String offsetZkPort = “2181”;
List zkServersList = new ArrayList();
zkServersList.add(offsetZkServers);
//汇报offset信息的root路径
String offsetZkRoot = “/stormExample”;
//存储该spout id的消费offset信息,譬如以topoName来命名
String offsetZkId = “storm-example”;
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId); kafkaConfig.zkRoot = offsetZkRoot; kafkaConfig.zkPort = Integer.parseInt(offsetZkPort); kafkaConfig.zkServers = zkServersList; kafkaConfig.id = offsetZkId; kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout spout = new KafkaSpout(kafkaConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", spout, 1); builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout"); Config config = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, builder.createTopology()); // cluster submit.
// try {
// StormSubmitter.submitTopology(“storm-kafka-example”,config,builder.createTopology());
// } catch (AlreadyAliveException e) {
// e.printStackTrace();
// } catch (InvalidTopologyException e) {
// e.printStackTrace();
// }
第二种方式:静态ip端口方式
[html] view plain copy
String kafkaHost = “10.100.90.201”;
Broker brokerForPartition0 = new Broker(kafkaHost);//localhost:9092
Broker brokerForPartition1 = new Broker(kafkaHost, 9092);//localhost:9092 but we specified the port explicitly
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1
StaticHosts hosts = new StaticHosts(partitionInfo);
String topic="mars-wap"; String offsetZkRoot ="/stormExample"; String offsetZkId="staticHost"; String offsetZkServers = "10.100.90.201"; String offsetZkPort = "2181"; List<String> zkServersList = new ArrayList<String>(); zkServersList.add(offsetZkServers); SpoutConfig kafkaConfig = new SpoutConfig(hosts,topic,offsetZkRoot,offsetZkId); kafkaConfig.zkPort = Integer.parseInt(offsetZkPort); kafkaConfig.zkServers = zkServersList; kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout spout = new KafkaSpout(kafkaConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", spout, 1); builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout"); Config config = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, builder.createTopology());
完整的使用例子,见github源码
https://github.com/tonylee0329/storm-example/blob/master/src/main/java/org/tony/storm_kafka/common/
参考:
https://github.com/apache/storm/blob/v0.9.3/external/storm-kafka/README.md
https://github.com/tonylee0329/storm-example/blob/master/src/main/java/org/tony/storm_kafka/common/ZkTopology.java
Kafka之Consumer获取消费数据全过程图解
字数198 阅读557 评论0 喜欢1
这篇文章是作为:跟我学Kafka源码之Consumer分析 的补充材料,看过我们之前源码分析的同学可能知道。
本文将从客户端程序如何调用Consumer获取到最终Kafka消息的全过程以图解的方式作一个源码级别的梳理。
废话不多说,请图看
时序图
Business Process Model.jpg
流程图
20140809174809543.png
文章短小的目的是便于大家快速找到内容的核心加以理解,避免文章又臭又长抓不住重点。
对于Kafka技术,如果大家对此有任何疑问,请给我留言,我们可以深入探讨。
清晰的UML时序图在这里可以看:
http://dl2.iteye.com/upload/attachment/0115/5649/70a096f4-c649-3efd-84bb-2379927dee36.jpg
这篇关于storm消费kafka数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!