Spark SQL快速指南

2024-06-23 12:04
文章标签 spark sql 指南 快速 database

本文主要是介绍Spark SQL快速指南,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark SQL快速指南

  • 一、起点:SparkSession
  • 二、创建DataFrames
  • 三、DataFrame操作
  • 四、以编程方式运行SQL查询
  • 五、全局临时视图
  • 创建Datasets
  • 六、与RDD进行互操作
  • 6.1 使用反射推断Schema
  • 6.2 以编程方式指定Schema
  • 七、Scalar函数
  • 八、聚合函数

一、起点:SparkSession

进入Spark所有功能的入口点是SparkSession类。要创建一个基本的SparkSession,只需使用SparkSession.builder:
Java

import org.apache.spark.sql.SparkSession;SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate();

Python

from pyspark.sql import SparkSessionspark = SparkSession \.builder \.appName("Python Spark SQL basic example") \.config("spark.some.config.option", "some-value") \.getOrCreate()

在Spark repo的"examples/src/main/python/sql/basic.py"中找到完整的示例代码。
Spark 2.0中的SparkSession提供了对Hive功能的内置支持,包括使用Hive QL编写查询的能力、对Hive UDF的访问以及从Hive 表读取数据的能力。要使用这些功能,你不需要配置一个单独的Hive。

二、创建DataFrames

使用SparkSession,应用程序可以从现有RDD、Hive表或Spark数据源创建DataFrames。
例如,以下内容基于JSON文件的内容创建DataFrame:
Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Python

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

在Spark repo中的“examples/src/main/python/sql/basic.py”中找到完整的示例代码。

三、DataFrame操作

DataFrames为Scala、Java、Python和R中的结构化数据操作提供了一种特定于领域的语言。
如上所述,在Spark 2.0中,DataFrames只是Scala和Java API中的多行Dataset。这些操作也被称为“非类型化转换”,与强类型Scala/Java数据集中的“类型转换”不同。
在这里,我们包括一些使用Datasets进行结构化数据处理的基本示例:
在Python中,可以通过属性(df.age)或索引(df[‘age’])访问DataFrame的列。虽然前者便于交互式数据探索,但强烈鼓励用户使用后一种形式,这种形式经得起未来的考验,不会与DataFrame类上的列名相冲突。
Java

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

Python

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+

在Spark repo的“examples/src/main/python/sql/basic.py”中找到完整的示例代码。
有关可在DataFrame上执行的操作类型的完整列表,请参阅API文档。
除了简单的列引用和表达式外,DataFrames还拥有丰富的函数库,包括字符串操作、日期算术、常见数学运算等。完整列表可在《DataFrame函数参考》中找到。

四、以编程方式运行SQL查询

SparkSession上的sql函数使应用程序能够以编程方式运行sql查询,并将结果作为DataFrame返回。
Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Python

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

在Spark repo中的“examples/src/main/python/sql/basic.py”中找到完整的示例代码。

五、全局临时视图

Spark SQL中的临时视图是会话范围的,如果创建它的会话终止,它将消失。如果你希望拥有一个在所有会话之间共享的临时视图,并在Spark应用程序终止之前保持活动状态,则可以创建一个全局临时视图。全局临时视图绑定到系统保留的数据库global_temp,我们必须使用限定名称来引用它,例如SELECT * FROM global_temp.view1。
Java

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Python

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

在Spark repo中的“examples/src/main/python/sql/basic.py”中找到完整的示例代码。

创建Datasets

Datasets类似于RDD,但是,它们没有使用Java序列化或Kryo,而是使用专门的编码器来序列化对象,以便通过网络进行处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成的代码,并使用一种格式,允许Spark执行许多操作,如过滤、排序和哈希,而无需将字节反序列化回对象。

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;public static class Person implements Serializable {private String name;private long age;public String getName() {return name;}public void setName(String name) {this.name = name;}public long getAge() {return age;}public void setAge(long age) {this.age = age;}
}// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(Collections.singletonList(person),personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+// Encoders for most common types are provided in class Encoders
Encoder<Long> longEncoder = Encoders.LONG();
Dataset<Long> primitiveDS = spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);
Dataset<Long> transformedDS = primitiveDS.map((MapFunction<Long, Long>) value -> value + 1L,longEncoder);
transformedDS.collect(); // Returns [2, 3, 4]// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

