Big Data - Spark Streaming (5)
Integrating Spark Streaming with Spark SQL
Add the following to pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.3</version>
</dependency>
Code
package com.kaikeba.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

/**
 * Integrating Spark Streaming with Spark SQL
 */
object SocketWordCountForeachRDDDataFrame {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    // todo: 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("NetworkWordCountForeachRDDDataFrame")
      .setMaster("local[2]")

    // todo: 2. Create the StreamingContext object
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // todo: 3. Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)

    // todo: 4. Process the data
    val words: DStream[String] = socketTextStream.flatMap(_.split(" "))

    // todo: 5. Process the DStream, converting each RDD into a DataFrame
    words.foreachRDD(rdd => {
      // Obtain the SparkSession; converting an RDD to a DataFrame requires a SparkSession object
      val sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import sparkSession.implicits._
      val dataFrame: DataFrame = rdd.toDF("word")

      // Register the DataFrame as a temporary view
      dataFrame.createOrReplaceTempView("words")

      // Count how many times each word appears
      val result: DataFrame = sparkSession.sql("select word, count(*) as count from words group by word")

      // Show the result
      result.show()
    })

    // todo: 6. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
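A common refinement, shown in the DataFrame and SQL operations section of the official Spark Streaming programming guide, is to fetch the SparkSession through a lazily instantiated singleton instead of calling the builder inside every micro-batch. A minimal sketch, with the object name chosen here for illustration:

object SparkSessionSingleton {
  // Reuse one SparkSession across all micro-batches instead of rebuilding it per RDD
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession.builder().config(sparkConf).getOrCreate()
    }
    instance
  }
}

// Inside foreachRDD, replace the builder call with:
// val sparkSession = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)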
Spark Streaming Fault Tolerance
- Recap of the Spark Streaming execution flow
- Executor failure
Tasks and receivers are restarted automatically; no configuration is required.
- Driver failure
A failed driver is recovered through the checkpoint mechanism,
which periodically writes the driver's metadata to HDFS.
Step 1: Configure automatic driver restart
Standalone:
Add the following two parameters to spark-submit:
--deploy-mode cluster
--supervise    # restart the driver automatically if it fails
Example:
spark-submit \
--master spark://node01:7077 \
--deploy-mode cluster \
--supervise \
--class com.kaikeba.streaming.Demo \
--executor-memory 1g \
--total-executor-cores 2 \
original-sparkStreamingStudy-1.0-SNAPSHOT.jar
Yarn:
Add the following parameter to spark-submit:
--deploy-mode cluster
In the YARN configuration, set the yarn.resourcemanager.am.max-attempts parameter (the default is 2), for example:
<property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>4</value>
    <description>The maximum number of application master execution attempts.</description>
</property>
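In addition, Spark on YARN exposes the spark.yarn.maxAppAttempts setting (passed with --conf on spark-submit) to cap the number of attempts for an individual application; it must not exceed the cluster-wide maximum configured above.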
Example:
spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.kaikeba.streaming.Demo \
--executor-memory 1g \
--num-executors 2 \
original-sparkStreamingStudy-1.0-SNAPSHOT.jar
Step 2: Set the HDFS checkpoint directory
streamingContext.checkpoint(hdfsDirectory)
Step 3: Code
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()
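Putting the three steps together, a minimal runnable sketch of a checkpoint-recoverable socket word count; the host, port, and checkpoint path below are illustrative and should be replaced with values from your own cluster:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableSocketWordCount {
  // Illustrative checkpoint directory on HDFS
  val checkpointDirectory = "hdfs://node01:8020/spark/streaming-checkpoint"

  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("RecoverableSocketWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    // The DStream lineage must be defined inside the create function so that
    // it can be rebuilt from checkpoint data after a driver restart
    val lines = ssc.socketTextStream("node01", 9999)
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover the context from checkpoint data if it exists, otherwise create a new one
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}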