本文主要介绍 1.8.8 大数据 - SparkStreaming 与 Kafka 的集成,希望能为大家解决相关编程问题提供一定的参考价值,有需要的开发者们跟随小编一起来学习吧!
- IDEA客户端MAVEN POM中引入
- Linux JAR包放入 jars目录
- 或者执行jar包时 引入jar包
- 启动kafka传输消息
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com:9092 --topic weblogs
DirectKafka8WordCount
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <hadoop.version>2.5.0</hadoop.version>
  <scala.binary.version>2.11</scala.binary.version>
  <spark.version>2.2.0</spark.version>
</properties>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
package com.spark.kfk

import kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

/**
 * Streaming word count over a Kafka topic using the spark-streaming-kafka-0-8
 * direct API. Consumes the "weblogs" topic in 5-second micro-batches and
 * prints the per-batch word counts to stdout.
 */
object DirectKafka8WordCount {

  def main(args: Array[String]): Unit = {
    // Local session with two threads: one to receive, one to process.
    val session = SparkSession.builder()
      .master("local[2]")
      .appName("straming")
      .getOrCreate()
    val sparkContext = session.sparkContext
    sparkContext.setLogLevel("WARN")

    // Micro-batch interval of 5 seconds.
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))

    // Direct (receiver-less) stream: offsets are queried from the broker list
    // rather than tracked by a receiver.
    val topics = Set("weblogs")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "bigdata-pro01.kfk.com:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topics)

    // Each record is a (key, value) pair; only the value carries the log line.
    val wordCounts = messages
      .map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation and block until termination.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
DirectKafka10WordCount
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
package com.spark.kfk

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
 * Streaming word count over a Kafka topic using the spark-streaming-kafka-0-10
 * direct API (new Kafka consumer). Subscribes to the "weblogs" topic in
 * 5-second micro-batches and prints the per-batch word counts to stdout.
 */
object DirectKafka10WordCount {

  def main(args: Array[String]): Unit = {
    // Local session with two threads: one to receive, one to process.
    val session = SparkSession.builder()
      .master("local[2]")
      .appName("straming")
      .getOrCreate()
    val sparkContext = session.sparkContext
    sparkContext.setLogLevel("WARN")

    // Micro-batch interval of 5 seconds.
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))

    // Consumer configuration for the 0-10 integration; auto-commit is
    // disabled so offsets are managed by the stream, not the consumer.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "bigdata-pro01.kfk.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("weblogs")

    // PreferConsistent distributes partitions evenly across executors.
    val records = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Only the record value carries the log line.
    val wordCounts = records
      .map(_.value)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation and block until termination.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
这篇关于 1.8.8 大数据 - SparkStreaming 与 Kafka 集成的文章就介绍到这里,希望我们推荐的内容对各位开发者有所帮助!