Spark SQL(一) 如何创建DataFrames

2024-03-29 17:58

本文主要是介绍Spark SQL(一) 如何创建DataFrames,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark SQL(一) 如何创建DataFrames

Spark SQL包含两个主要的部分,第一部分是DataFrames和Datasets, 第二部分是Catalyst optimizer.
DataFrames和Datasets是结构性API的展示,定义了操作结构化数据的高层次API,
而Catalyst optimizer则是在背后对处理数据的逻辑进行优化,以加速处理数据的速度。

结构化数据通常有确定的格式,比如文本类数据格式CSV, XML, JSON。还有二进制数据:Avro, Parquet, ORC.
Spark支持以上数据格式的读和写,因此Spark可以作为数据格式转换工具。

DataFrames

DataFrames 是一个不可变的,以行的形式被组织的,分布式的数据集合,类似于关系数据库中的表。

和RDD类似, DataFrame相关的API也被分为转换(Transformation)和行为(Action), 且转换操作是懒生效模式,行为是立即生效模式。

DataFrame可以通过从之前提到的多种个数读数据创建,也可以通过读Hive或数据库中的表创建,同时Spark SQL还支持把RDD转化成DataFrame.

创建DataFrames
  1. 从RDD中创建DataFrames

一个例子:

import scala.util.Random
val rdd = spark.sparkContext.parallelize(1 to 10).map(x => (x, Random.nextInt(100)* x))val kvDF = rdd.toDF("key","value")

这段代码首先创建了RDD, 然后调用toDF指定列名,隐式创建了一个DataFrame.

我们可以用printSchema方法打印一个DataFrame的schema, 然后通过show方法打印出数据,默认show只显示前20行,可以指定打印的行数。

kvDF.printSchema
|-- key: integer (nullable = false)
|-- value: integer (nullable = false)
kvDF.show(5)
+---+-----+
|key|value|
+---+-----+
|  1|   59|
|  2|   60|
|  3|   66|
|  4|  280|
|  5|   40|
+---+-----+
  1. 通过编程的方式创建一个schema,并和一个RDD绑定从而创建一个DataFrame

记住toDF是隐式的创建一个schema,所以不需要指定schema.

一个例子:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val peopleRDD = spark.sparkContext.parallelize(Array(Row(1L, "John Doe",  30L), Row(2L, "Mary Jane", 25L)))val schema = StructType(Array(StructField("id", LongType, true),StructField("name", StringType, true),StructField("age", LongType, true)
))

创建DataFrame

val peopleDF = spark.createDataFrame(peopleRDD, schema)

同样地查看shema和数据:

peopleDF.printSchema|-- id: long (nullable = true)|-- name: string (nullable = true)|-- age: long (nullable = true)
peopleDF.show
+--+-----------+---+
|id|       name|age|
+--+-----------+---+
| 1|   John Doe| 30|
| 2|  Mary Jane| 25|
+--+-----------+---+

DataFrame中每一个列的数据类型被映射到一个spark 内部数据类型。映射关系如下:

数据类型:ScaleType
BooleanType: Boolean
ByteType: Byte
ShortType: Short
IntegerType: Int
LongType: Long
FloatType: Float
DoubleType: Double
DecimalType: java.math.BigDecimal
StringType: String
BinaryType: Array[Byte]
TimestampType: java.sql.Timestamp
DateType: java.sql.Date
ArrayType: scala.collection.Seq
MapType: scala.collection.Map
StructType: org.apache.spark.sql.Row
  1. 从一个数字范围创建一个DataFrame

一个例子:

val df1 = spark.range(5).toDF("num").show

输出:

+---+
|num|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
  1. 根据一个数据源创建DataFrame

SparkSQL中和读写数据有关的类是DataFrameReaderDataFrameWriter
SparkSession类的read成员就是一个DataFrameReader类的实例,
所以通常情况下:可以用下面的语句从数据源创建一个DataFrame

spark.read.format(...).option("key", value").schema(...).load()

其中format可以有以下几种(json, parquet, jdbc, orc, csv, text)。

Spark内置了6种数据源:

  • 从文本文件创建DataFrame
spark.read.text("README.md")
  • 从CSV创建
val movies = spark.read.option("header","true").csv("<path>/book/chapter4/data/movies/movies.csv")

对于CSV数据源来讲有4个option可以设置:sep, header, escape, inferSchema.
其中sep是指指定一个字符作为分割符, CSV文件默认分割符是,, header的取值是true或者false,默认值是false, escape是当列中的数据和sep字符相同时用来转义用的,取值时任何字符,默认值是\. inferSchema用来指定是否根据列值来判断列的数据类型, 取值是true或者false, 默认值是false.

  • 从json文件创建
val movies5 = spark.read.json("<path>/book/chapter4/data/movies/movies.json")
  • 从parquet创建
val movies9 = spark.read.load("<path>/book/chapter4/data/movies/movies.parquet")
  • 从ORC创建
val movies11 = spark.read.orc("<path>/book/chapter4/data/movies/movies.orc")
  • 从jdbc数据源创建
val mysqlURL= "jdbc:mysql://localhost:3306/sakila"
val filmDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", mysqlURL).option("dbtable", "film").option("user", "<username>").option("password","<password>").load()

这篇关于Spark SQL(一) 如何创建DataFrames的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/859258

相关文章

创建Java keystore文件的完整指南及详细步骤

《创建Javakeystore文件的完整指南及详细步骤》本文详解Java中keystore的创建与配置,涵盖私钥管理、自签名与CA证书生成、SSL/TLS应用,强调安全存储及验证机制,确保通信加密和... 目录1. 秘密键(私钥)的理解与管理私钥的定义与重要性私钥的管理策略私钥的生成与存储2. 证书的创建与

浅谈mysql的not exists走不走索引

《浅谈mysql的notexists走不走索引》在MySQL中,​NOTEXISTS子句是否使用索引取决于子查询中关联字段是否建立了合适的索引,下面就来介绍一下mysql的notexists走不走索... 在mysql中,​NOT EXISTS子句是否使用索引取决于子查询中关联字段是否建立了合适的索引。以下

Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式

《Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式》本文详细介绍如何使用Java通过JDBC连接MySQL数据库,包括下载驱动、配置Eclipse环境、检测数据库连接等关键步骤,... 目录一、下载驱动包二、放jar包三、检测数据库连接JavaJava 如何使用 JDBC 连接 mys

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

SQL Server配置管理器无法打开的四种解决方法

《SQLServer配置管理器无法打开的四种解决方法》本文总结了SQLServer配置管理器无法打开的四种解决方法,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录方法一:桌面图标进入方法二:运行窗口进入检查版本号对照表php方法三:查找文件路径方法四:检查 S

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语