本文主要是介绍Spark SQL(一) 如何创建DataFrames,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Spark SQL(一) 如何创建DataFrames
Spark SQL包含两个主要的部分,第一部分是DataFrames和Datasets, 第二部分是Catalyst optimizer.
DataFrames和Datasets是结构性API的展示,定义了操作结构化数据的高层次API,
而Catalyst optimizer则是在背后对处理数据的逻辑进行优化,以加速处理数据的速度。
结构化数据通常有确定的格式,比如文本类数据格式CSV, XML, JSON。还有二进制数据:Avro, Parquet, ORC.
Spark支持以上数据格式的读和写,因此Spark可以作为数据格式转换工具。
DataFrames
DataFrames 是一个不可变的,以行的形式被组织的,分布式的数据集合,类似于关系数据库中的表。
和RDD类似, DataFrame相关的API也被分为转换(Transformation)和行为(Action), 且转换操作是懒生效模式,行为是立即生效模式。
DataFrame可以通过从之前提到的多种个数读数据创建,也可以通过读Hive或数据库中的表创建,同时Spark SQL还支持把RDD转化成DataFrame.
创建DataFrames
- 从RDD中创建DataFrames
一个例子:
import scala.util.Random
val rdd = spark.sparkContext.parallelize(1 to 10).map(x => (x, Random.nextInt(100)* x))val kvDF = rdd.toDF("key","value")
这段代码首先创建了RDD, 然后调用toDF
指定列名,隐式创建了一个DataFrame.
我们可以用printSchema
方法打印一个DataFrame的schema, 然后通过show
方法打印出数据,默认show
只显示前20行,可以指定打印的行数。
kvDF.printSchema
|-- key: integer (nullable = false)
|-- value: integer (nullable = false)
kvDF.show(5)
+---+-----+
|key|value|
+---+-----+
| 1| 59|
| 2| 60|
| 3| 66|
| 4| 280|
| 5| 40|
+---+-----+
- 通过编程的方式创建一个schema,并和一个RDD绑定从而创建一个DataFrame
记住toDF
是隐式的创建一个schema,所以不需要指定schema.
一个例子:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val peopleRDD = spark.sparkContext.parallelize(Array(Row(1L, "John Doe", 30L), Row(2L, "Mary Jane", 25L)))val schema = StructType(Array(StructField("id", LongType, true),StructField("name", StringType, true),StructField("age", LongType, true)
))
创建DataFrame
val peopleDF = spark.createDataFrame(peopleRDD, schema)
同样地查看shema和数据:
peopleDF.printSchema|-- id: long (nullable = true)|-- name: string (nullable = true)|-- age: long (nullable = true)
peopleDF.show
+--+-----------+---+
|id| name|age|
+--+-----------+---+
| 1| John Doe| 30|
| 2| Mary Jane| 25|
+--+-----------+---+
DataFrame中每一个列的数据类型被映射到一个spark 内部数据类型。映射关系如下:
数据类型:ScaleType
BooleanType: Boolean
ByteType: Byte
ShortType: Short
IntegerType: Int
LongType: Long
FloatType: Float
DoubleType: Double
DecimalType: java.math.BigDecimal
StringType: String
BinaryType: Array[Byte]
TimestampType: java.sql.Timestamp
DateType: java.sql.Date
ArrayType: scala.collection.Seq
MapType: scala.collection.Map
StructType: org.apache.spark.sql.Row
- 从一个数字范围创建一个DataFrame
一个例子:
val df1 = spark.range(5).toDF("num").show
输出:
+---+
|num|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
- 根据一个数据源创建DataFrame
SparkSQL中和读写数据有关的类是DataFrameReader
和DataFrameWriter
SparkSession
类的read成员就是一个DataFrameReader
类的实例,
所以通常情况下:可以用下面的语句从数据源创建一个DataFrame
spark.read.format(...).option("key", value").schema(...).load()
其中format可以有以下几种(json, parquet, jdbc, orc, csv, text)。
Spark内置了6种数据源:
- 从文本文件创建DataFrame
spark.read.text("README.md")
- 从CSV创建
val movies = spark.read.option("header","true").csv("<path>/book/chapter4/data/movies/movies.csv")
对于CSV数据源来讲有4个option可以设置:sep
, header
, escape
, inferSchema
.
其中sep
是指指定一个字符作为分割符, CSV文件默认分割符是,
, header
的取值是true或者false,默认值是false, escape
是当列中的数据和sep
字符相同时用来转义用的,取值时任何字符,默认值是\
. inferSchema
用来指定是否根据列值来判断列的数据类型, 取值是true或者false, 默认值是false.
- 从json文件创建
val movies5 = spark.read.json("<path>/book/chapter4/data/movies/movies.json")
- 从parquet创建
val movies9 = spark.read.load("<path>/book/chapter4/data/movies/movies.parquet")
- 从ORC创建
val movies11 = spark.read.orc("<path>/book/chapter4/data/movies/movies.orc")
- 从jdbc数据源创建
val mysqlURL= "jdbc:mysql://localhost:3306/sakila"
val filmDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", mysqlURL).option("dbtable", "film").option("user", "<username>").option("password","<password>").load()
这篇关于Spark SQL(一) 如何创建DataFrames的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!