This article walks through an introductory Kafka + Spark Streaming example.
- Start ZooKeeper and the Kafka server:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
- Create a topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- Send messages to the topic with the console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
KafkaWordCount
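package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils // receiver-based Kafka 0.8 integration

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    // Helper from the Spark examples package that quiets streaming log output
    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2)) // 2-second batches
    ssc.checkpoint("checkpoint") // required by the windowed reduce with an inverse function

    // Every topic in the comma-separated list is read with numThreads consumer threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) // keep the message value
    val words = lines.flatMap(_.split(" "))
    // Count words over a 20-minute window that slides every 2 seconds, using 2 partitions
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(20), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}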
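The `reduceByKeyAndWindow(_+_, _-_, ...)` call does not recount the whole 20-minute window on every slide. Instead it adds the counts of the newest batch and subtracts the counts of the batch that has just left the window, and that inverse step is why a checkpoint directory is set. The Spark-free Scala sketch below shows this incremental update on plain collections. `WindowedCounts` and its methods are hypothetical names for illustration, and the window here is measured in batches rather than in time.

```scala
import scala.collection.immutable.Queue

// Hypothetical, Spark-free sketch of incremental windowed word counts.
object WindowedCounts {
  type Counts = Map[String, Long]

  // Per-batch counting, mirroring flatMap(_.split(" ")).map(x => (x, 1L)) followed by a sum per key
  def countBatch(lines: Seq[String]): Counts =
    lines.flatMap(_.split(" ")).filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (w, ws) => (w, ws.size.toLong) }

  // Combine two count maps key by key with f (plays the role of _ + _ or _ - _)
  private def merge(a: Counts, b: Counts)(f: (Long, Long) => Long): Counts =
    (a.keySet ++ b.keySet).map(k => k -> f(a.getOrElse(k, 0L), b.getOrElse(k, 0L))).toMap

  // Slide a window of `windowBatches` batches over the input and
  // return the window's counts after each batch arrives.
  def slide(batches: Seq[Seq[String]], windowBatches: Int): Seq[Counts] = {
    var window: Counts = Map.empty
    var inWindow = Queue.empty[Counts]
    batches.map { lines =>
      val incoming = countBatch(lines)
      window = merge(window, incoming)(_ + _) // add the newest batch
      inWindow = inWindow.enqueue(incoming)
      if (inWindow.size > windowBatches) {
        val (expired, rest) = inWindow.dequeue
        inWindow = rest
        window = merge(window, expired)(_ - _) // subtract the batch that left the window
      }
      window = window.filter { case (_, n) => n != 0L } // drop keys whose count fell to zero
      window
    }
  }
}
```

For example, with a window of 2 batches, the inputs `"a b"`, `"a"`, `"b b"` produce `{a: 1, b: 1}`, then `{a: 2, b: 1}`, then `{a: 1, b: 2}`. Dropping zero counts is a simplification in this sketch. As far as we know, Spark keeps such keys in the window state unless the overload that takes a `filterFunc` is used.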
Run KafkaWordCountProducer
bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
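// localhost:9092 is the address and port of the Kafka broker the producer sends to; test is the topic; 3 is the number of messages sent per second; 5 is the number of words in each message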
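The sketch below builds one second's worth of messages with the shape that the `3 5` arguments describe, which shows what KafkaWordCount will be counting. It assumes that the bundled KafkaWordCountProducer fills each message with random digits from 0 to 9 separated by spaces. `ProducerPayload` is a hypothetical name, and no Kafka client is involved.

```scala
import scala.util.Random

// Hypothetical sketch of the payload only: `messagesPerSec` messages, each made of
// `wordsPerMessage` random digits (0-9) joined by single spaces. Nothing is sent anywhere.
object ProducerPayload {
  def oneSecondOfMessages(messagesPerSec: Int, wordsPerMessage: Int, rng: Random): Seq[String] =
    (1 to messagesPerSec).map { _ =>
      (1 to wordsPerMessage).map(_ => rng.nextInt(10).toString).mkString(" ")
    }
}
```

The word vocabulary is only ten digits, so the windowed counts in KafkaWordCount should quickly show at most ten distinct keys.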
Run KafkaWordCount
bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1
// localhost:2181 is the address ZooKeeper listens on; test-consumer-group is the name of the consumer group, which must match the group.id setting in $KAFKA_HOME/config/consumer.properties
// test is the topic; 1 is the number of consumer threads
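The four positional arguments map onto the variables destructured at the top of `main`. The sketch below shows what `localhost:2181 test-consumer-group test 1` turns into. It uses the hypothetical names `KafkaWordCountArgs` and `Parsed`. The original program only rejects fewer than four arguments, but this sketch returns `None` for any argument count other than four.

```scala
// Hypothetical argument parser mirroring KafkaWordCount's handling of its arguments.
object KafkaWordCountArgs {
  final case class Parsed(zkQuorum: String, group: String, topicMap: Map[String, Int])

  def parse(args: Array[String]): Option[Parsed] = args match {
    case Array(zkQuorum, group, topics, numThreads) =>
      // Same expression as in KafkaWordCount: each topic gets numThreads consumer threads
      Some(Parsed(zkQuorum, group, topics.split(",").map((_, numThreads.toInt)).toMap))
    case _ => None // wrong number of arguments
  }
}
```

A comma-separated topic list such as `a,b` with `2` threads becomes `Map(a -> 2, b -> 2)`, so every listed topic is read with the same number of threads.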