本文主要是介绍spark sql rdd转换为datafram,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
描述:
RDD转换为DataFrame
为什么要将RDD转换为DataFrame?因为这样的话,我们就可以直接针对HDFS等任何可以构建为RDD的数据,使用Spark SQL进行SQL查询了。这个功能是无比强大的。想象一下,针对HDFS中的数据,直接就可以使用SQL进行查询。
Spark SQL支持两种方式来将RDD转换为DataFrame。
第一种方式,是使用反射来推断包含了特定数据类型的RDD的元数据。这种基于反射的方式,代码比较简洁,当你已经知道你的RDD的元数据时,是一种非常不错的方式。
第二种方式,是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。
使用反射方式推断元数据
Java版本:Spark SQL是支持将包含了JavaBean的RDD转换为DataFrame的。JavaBean的信息,就定义了元数据。Spark SQL现在是不支持将包含了嵌套JavaBean或者List等复杂数据的JavaBean,作为元数据的。只支持一个包含简单数据类型的field的JavaBean。
package cn.spark.study.sql;import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;/*** 使用反射的方式将RDD转换为DataFrame* @author Administrator**/
public class RDD2DataFrameReflection {public static void main(String[] args) {// 创建普通的RDDSparkConf conf = new SparkConf().setMaster("local") .setAppName("RDD2DataFrameReflection"); JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//students.txt");JavaRDD<Student> students = lines.map(new Function<String, Student>() {private static final long serialVersionUID = 1L;@Overridepublic Student call(String line) throws Exception {String[] lineSplited = line.split(","); Student stu = new Student();stu.setId(Integer.valueOf(lineSplited[0].trim())); stu.setName(lineSplited[1]); stu.setAge(Integer.valueOf(lineSplited[2].trim())); return stu;}});// 使用反射方式,将RDD转换为DataFrame// 将Student.class传入进去,
这篇关于spark sql rdd转换为datafram的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!