进化的Spark, 从DataFrame说起

2024-04-13 10:18
文章标签 dataframe 说起 spark 进化

本文主要是介绍进化的Spark, 从DataFrame说起,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本打算写一篇spark的DataFrame的文章,结果在网上找到一篇写的非常棒的问题,就直接向大师致敬了。

转载的URL:http://blueve.me/archives/1452?utm_source=tuicool&utm_medium=referral


书接上回,Spark可以说就是RDDs的化身,我们已经看到RDDs的设计方案对于大数据的计算具有诸多优势,但是随着Spark项目的推进,大家也逐渐地发现,在实际的生产领域,Spark RDDs API的编程方式并不是唯一的选择,很多的计算,都很像目前关系型数据库中的关联查询,而所谓的“信息”,其实很大部分都是产生自数据与数据之间的关联,而对于这样的数据模型,RDDs模型的表达能力还是相对有限的。

Spark 1.3.0的发布带来了全新的DataFrame API以及Spark SQL,我们继续追根溯源,找到关于DataFrame与Spark SQL的核心设计稿 – Spark sql: Relational data processing in spark[1]。

从这篇文章的角度来讲,DataFrame API和Spark SQL是配套的,从下图我们就可以看出来新增的这个模块与原有Spark框架的关系:

1

它们只是以不同的角度来诠释Spark在关系型数据的处理能力,可以看到,整个Spark SQL其实可以拆分为两个重量级的部分来描述(1)DataFrame API 以及(2)Catalyst Optimizer,接下来我们也就顺着这个思路来介绍他们。

DataFrame API

文中给出了DataFrame的核心目标:

  1. 关系式数据处理的API
  2. 可以充分利用现有数据库管理系统的优化技术
  3. 具备接纳新数据源的能力
  4. 提供更好的分析类算法的支持

我们从存储和计算两个角度来看一看这个DataFrame。

1 数据存储结构

DataFrame这个名字是取自R等语言中的概念,在这些专用于数据统计、分析的语言中,DataFrame承担了举足轻重的作用,Spark希望能够借助DataFrame实现在大数据计算平台上的同类功能,不过DataFrame究竟是什么呢?它的发明是否有意义呢?为了探究这个问题,我们将DataFrame和RDDs进行对比,首先最直观的就是他们各自的数据存储方式的差异:

2

