SparkStreaming
1 Introduction to Real-Time Jobs
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from many sources, such as Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed with complex algorithms expressed through high-level operators like map, reduce, join, and window. Finally, the processed data can be pushed out to filesystems (such as HDFS) and databases.
2 Spark Streaming Program Entry Point
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") // "local[2]": one thread for the receiver, one for processing
val ssc = new StreamingContext(conf, Seconds(1))                                // 1-second batch interval
3 What Is a DStream
A discretized stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either an input stream received from a source or a processed stream produced by transforming another DStream. Internally, a DStream is represented as a sequence of consecutive RDDs, Spark's abstraction of an immutable, distributed dataset. Each RDD in a DStream contains the data from one batch interval.
Any operation applied on a DStream translates into operations on the underlying RDDs. For example, in the word-count example that converts a stream of lines into a stream of words, the flatMap operation is applied to each RDD of the lines DStream to generate the RDDs of the words DStream.
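To make this concrete, here is a minimal sketch (assuming the `ssc` StreamingContext from section 2 and the socket source used in section 4 below; the name `wordsPerRdd` is only for illustration). The DStream-level flatMap and the explicit per-RDD transform() form produce the same words stream, because Spark applies the function to every batch RDD:

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))                             // operation expressed on the DStream
val wordsPerRdd = lines.transform(rdd => rdd.flatMap(_.split(" "))) // the same operation expressed per underlying RDD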
4 Getting-Started Example
pom.xml configuration:
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.1</spark.version>
    <hadoop.version>2.7.5</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Scala version
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Step 1: initialize the program entry point
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Step 2: obtain the input stream
    val lines = ssc.socketTextStream("localhost", 9999)
    // Step 3: process the data
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Step 4: output the results
    wordCounts.print()
    // Step 5: start the job
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
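To try it out, start a socket server with nc -lk 9999 in one terminal (the same tool mentioned in the custom-receiver example in section 5.3), run the program, and type space-separated words into the nc terminal; the counts for each one-second batch are printed to the console.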
Java version
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;

import scala.Tuple2;

/**
 * Word count
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Step 1: initialize the program entry point
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // Step 2: obtain the input stream
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("10.148.15.10", 9999);
        // Step 3: process the data
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
        // Step 4: output the results
        wordCounts.print();
        // Step 5: start the job
        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
    }
}
5 Data Sources
5.1 Socket Source
See the getting-started example in section 4.
5.2 HDFS Source
Note: if HDFS runs in high-availability mode, copy the cluster's core-site.xml and hdfs-site.xml into the project's resources directory.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * HDFS source
 */
object WordCountForHDFSSource {
  def main(args: Array[String]): Unit = {
    // Step 1: initialize the program entry point
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Step 2: obtain the input stream (monitor a directory for new files)
    val lines = ssc.textFileStream("/tmp")
    // Step 3: process the data
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Step 4: output the results
    wordCounts.print()
    // Step 5: start the job
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
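Note that textFileStream only picks up files that appear in the monitored directory after the stream has started (files should be moved or renamed into it atomically). To point the stream at HDFS explicitly, a full URI can be passed; the NameNode address and path below are hypothetical placeholders:

// Hedged sketch: replace the host/port with your NameNode, or use the HA
// nameservice ID once core-site.xml / hdfs-site.xml are on the classpath.
val lines = ssc.textFileStream("hdfs://namenode:8020/user/spark/streaming-input")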
5.3 Custom Source
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 * A custom Receiver that reads data from a socket and parses the incoming
 * bytes into newline-separated text. Usage: nc -lk 9999
 */
object CustomReceiver {
  def main(args: Array[String]) {
    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(1))
    // Call the receiverStream API and pass in the custom Receiver
    val lines = ssc.receiverStream(new CustomReceiver("10.148.15.10", 9999))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(false)
  }
}

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start a thread that receives data
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do here: the thread calling receive()
    // stops on its own once isStopped() returns true
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()
      logInfo("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}
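A note on the Receiver contract used above: onStart() must return quickly, which is why the blocking read loop runs in its own thread; store() hands each record to Spark, which buffers it with the storage level passed to the constructor (MEMORY_AND_DISK_2 keeps two replicas); and restart() asks the framework to stop the receiver and launch it again after a delay, so both a normal disconnect and an exception end in a restart rather than a crash.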
5.4 Kafka Source
See day 3.
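Kafka integration is covered in detail there; as a minimal, hedged sketch based only on the spark-streaming-kafka-0-10 dependency already declared in the pom.xml above (broker address, group id, and topic name are placeholders, and an existing `ssc` StreamingContext is assumed), a direct stream can be wired into the same word count roughly like this:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",           // placeholder broker list
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-demo",              // placeholder consumer group
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")                           // placeholder topic
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

val wordCounts = stream
  .flatMap(record => record.value().split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts.print()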