本文主要是介绍Spark Streaming(二十五)初始化StreamingContext、初识DStream,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
初始化StreamingContext
初始化一个SparkStreaming
程序,必须创建StreamingContext
对象,因为它是SparkStreaming
处理流式数据的入口。
def main(args: Array[String]): Unit = {//初始化SparkConfval conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")/*** 初始化StreamingContext,并设置2秒一次批处理* appName就是展示在SparkUI上应用的名称* master:就是Spark、Mesos、Yarn cluster Url,或者指定为"local[*]"运行在本地、实际应用程序运行在集群上,* 我们不应该将master硬编码在程序中,但是作为本地测试,你可以用"local[*]"这种方式,如果提交到集群上* 不要用这种方式,要按照实际的环境有外部传入该参数。*/val streaming = new StreamingContext(conf, Seconds(2))
一个StreamingContext
对象也可以由已经存在的SparkContext
进行创建
def main(args: Array[String]): Unit = {//创建SparkConfval conf = new SparkConf().setAppName("").setMaster("local[2]")//创建SparkContextval sc = new SparkContext(conf)//由已经存在的SparkContext创建StreamingContextval streaming = new StreamingContext(sc, Seconds(2))}
SparkStreaming编码开发流程
当StreamingContext
创建完成以后,那么我们就会进行一下步骤,开始应用程序的开发
- 定义一个输入源来创建
DStream
- 定义
DStream
的转换操作和输出操作 - 开始等待数据的输入和处理
streamingContext.start()
- 等待正在处理的程序停止
streamingContext.awaitTerminathion()
- 通过
stremingContext.stop()
手动停止处理程序
SparkStreaming开发过程要注意的事项
- 当一个
StreamingContext
已经启动了,就不能添加或者设置新的流式计算。也就是在streamingContext.start()
的代码后边就不能再利用streamingContext
创建新的流式计算。 StreamingContext
停止后,就会无法启动。也就是说在streamingContext.stop()
后边在此执行streamingContext.start()
是无效的。- 在虚拟机中只能同时激活一个
StreamingContext
StreamingContext
的stop
方法,也会停止SparkContext
,如果执行停止StreamingContext
,只需要在stop
方法内指定是否终止SparkContext
,默认是true
,需要指定为false
。streamingContext.stop(false)
- 只要在创建下一个
StreamingContext
的时候停止前一个StreamingContext
(不停止SparkContext
),就可以重复利用SparkContext
创建多个StreamingContext
离散流(DStream)
DStream(Discretized Stream)
是SparkStreaming
提供的一种抽象。它是一种连续的数据流,它可以使从接收到的输入数据流 ,也可以是通过转换输入流后得到的数据流。数据流的内部就是一系列的RDD
。DStream
中每个RDD
都是特定时间间隔内的数据。操作数据流最终都会转换最底层的RDD上的操作。如图所示
这篇关于Spark Streaming(二十五)初始化StreamingContext、初识DStream的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!