本文主要是介绍Spark Streaming(四)—— Spark Streaming输出,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。目前,定义了下面几种输出操作:
使用Spark SQL来查询Spark Streaming处理的数据:
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSessionobject MyNetwordWordCountWithSQL {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setMaster("local[2]").setAppName("MyNetwordWordCountWithSQL")val ssc = new StreamingContext(conf,Seconds(5))val lines = ssc.socketTextStream("192.168.15.131",1234,StorageLevel.MEMORY_ONLY)val words = lines.flatMap(_.split(" "))//集成Spark SQL 使用SQL语句进行WordCountwords.foreachRDD( rdd => {// 以前创建spark:// val spark = SparkSession.builder().master("local").appName("SparkSQLDemo3").getOrCreate() // 现在将master和appName封装进conf// 使用单例模式,创建SparkSession对象 // 必须使用当前的StreamingContext对应的SparkContext创建一个SparkSession。此外,必须这样做的另一个原因是使得应用可以在driver程序故障时得以重新启动,这是通过创建一个可以延迟实例化的单例SparkSession来实现的。val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()import spark.implicits._// 将RDD[String]转换成DataFrameval df1 = rdd.toDF("word")df1.createOrReplaceTempView("words") spark.sql("select word , count(1) from words group by word").show()})ssc.start()ssc.awaitTermination()}
}
问题:
报错:Connection对象不是一个可被序列化的对象,不能RDD的每个Worker上运行;即:Connection不能在RDD分布式环境中的每个分区上运行,因为不同的分区可能运行在不同的Worker上。所以需要在每个RDD分区上单独创建Connection对象。 应该改为:
这篇关于Spark Streaming(四)—— Spark Streaming输出的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!