在Spark Repo中找到完整的示例代码。

六、与RDD进行互操作

Spark SQL支持两种不同的方法将现有RDD转换为Datasets。第一种方法使用反射来推断包含特定类型对象的RDD的schema。这种基于反射的方法可以生成更简洁的代码,并且当你在编写Spark应用程序时已经了解schema时,这种方法可以很好地工作。
创建Datasets的第二种方法是通过编程接口,该接口允许你构建一个schema,然后将其应用于现有的RDD。虽然此方法更详细,但你直到运行时才知道它的列及其类型,从而构造数据集。

6.1 使用反射推断Schema

Spark SQL支持将JavaBeans的RDD自动转换为DataFrame。使用反射获得的BeanInfo定义了表的schema。目前,Spark SQL不支持包含Map字段的JavaBeans。但是支持嵌套JavaBeans和List或Array字段。你可以通过创建一个实现Serializable的类来创建JavaBean,并且该类的所有字段都有getter和setter。
Java

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read().textFile("examples/src/main/resources/people.txt").javaRDD().map(line -> {String[] parts = line.split(",");Person person = new Person();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));return person;});// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map((MapFunction<Row, String>) row -> "Name: " + row.getString(0),stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map((MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

在Spark Repo中找到完整的示例代码。

Spark SQL可以将Row对象的RDD转换为DataFrame,从而推断数据类型。行是通过将键/值对列表作为kwargs传递给Row类来构造的。该列表的键定义了表的列名,并且通过对整个数据集进行采样来推断类型,类似于对JSON文件执行的推断。
Python

from pyspark.sql import Rowsc = spark.sparkContext# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:print(name)
# Name: Justin

在Spark repo中的“examples/src/main/python/sql/basic.py”中找到完整的示例代码。

6.2 以编程方式指定Schema

当JavaBean类不能提前定义时(例如,记录的结构被编码为字符串,或者文本数据集将被解析,字段将以不同的方式投影给不同的用户),可以通过三个步骤以编程方式创建Dataset。

  1. 从原始RDD的行创建一个RDD;
  2. 创建由StructType表示的schema,该schema与步骤1中创建的RDD中的Rows结构相匹配。
  3. 通过SparkSession提供的createDataFrame方法将schema应用到RDD的行。例如:
    Java实现:
import java.util.ArrayList;
import java.util.List;import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext().textFile("examples/src/main/resources/people.txt", 1).toJavaRDD();// The schema is encoded in a string
String schemaString = "name age";// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim());
});// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map((MapFunction<Row, String>) row -> "Name: " + row.getString(0),Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

当无法提前定义kwargs的字典时(例如,记录的结构被编码在字符串中,或者文本数据集将被解析,并且字段将针对不同的用户进行不同的投影),可以通过三个步骤以编程方式创建DataFrame。
4. 从原始RDD创建元组RDD或列表RDD;
5. 创建由StructType表示的schema,该schema与步骤1中创建的RDD中的元组或列表的结构相匹配。
6. 通过SparkSession提供的createDataFrame方法将schema应用于RDD。
例如:
Python实现

# Import data types
from pyspark.sql.types import StringType, StructType, StructFieldsc = spark.sparkContext# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))# The schema is encoded in a string.
schemaString = "name age"fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

在Spark repo中的“examples/src/main/python/sql/basic.py”中找到完整的示例代码。

七、Scalar函数

Scalar函数是指每行返回一个值的函数,而聚合函数则返回一组行的值。Spark SQL支持各种内置Scalar函数。它还支持用户定义的Scalar函数。

八、聚合函数

聚合函数是在一组行上返回单个值的函数。内置聚合函数提供常见的聚合,如count(), count_distinct(), avg(), max(), min()等。用户不限于预定义的聚合函数,可以创建自己的聚合函数。有关用户定义聚合函数的更多详细信息,请参阅用户定义聚合功能的文档。

