本文主要是介绍spark之DataFrame,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1、DataFrame的优点
DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。
2、DataFrame的生成方式
重要环境变量
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jsy/spark_test/mysql/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar,/home/jsy/spark_test/mysql/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar pyspark-shell'
sc = SparkContext( 'local', 'test')from pyspark.sql import SparkSession
import pyspark
spark=SparkSession(sc)
2.1、json文件中读取数据并生成DataFrame并显示数据
df=spark.read.json("file:///home/jsy/spark_test/people.json")
df.show()
2.2、RDD转化为DataFrame
2.2.1、利用反射转化RDD
利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;
from pyspark.sql.types import Row
def f(x):res={}res['name']=x[0]res['age']=x[1]return res
rdd=sc.textFile('file:///home/jsy/spark_test/people.txt')
rdd.take(1)df=rdd.map(lambda x:x.split(',')).map(lambda x:Row(**f(x))).toDF()
df.show()
2.2.2、编程接口
第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType#生成rdd
rdd=sc.textFile('file:///home/jsy/spark_test/people.txt')#创建模式schema字符串
schemaString="name age"#生成schema模式
fields=list(map(lambda x:StructField(x,StringType(),nullable=True),schemaString.split(' ')))
print(fields)
#从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
schema = StructType(fields)
print(schema)
rowRdd=rdd.map(lambda x:x.split(',')).map(lambda x:Row(x[0],x[1]))
print(type(rowRdd))df=spark.createDataFrame(rowRdd,schema)
df.show()
df.createOrReplaceTempView("people")
2.3、JDBC连接方式生成DataFrame
Spark支持通过JDBC方式连接到其他数据库获取数据生成DataFrame。
配置mysql驱动可参考https://blog.csdn.net/u013069552/article/details/108903622
生成DataFrame代码如下
jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://10.26.1,1:10649/test").option("driver","com.mysql.jdbc.Driver").option("dbtable", "test").option("user", "root").option("password", "123456").load()
jdbcDF.show()
2.4、hive读取生成DataFrame
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os
sc = SparkContext( 'local', '1111')
from pyspark.sql import HiveContextsql='select * from DB_test.TABLE_test'
data = HiveContext(sc).sql(sql).repartition(500).rdd
df=HiveContext(sc).sql(sql)
data.take(10)
df.show()
3、DataFrame的常见操作
3.1、打印模式信息
#打印模式信息,输出DataFrame的头信息及相关类型
df.printSchema()
3.2、选择多列
#选择多列
df.select(df.name,df.age).show()
3.3、条件过滤
#条件过滤
df.filter(df.age>20).show()
3.4、分组聚合
#分组聚合
df.groupBy('age').count().show()
3.5、排序
#排序
df.sort(df.age.desc()).show()
3.6、多列排序
#多列排序
df.sort(df.age.desc(),df.name.asc()).show()
3.7、对列重命名
#对列重命名
df.select(df.name.alias("Name"),df.age).show()
3.8、存储
#存储
df.rdd.saveAsTextFile("file:///home/jsy/spark_test/df.txt")
df.select("name","age").write.format("csv").save("file:///home/jsy/spark_test/df_csv.csv")
参考文献
https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html?highlight=sparksession#pyspark.sql.SparkSession
http://dblab.xmu.edu.cn/blog/1709-2/
这篇关于spark之DataFrame的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!