本文主要是介绍SparkSQL(9)RDD2DataFrame,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、两种方式
【参考官网:http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#programmatically-specifying-the-schema】
- Inferring the Schema Using Reflection(反射方式)
- Programmatically Specifying the Schema(编程的方式)
二、反射方式
1.代码(服务器验证)
//方式一:反射方式val spark=SparkSession.builder().appName("RDD2DataFrameSpark").master("local[2]").getOrCreate() val rdd = spark.sparkContext.textFile("datas/info.txt")//使用DataFrame API// For implicit conversions from RDDs to DataFramesimport spark.implicits._val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()infoDF.show()//后续处理infoDF.filter(infoDF.col("age") > 30).show()//创建一个临时的表名称infoDF.createOrReplaceTempView("infos")spark.sql("select * from infos where age > 30").show()spark.close()
其中Info定义为:
case class Info(id:Int,name:String,age:Int)
2.重点
(1)使用隐式转换toDF()直接将RDD转换为DF,但是前提是需要引入:
import spark.implicits._
(2)后续操作
既可以通过DF的API,也可以通过createDataFrame创建临时表,然后使用sql语句来操作分析。
(3)服务器不用开启master和worker即可工作
三、编程的方式
1.代码(服务器验证)
val spark=SparkSession.builder().appName("RDD2DataFrameSpark").master("local[2]").getOrCreate()//1.转换为RDDval rdd = spark.sparkContext.textFile("datas/info.txt")//2.转换为Row类型的RDDval infoRDD: RDD[Row] = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))//3.构建StructTypeval structType=StructType(Array(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("age",IntegerType,true)))//4.构建dataframe
// spark.createDataFrame(infoRDD,structType)val infoDF= spark.createDataFrame(infoRDD,structType)infoDF.printSchema()infoDF.show()spark.close()
2.重点
(1)构建ROW类型的RDD,以及StructType。
(2)createDataFrame方法将infoRDD和structType关联起来
这篇关于SparkSQL(9)RDD2DataFrame的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!