这篇关于Spark SQL快速指南的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1087105

相关文章

乐鑫 Matter 技术体验日|快速落地 Matter 产品,引领智能家居生态新发展

随着 Matter 协议的推广和普及,智能家居行业正迎来新的发展机遇,众多厂商纷纷投身于 Matter 产品的研发与验证。然而,开发者普遍面临技术门槛高、认证流程繁琐、生产管理复杂等诸多挑战。  乐鑫信息科技 (688018.SH) 凭借深厚的研发实力与行业洞察力,推出了全面的 Matter 解决方案,包含基于乐鑫 SoC 的 Matter 硬件平台、基于开源 ESP-Matter SDK 的一

mysql索引四(组合索引)

单列索引,即一个索引只包含单个列,一个表可以有多个单列索引,但这不是组合索引;组合索引,即一个索引包含多个列。 因为有事,下面内容全部转自:https://www.cnblogs.com/farmer-cabbage/p/5793589.html 为了形象地对比单列索引和组合索引,为表添加多个字段:    CREATE TABLE mytable( ID INT NOT NULL, use

mysql索引三(全文索引)

前面分别介绍了mysql索引一(普通索引)、mysql索引二(唯一索引)。 本文学习mysql全文索引。 全文索引(也称全文检索)是目前搜索引擎使用的一种关键技术。它能够利用【分词技术】等多种算法智能分析出文本文字中关键词的频率和重要性,然后按照一定的算法规则智能地筛选出我们想要的搜索结果。 在MySql中,创建全文索引相对比较简单。例如:我们有一个文章表(article),其中有主键ID(

mysql索引二(唯一索引)

前文中介绍了MySQL中普通索引用法,和没有索引的区别。mysql索引一(普通索引) 下面学习一下唯一索引。 创建唯一索引的目的不是为了提高访问速度,而只是为了避免数据出现重复。唯一索引可以有多个但索引列的值必须唯一,索引列的值允许有空值。如果能确定某个数据列将只包含彼此各不相同的值,在为这个数据列创建索引的时候就应该使用关键字UNIQUE,把它定义为一个唯一索引。 添加数据库唯一索引的几种

mysql索引一(普通索引)

mysql的索引分为两大类,聚簇索引、非聚簇索引。聚簇索引是按照数据存放的物理位置为顺序的,而非聚簇索引则不同。聚簇索引能够提高多行检索的速度、非聚簇索引则对单行检索的速度很快。         在这两大类的索引类型下,还可以降索引分为4个小类型:         1,普通索引:最基本的索引,没有任何限制,是我们经常使用到的索引。         2,唯一索引:与普通索引

【服务器运维】MySQL数据存储至数据盘

查看磁盘及分区 [root@MySQL tmp]# fdisk -lDisk /dev/sda: 21.5 GB, 21474836480 bytes255 heads, 63 sectors/track, 2610 cylindersUnits = cylinders of 16065 * 512 = 8225280 bytesSector size (logical/physical)

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

SQL Server中,always on服务器的相关操作

在SQL Server中,建立了always on服务,可用于数据库的同步备份,当数据库出现问题后,always on服务会自动切换主从服务器。 例如192.168.1.10为主服务器,12为从服务器,当主服务器出现问题后,always on自动将主服务器切换为12,保证数据库正常访问。 对于always on服务器有如下操作: 1、切换主从服务器:假如需要手动切换主从服务器时(如果两个服务

SQL Server中,isnull()函数以及null的用法

SQL Serve中的isnull()函数:          isnull(value1,value2)         1、value1与value2的数据类型必须一致。         2、如果value1的值不为null,结果返回value1。         3、如果value1为null,结果返回vaule2的值。vaule2是你设定的值。        如

SQL Server中,添加数据库到AlwaysOn高可用性组条件

1、将数据添加到AlwaysOn高可用性组,需要满足以下条件: 2、更多具体AlwaysOn设置,参考:https://msdn.microsoft.com/zh-cn/library/windows/apps/ff878487(v=sql.120).aspx 注:上述资源来自MSDN。