本文主要是介绍【spark 读写数据】数据源的读写操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
通用的 Load/Save 函数
在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。
Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手动指定选项
你也可以手动的指定数据源,并且将与你想要传递给数据源的任何额外选项一起使用。数据源由其完全限定名指定(例如 : org.apache.spark.sql.parquet),不过对于内置数据源你也可以使用它们的缩写名(json, parquet, jdbc)。使用下面这个语法可以将从任意类型数据源加载的DataFrames 转换为其他类型。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
直接在文件上运行 SQL
你也可以直接在文件上运行 SQL 查询来替代使用 API 将文件加载到 DataFrame 再进行查询。
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
保存为持久化的表
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
peopleDF.write.parquet("people.parquet")
val parquetFileDF = spark.read.parquet("people.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
这篇关于【spark 读写数据】数据源的读写操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!