本文主要是介绍Spark Streaming(二)—— Spark Streaming基本数据源,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
基本数据源
Spark Streaming 是一个流式计算引擎,就需要对接外部数据源来接收数据。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。
基本数据源有:文件系统、套接字连接、Akka的actor等。
1. 文件流(textFileStream)
监控文件系统的变化,如果有文件增加,读取新的内容
① 这些文件具有相同的格式
② 这些文件通过原子移动或重命名文件的方式在dataDirectory创建
③ 如果在文件中追加内容,这些追加的新数据不会被读取。
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevelobject FileStreaming {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//local[2]代表开启两个线程val conf = new SparkConf().setAppName("FileStreaming").setMaster("local[2]")//接收两个参数,第一个conf,第二个是采样时间间隔val ssc = new StreamingContext(conf, Seconds(3))//监控目录 如果文件系统发生变化 就读取进来val lines = ssc.textFileStream("H:\\tmp_files\\test_file_stream")lines.print()ssc.start()ssc.awaitTermination()}
}
2. RDD队列流(queueStream,队列里是RDD)
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDDobject RDDQueueStream {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("RDDQueueStream").setMaster("local[2]")val ssc = new StreamingContext(conf,Seconds(1))//需要一个RDD队列val rddQueue = new Queue[RDD[Int]]()for( i <- 1 to 3){rddQueue += ssc.sparkContext.makeRDD(1 to 10)Thread.sleep(5000)}//从队列中接收数据 创建DStreamval inputDStream = ssc.queueStream(rddQueue)val result = inputDStream.map(x=>(x,x*2))result.print()ssc.start()ssc.awaitTermination()}
}
3. 套接字流(socketTextStream)
val lines = sc.socketTextStream("192.168.15.131",1234)
lines.print()
这篇关于Spark Streaming(二)—— Spark Streaming基本数据源的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!