spark之DataFrame

2024-08-28 01:38
文章标签 dataframe spark

本文主要是介绍spark之DataFrame,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、DataFrame的优点

DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。

2、DataFrame的生成方式

重要环境变量

from pyspark import SparkContext
from pyspark.sql import SparkSession
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jsy/spark_test/mysql/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar,/home/jsy/spark_test/mysql/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar pyspark-shell'
sc = SparkContext( 'local', 'test')from pyspark.sql import SparkSession
import pyspark
spark=SparkSession(sc)

2.1、json文件中读取数据并生成DataFrame并显示数据

 

df=spark.read.json("file:///home/jsy/spark_test/people.json")
df.show()

2.2、RDD转化为DataFrame

2.2.1、利用反射转化RDD

利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;

from pyspark.sql.types import Row
def f(x):res={}res['name']=x[0]res['age']=x[1]return res
rdd=sc.textFile('file:///home/jsy/spark_test/people.txt')
rdd.take(1)df=rdd.map(lambda x:x.split(',')).map(lambda x:Row(**f(x))).toDF()
df.show()

2.2.2、编程接口

第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。

from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType#生成rdd
rdd=sc.textFile('file:///home/jsy/spark_test/people.txt')#创建模式schema字符串
schemaString="name age"#生成schema模式
fields=list(map(lambda x:StructField(x,StringType(),nullable=True),schemaString.split(' ')))
print(fields)
#从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
schema = StructType(fields)
print(schema)
rowRdd=rdd.map(lambda x:x.split(',')).map(lambda x:Row(x[0],x[1]))
print(type(rowRdd))df=spark.createDataFrame(rowRdd,schema)
df.show()
df.createOrReplaceTempView("people")

2.3、JDBC连接方式生成DataFrame

Spark支持通过JDBC方式连接到其他数据库获取数据生成DataFrame。

配置mysql驱动可参考https://blog.csdn.net/u013069552/article/details/108903622

生成DataFrame代码如下

jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://10.26.1,1:10649/test").option("driver","com.mysql.jdbc.Driver").option("dbtable", "test").option("user", "root").option("password", "123456").load()
jdbcDF.show()

2.4、hive读取生成DataFrame

from pyspark import SparkContext
from pyspark.sql import SparkSession
import os
sc = SparkContext( 'local', '1111')
from pyspark.sql import HiveContextsql='select * from DB_test.TABLE_test'
data = HiveContext(sc).sql(sql).repartition(500).rdd
df=HiveContext(sc).sql(sql)
data.take(10)
df.show()

3、DataFrame的常见操作

3.1、打印模式信息

#打印模式信息,输出DataFrame的头信息及相关类型
df.printSchema()

3.2、选择多列

#选择多列
df.select(df.name,df.age).show()

3.3、条件过滤

#条件过滤
df.filter(df.age>20).show()

3.4、分组聚合

#分组聚合
df.groupBy('age').count().show()

3.5、排序

#排序
df.sort(df.age.desc()).show()

3.6、多列排序

#多列排序
df.sort(df.age.desc(),df.name.asc()).show()

3.7、对列重命名

#对列重命名
df.select(df.name.alias("Name"),df.age).show()

3.8、存储

#存储
df.rdd.saveAsTextFile("file:///home/jsy/spark_test/df.txt")
df.select("name","age").write.format("csv").save("file:///home/jsy/spark_test/df_csv.csv")

参考文献

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html?highlight=sparksession#pyspark.sql.SparkSession

http://dblab.xmu.edu.cn/blog/1709-2/

 

这篇关于spark之DataFrame的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1113365

相关文章

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

【python pandas】 Dataframe的数据print输出 显示为...省略号

pandas.set_option() 可以设置pandas相关的参数,从而改变默认参数。 打印pandas数据事,默认是输出100行,多的话会输出….省略号。 那么可以添加: pandas.set_option('display.max_rows',None) 这样就可以显示全部数据 同样,某一列比如url太长 显示省略号 也可以设置。 pd.set_option('display.

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e

Spark数据介绍

从趋势上看,DataFrame 和 Dataset 更加流行。 示例场景 数据仓库和 BI 工具集成: 如果你需要处理存储在数据仓库中的结构化数据,并且希望与 BI 工具集成,那么 DataFrame 和 Dataset 是首选。 机器学习流水线: 在构建机器学习流水线时,使用 DataFrame 和 Dataset 可以更好地管理数据流,并且可以方便地与 MLlib 集成。 实时数据处理:

Mac搭建华为云平台Hadoop+spark步骤

1、安装终端和文件传输软件 下载、安装、配置 详戳数据平台搭建文件夹 Transmit 用于文件传输 iTerm2    用于终端 2、连接与登录 mac 使用iTerm2快捷登录远程服务器 Mac Transmit连接 (密码不可复制,手动输入) 3、安装jdk 4、修改主机名 Linux系统下如何修改主机名 4、安装配置hadoop

Spark-在集群上运行Spark

Spark-在集群上运行Spark

Spark—数据读取和保存

Spark—数据读取和保存

Spark源码分析之Spark Shell(上)

终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。 先来介绍一下Spark-shell是什么? Spark-shell是提供给用户即时交互的一个命令窗口,你可以在里面编写spark代码,然后根据你的命令立即进行

[大数据之Spark]——快速入门

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用。可以查看 编程指南了解更多的内容。 为了良好的阅读下面的文档,最好是结合实际的练习。首先需要下载spark,然后安装hdfs,可以下载任意版本的hdfs。 Spark Shell 交互 基本操作 Spark Shell提供给用