可以看到,DataFrame直观上很像是RDDs的加强版,它和RDDs在数据存储上最大的区别就在于,DataFrame是有Schema的,通俗的讲,就是上图中蓝色框住的那个表头。不要小看这一点,对于复杂的数据类型,DataFrame的这种结构可以使编程大大简化,比如下面的这段示意性代码(虽然我用了C#的语法,但希望你可以把object看成万能且不会发生slicing的值类型):

在RDDs中,数据集中的每项数据都是一个整体,因为你无法得知其内部的结构,这也就使得你对数据项的操作能力很弱,当你想获得数据项内部的部分信息的时候,你需要手动将object按照你预先设定的数据格式进行分割,麻烦,且容易出错。而使用DataFrame,意味着你可以直接获得数据项的内部数据结构,并且由于DataFrame的Schema的存在(在上面这段代码中,Schema就相当于是struct),数据项的转换也都将是类型安全的,这对于较为复杂的数据计算程序的调试是十分有利的,很多数据类型不匹配的问题都可以在编译阶段就被检查出来,而对于不合法的数据文件,DataFrame也具备一定分辨能力。

而另一方面,我们注意到,当RDD被切割出“列”并加上“表头”变成DataFrame之后,就意味着DataFrame要支持比RDD更加细粒度的查询,而这种Table式的结构,很容易就可以让我们联想到数据库中数据表,同时DataFrame API也支持使用者对DataFrame进行数据库那样的关联、聚合、筛选等查询操作。有些人可能会因此把DataFrame和ORM放在一起对比,诚然,他们之间确实具备诸多相似之处,这些相似之处很容易让人产生混淆,所以我在此简单的对他们的区别加以解释,ORM大致如下图所示:

3

Database中的数据需要被提取出来(通过ORM框架执行SQL语句来进行),并转换为程序可操作的Objects,然后才能被开发者使用,用户对Objects做得任何修改,都不会直接影响数据库中的数据,而且像一些关联表,在ORM中甚至于根本不会被映射为Objects,他们会被隐含在Objects内部的关联关系中。总而言之,ORM会把原有的数据以不同的形式进行转换后才呈现出来。而DataFrame不一样,它自己就扮演了数据库的角色,对DataFrame的查询,都是在RDDs数据集上做得直接映射,原汁原味,就地取材。你可以把DataFrame中的数据,理解为“逻辑数据”,而“物理数据”实际上在RDDs中。

4

因此,为了能够更快更好的定位到数据,甚至于更好的利用内存与磁盘中的存储空间,DataFrame中的数据在内存和磁盘中的排列也必须更为考究,才能够在不损失性能的前提下提供这些操作。Spark SQL团队给出的方案是:按列压缩存储。

列式存储是近年来出现频率越来越多的一个概念,其实它本身很早就已经被提出了,我们首先来看下列式存储和行式存储,在存储介质上的区别,还以之前的图为例:

5

传统的关系型数据库通常都采用行式存储,可以看到,数据一条一条的紧密排列着,通常情况下,行式存储中的每行数据都是紧密排列的。而列式存储则是将行拆开,将一列的数据放在一起,同时不同列可以存放在不同的位置(由于天然利于纵向分表,所以在超大数据集的存储上,列式存储也具有一定优势)。通常情况下,我们查询一个数据并不需要检查一行数据中的每个列条目,但是在行式存储中,必须要扫描全部数据集才能够筛选出我们想要的那条数据,既然我们检索的项目很可能只是“Id”一项而已,那为什么要去管其他列呢?特别是在磁盘上,磁头访问数据的方式是线性的,如果只想根据“Id”进行筛选,即便只是上面那个只有两列的数据表,磁头移动的距离也要超过列式存储的好几倍。不过相应的,列式存储中“更新”“插入“”查询“等操作会比较麻烦,但是由于DataFrame和RDDs一样都是Immutable的,所以恰好规避了这一问题。

虽然DataFrame是可以根据情况存储到磁盘上的,但是讲道理,我们用Spark最看中的其实就是在内存中进行计算以及中间结果的转换的速度优势,而上面所说的列式存储的访存优势,对于支持随机访问的内存介质而言似乎并没有明显的优势,那么为什么DataFrame仍然会采用这种方式呢?那就是列式存储的另外一个优势:利于数据压缩。我们都知道,越“随机”的数据,越难以压缩,越“相似”的数据,越容易压缩,数据表既然分了列,那么通常来讲他们就是独立的数据项,毫无关联的数据摆在一起,你说要怎么压缩嘛。而压缩本身,对于渴望大量内存的Spark而言,能够带来巨大的实惠。

而最终,DataFrame API中这种支持对列进行访问的形式,要比RDDs API的数据访问粒度更为细腻,这也就意味着数据工程师可以根据“列”的性质,来为列建立索引,从而避免遍历所有的数据项,这项数据访问优化技术在传统的关系型数据库中也是广泛使用的,但凡需要查询的关系型数据表,都可以通过建立索引来大大增加查询效率,DataFrame将这项技术引入进来,也算是对传统数据库管理系统的技术的一种借鉴。

另一方面,Spark SQL还支持通过UDT(User-Defined Type)来扩展DataFrame中“列”的数据类型。UDT是对通过基础数据类型(String, Int, Double…)的组合来扩展数据库的类型系统,这个概念来自于面向对象数据库(其实像SQL Server\DB2等数据库都是支持的)。有些关系型数据库(例如MySQL)的数据表的字段就只能从内置的N种数据类型中进行挑选(varchar/char/int/bigint…),而支持UDT其实对于处理和存储复杂数据类型是有益的,特别在大数据场景,有些带嵌套的JSON数据格式,存成数据表的形式就比较麻烦,有了UDF,只需要定义一套符合这种数据结构的新数据类型,就可以直接应用在DataFrame的列上面,不仅可以增加DataFrame数据项的表达能力,同时也仍然享有运行时的数据类型安全监测。

UDT虽然看上去好用,能让列的数据类型更直观更具表达力,但其实也会造成一些麻烦,前文也说了,DataFrame采用列式压缩存储,而这种存储形式是针对“内置类型”的一种优化方案。为了让UDT也能享受这种优化带来的好处,Spark SQL通过把UDT映射到内置类型上(内置类型除了那些耳熟能详的基本类型,还包含Array等集合性质的数据类型),来使Spark SQL可以对其进行正确的列式压缩存储。

2 运作方式

说了“数据结构”,那么接下来自然就是“算法”了。DataFrame API在使用的时候和RDDs API具有诸多相似之处,先看论文中的介绍:

6

可以看到,在上图中出现频率最高的词莫过于“Plan”了,其实Plan和RDDs中的“Lineage”的概念是有一定相似性的。如前文所说,DataFrame也是Immutable的,它通过接收“转换”操作来产生新的DataFrame,而且它的执行同样是Lazy的,只有当遇到“Output OP”的时候才会开始执行上面那一套动作。而DataFrame和RDDs在执行过程中最大的不同就是,DataFrame API的使用实质上是一种DSL(Domain Specific Language),这个词如果你比较陌生的话,举个例子来说,其实你用过的SQL其实也是一种DSL。使用DSL就意味着Spark SQL可以对查询进行深入的观察和优化,DataFrame的一系列转换以Plan的形式,经过验证、分析、优化才形成最终的执行代码,而RDDs则是开发者写成什么样子,最后就以什么样子去执行。

Spark SQL为其DSL增加了定制能力。熟悉数据库的读者可能用过,查询语句中可以使用一些针对聚合数据的操作(例如MAX(), COUNT())或是日期函数(DATA()),并且有部分数据库支持用户自己定义这样函数并在查询语句中使用,这样的自定义函数就被称为UDF(User-Defined Function),Spark SQL支持UDF,也就意味着你可以对数据进行更为自由、方便的处理,使用起来就像这样:

除了UDF和以及前文提到的UDT之外,Spark SQL还支持扩展数据源,你几乎把任何数据源的数据放到Spark SQL里进行查询、连接……这对于数据格式多样的大数据意义非凡,Spark SQL对于其他数据源的数据获取有特殊的优化,这部分内容我将放到下一篇文章中,蘸着Catalyst来介绍。

现在,你可以把DataFrame类比为数据库视图(被Cache的DataFrame则很像传统数据库中的“物化视图”),然后把RDDs理解为存储介质上的生肉。DataFrame API则作为一套通用的数据处理工具,把一切的一切变成RDDs,然后交给作业执行引擎去跑。从上图也可以看到,优化到脚趾头的Plan最终会生成为RDDs然后在Spark上执行(DataFrame API支持开发者在使用它的过程中随时与RDD进行互换)。

其实在RDDs和DataFrame之间,有很多相似的概念,为作区分 ,我把他们稍微总结一下,避免发生不必要的困惑:

  RDDs DataFrame
延迟执行 Action Operation -> Run Output Operation -> Catalyst -> Run
执行计划 Lineage – DAG Task Plan -> RDDs -> DAG Task
数据缓存 Cache Columnar Cache
执行方式 Runtime DSL
数据源 手动转换 手动转换/反射/JDBC/…

没想到这篇文章会写这么长,本来打算一并介绍Catalyst的,但是由于现在篇幅规模已经过于巨大……所以还是另开新篇单独来说吧。

[1] Armbrust M, Xin R S, Lian C, et al. Spark sql: Relational data processing in spark[C]//Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data. ACM, 2015: 1383-1394.


这篇关于进化的Spark, 从DataFrame说起的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/899861

相关文章

Python使用Pandas库将Excel数据叠加生成新DataFrame的操作指南

《Python使用Pandas库将Excel数据叠加生成新DataFrame的操作指南》在日常数据处理工作中,我们经常需要将不同Excel文档中的数据整合到一个新的DataFrame中,以便进行进一步... 目录一、准备工作二、读取Excel文件三、数据叠加四、处理重复数据(可选)五、保存新DataFram

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

Python中差分进化differential_evolution的调用及参数说明

在场景应用中,要求我们的函数计算结果尽可能的逼近实际测量结果,可转化计算结果与测量结果的残差,通过最小化残差,便可求出最优的结果。但使用最小二乘等方法来计算时,常常会使迭代的结果显然局部最优点而导致结算错误。 差分进化原理 差分进化(Differential Evolution,DE)是一种基于群体差异的进化算法,其计算思想主要包括以下几个方面: 一、初始化种群 首先,随机生成一个初始种群

[机缘参悟-222] - 系统的重构源于被动的痛苦、源于主动的精进、源于进化与演进(软件系统、思维方式、亲密关系、企业系统、商业价值链、中国社会、全球)

目录 前言:系统的重构源于被动的痛苦、源于主动的精进、源于进化与演进 一、软件系统的重构 1、重构的定义与目的 2、重构的时机与方法 3、重构的注意事项 4、重构的案例分析 二、大脑思维的重构 1、大脑思维重构的定义 2、大脑思维重构的方法 3、大脑思维重构的挑战与前景 三、认知的重构 1、定义 2、目的 3、方法 四、实例 五、总结 四、婚姻家庭的重构 1、婚

【python pandas】 Dataframe的数据print输出 显示为...省略号

pandas.set_option() 可以设置pandas相关的参数,从而改变默认参数。 打印pandas数据事,默认是输出100行,多的话会输出….省略号。 那么可以添加: pandas.set_option('display.max_rows',None) 这样就可以显示全部数据 同样,某一列比如url太长 显示省略号 也可以设置。 pd.set_option('display.

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e

期货赫兹量化-种群优化算法:进化策略,(μ,λ)-ES 和 (μ+λ)-ES

进化策略(Evolution Strategies, ES)是一种启发式算法,旨在模仿自然选择的过程来解决复杂的优化问题,尤其在没有显式解、或搜索空间巨大的情况下表现良好。基于自然界的进化原理,进化策略通过突变、选择等遗传算子迭代生成解,并最终寻求全局最优解。 进化策略通常基于两个核心机制:突变和选择。突变是对当前解进行随机扰动,而选择则用于保留适应度更高的个体。本文详细介绍了 (μ,λ)-ES

Spark数据介绍

从趋势上看,DataFrame 和 Dataset 更加流行。 示例场景 数据仓库和 BI 工具集成: 如果你需要处理存储在数据仓库中的结构化数据,并且希望与 BI 工具集成,那么 DataFrame 和 Dataset 是首选。 机器学习流水线: 在构建机器学习流水线时,使用 DataFrame 和 Dataset 可以更好地管理数据流,并且可以方便地与 MLlib 集成。 实时数据处理:

Mac搭建华为云平台Hadoop+spark步骤

1、安装终端和文件传输软件 下载、安装、配置 详戳数据平台搭建文件夹 Transmit 用于文件传输 iTerm2    用于终端 2、连接与登录 mac 使用iTerm2快捷登录远程服务器 Mac Transmit连接 (密码不可复制,手动输入) 3、安装jdk 4、修改主机名 Linux系统下如何修改主机名 4、安装配置hadoop