大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程

本文主要是介绍大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • SparkSQL 语句 编码 测试 结果
  • 输入输出
  • 数据源包含如Parquet、JSON、CSV、Avro、ORC、Hive、JDBC、ODBC
  • TextFile

在这里插入图片描述

SparkSQL中的Join

数据分析中将两个数据集进行Join操作是很常见的场景。在Spark的物理计划阶段,Spark的Join Selection类会根据Join Hints 策略,Join表的大小、Join是等值Join还是不等值以及参与Join的Key是否可以排序等条件来选择最终的Join策略,最后Spark会利用选择好的Join策略执行最终的计算。

当前Spark一共支持五种Join策略:

  • Broadcast hash join (BHJ)
  • Shuffle hash join (SHJ)
  • Shuffle sort merge join(SMJ)
  • Shuffle-and-replicate nested loop join,又叫笛卡尔积(Cartesian product join)
  • Broadcast nested loop join(BNLJ)

其中 BHJ 和 SMJ 这两种 Join 策略是我们运行 Spark 任务最常见的。
JoinSelection 会先根据 Join 的 Key 为等值Join来选择 Broadcast hash join、Shuffle hash join、Shuffle sort merge join中的一个。
如果Join的Key为不等值Join或者没有指定Join条件,则会选择Broadcast nested loop join 或 Shuffle-and-replicate nested loop join。
不同的Join策略在执行效率上差别很大,了解每种Join策略的执行过程和适用条件是很有必要的。

Broadcast Hash Join

Broadcast Hash Join 的实现是将小表的数据广播到Spark所有的Executor端,这个广播过程和我们自己去广播数据没有什么区别:

  • 利用 Collect 算子将小表的数据从Executor端拉到Driver端
  • 在Driver端调用sparkContext.broadcast广播到所有Executor端
  • 在Executor端使用广播的数据与大表进行Join操作(实际上执行Map操作)

这种Join策略避免了Shuffle操作,一般而言,Broadcast Hash Join会比其他Join策略执行的要快。
在这里插入图片描述
使用这种 Join 策略必须满足如下的条件:

  • 小表的数据必须很小,可以通过 spark.sql.autoBroadcasetJoinThreshold 参数来配置,默认是10MB
  • 如果内存比较大,可以将阈值适当加大
  • 将 spark.sql.autoBroadcastJoinThreshold 参数设置为-1,可以关闭这种连接方式
  • 只能用于等值Join,不要求参与Join的keys可排序

Shuffle Hash Join

当表中的数据比较大,又不适合使用广播,这个时候就可以考虑 Shuffle Hash Join。
Shuffle Hash Join 同样是在大表和小表进行Join的时候选择了一种策略。
它的计算思想是:把大表和小表按照相同的分区算法和分区数据进行分区(根据参与Join的Keys进行分区),这样保证了 Hash 值一样的数据都分发到同一个分区中,然后在同一个 Executor 中两张表 Hash 值一样的分区就可以在本地进行Hash Join了。在进行 Join 之前,还会对小表的分区构建 Hash Map,Shuffle Hash Join 利用了分治思想,把大问题拆解成小问题去解决。

在这里插入图片描述
要启动 Shuffle Hash Join 必须满足以下条件:

  • 仅支持等值 Join,不要求参与Join的Keys可排序
  • spark.sql.join.perferSortMergeJoin 参数必须设置值为 false,参数从Spark2.0版本引入,默认值是true,也就是默认情况下是 Sort Merge Join
  • 小表的大小(plan.stats.sizeInBytes)必须小于(spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions(默认200))
  • 而且小表大小(stats.sizeInBytes)的三倍必须小于等于大表的大小(stats.sizeInBytes),也就是(a.stats.sizeInBytes * 3 < b.stats.sizeInBytes)

Shuffle Sort Merge Join

前面两种Join策略对表的大小都有条件的,如果参与Join的表都很大,这时候就得考虑用 Shuffle Sort Merge Join了。
Shuffle Sort Merge Join 的实现事项:

  • 将两张表按照 Join Key进行Shuffle,保证 Join Key值相同的记录会被分在相应的分区
  • 对每个分区内的数据进行排序
  • 排序后再对相应的分区内的记录进行连接

无论分区多大,Sort Merge Join都不用把一侧的数据全部加载到内存中,而是即用即丢。
因为两个序列都有序,从头遍历,碰到Key相同的就输出,如果不同,左边小取左边,反之就取右边。
这样大大提高了大数据量下的SQL Join的稳定性。

在这里插入图片描述
要启用Shuffle Sort Merge Join必须满足以下条件:

  • 仅支持等值 Join,并且要求参与 Join 的 Keys 可排序

Cartesian Product Join

如果Spark中两张参与Join的表没有指定连接条件,那么产生Cartesian Product Join,这个Join得到的结果其实就是两张表行数的乘积。

Broadcast Nested Loop Join

可以把 Broadcast Nested Loop Join的执行看做下面的计算:

for record_1 in relation_1:for record_2 in relation_2:# join condition is executed

可以看出 Broadcast Nested Loop Join 在某些情况会对某张表重复扫描多次,效率非常低。从名字可以看出,这种Join会根据相关条件对小表进行广播,以减少表的扫描次数。
Broadcast Nested Loop Join支持等值和不等值Join,支持所有的Join类型。

SQL解析过程

基本概念

SparkSQL 可以说Spark中的精华部分,原来基于RDD构建大数据计算任务,重新在向Dataset转移,原来基于 RDD 写的代码也在迁移。
使用 SparkSQL 编码的好处是非常大的,尤其是性能方面,有很大提升。SparkSQL 中各种内嵌的性能优化比写RDD遵循各种最佳实践更加靠谱。
尤其对于新手来说,比如先 Filter 再 Map,SparkSQL中会自动进行谓词下推,Spark SQL中会自动使用 Broadcast Join来广播小表,把 Shuffle Join转换为 Map Join等等。

SparkSQL对SQL语句的处理和关系型数据库类似,即词法/语法解析、绑定、优化、执行。SparkSQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对Tree进行绑定、优化等处理过程。
SparkSQL由:Core、Catalyst、Hive、Hive-ThriftServer四部分构成:

  • Core:负责处理数据的输入和输出,如获取数据,查询结果输出成DataFrame等
  • Catalyst:负责处理整个查询过程,包括解析、绑定、优化等。
  • Hive:负责对Hive数据进行处理
  • Hive-ThriftServer:主要用于对Hive的访问

在这里插入图片描述
在这里插入图片描述
SparkSQL的代码复杂度是问题的本质复杂度带来说,SparkSQL中的Catalyst框架大部分逻辑是在一个Tree类型的数据结构上做各种折腾,基于Scala来实现还是很优雅的,Scala的偏函数和强大的Case正则匹配,让整个代码看起来非常优雅。

SparkSession是编写Spark应用代码的入口,启动一个spark-shell会提供给你创建spark-session,这个对象是整个Spark应用的起始点,以下是SparkSession的一些重要的变量和方法:
在这里插入图片描述

编写代码

package icu.wzk
import org.apache.spark.sql.{DataFrame, SparkSession}object TestDemo01 {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("TestDemo01").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("warn")import spark.implicits._Seq((0, "zhansan", 10),(1, "lisi", 11),(2, "wangwu", 12)).toDF("id", "name", "age").createOrReplaceTempView("stu")Seq((0, "chinese", 80),(0, "math", 100),(0, "english", 98),(1, "chinese", 86),(1, "math", 97),(1, "english", 90),(2, "chinese", 90),(2, "math", 94),(2, "english", 88)).toDF("id", "subject", "score").createOrReplaceTempView("score")val df: DataFrame = spark.sql("""|SELECT SUM(v) AS total_score, name|FROM (|  SELECT stu.id, 100 + 10 + score.score AS v, name|  FROM stu|  JOIN score ON stu.id = score.id|  WHERE stu.age >= 11|) tmp|GROUP BY name|""".stripMargin)df.show()// 打印执行计划println(df.queryExecution)println(df.queryExecution.optimizedPlan)spark.close()}}

运行输出

执行代码可见控制台输出如下数据(我就不往服务器发了):
在这里插入图片描述
控制台的内容如下图所示:

+-----------+------+
|total_score|  name|
+-----------+------+
|        602|wangwu|
|        603|  lisi|
+-----------+------+== Parsed Logical Plan ==
'Aggregate ['name], ['SUM('v) AS total_score#27, 'name]
+- 'SubqueryAlias `tmp`+- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#26, 'name]+- 'Filter ('stu.age >= 11)+- 'Join Inner, ('stu.id = 'score.id):- 'UnresolvedRelation `stu`+- 'UnresolvedRelation `score`== Analyzed Logical Plan ==
total_score: bigint, name: string
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8]
+- SubqueryAlias `tmp`+- Project [id#7, ((100 + 10) + score#22) AS v#26, name#8]+- Filter (age#9 >= 11)+- Join Inner, (id#7 = id#20):- SubqueryAlias `stu`:  +- Project [_1#3 AS id#7, _2#4 AS name#8, _3#5 AS age#9]:     +- LocalRelation [_1#3, _2#4, _3#5]+- SubqueryAlias `score`+- Project [_1#16 AS id#20, _2#17 AS subject#21, _3#18 AS score#22]+- LocalRelation [_1#16, _2#17, _3#18]== Optimized Logical Plan ==
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8]
+- Project [(110 + score#22) AS v#26, name#8]+- Join Inner, (id#7 = id#20):- LocalRelation [id#7, name#8]+- LocalRelation [id#20, score#22]== Physical Plan ==
*(2) HashAggregate(keys=[name#8], functions=[sum(cast(v#26 as bigint))], output=[total_score#27L, name#8])
+- Exchange hashpartitioning(name#8, 200)+- *(1) HashAggregate(keys=[name#8], functions=[partial_sum(cast(v#26 as bigint))], output=[name#8, sum#38L])+- *(1) Project [(110 + score#22) AS v#26, name#8]+- *(1) BroadcastHashJoin [id#7], [id#20], Inner, BuildLeft:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))):  +- LocalTableScan [id#7, name#8]+- LocalTableScan [id#20, score#22]
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8]
+- Project [(110 + score#22) AS v#26, name#8]+- Join Inner, (id#7 = id#20):- LocalRelation [id#7, name#8]+- LocalRelation [id#20, score#22]

分析内容

queryExecution 就是对整个执行计划的执行引擎,里面有执行过程中各个中间过程变量,整个执行流程如下:
在这里插入图片描述
刚才的例子中的SQL语句经过Parser解析后就会变成一个抽象语法树,对应解析后的逻辑计划AST为:

== Analyzed Logical Plan ==
total_score: bigint, name: string
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8]
+- SubqueryAlias `tmp`+- Project [id#7, ((100 + 10) + score#22) AS v#26, name#8]+- Filter (age#9 >= 11)+- Join Inner, (id#7 = id#20):- SubqueryAlias `stu`:  +- Project [_1#3 AS id#7, _2#4 AS name#8, _3#5 AS age#9]:     +- LocalRelation [_1#3, _2#4, _3#5]+- SubqueryAlias `score`+- Project [_1#16 AS id#20, _2#17 AS subject#21, _3#18 AS score#22]+- LocalRelation [_1#16, _2#17, _3#18]

在执行计划中 Project/Projection 代表的意思是投影
在这里插入图片描述
其中过滤条件变为了 Filter 节点,这个节点是 UnaryNode (一元节点)类型,只有一个孩子。
两个表中的数据变为了 UnresolvedRelation 节点,节点类型为 LeafNode,即叶子节点,Join操作为节点,这个是一个BinaryNode节点,有两个孩子。
以上节点都是LogicalPlan类型的,可以理解为各种操作的Operator,SparkSQL对各种操作定义了各种Operator。

在这里插入图片描述
这些 Operator 组成的语法树就是整个 Catatyst 优化的基础,Catatyst优化器会在这个树上进行分析修改,把树上的节点挪来挪去进行优化。
经过Parser有了抽象语法树,但是并不知道Score,Sum这些东西,所以就需要 Analyer 定位。

Analyzer会把AST上所有Unresolved的东西都转换为Resolved状态,SparkSQL有很多Resolve规则:

  • ResolverRelations:解析表(列)的基本类型信息
  • ResolveFunctions:解析出来函数的基本信息
  • ResolveReferences:解析引用,通常是解析列名

在这里插入图片描述

常见优化逻辑

在这里插入图片描述在这里插入图片描述
这里用到的优化有:谓词下推(Push Down Predicate)、常量折叠(Constant Folding)、字段裁剪(Columning Pruning):
在这里插入图片描述
做完逻辑优化,还需要先转换为物理执行计划,将逻辑上可行的执行计划变为Spark可以真正执行的计划:
在这里插入图片描述
SparkSQL 把逻辑节点转换为了相应的物理节点,比如Join算子,Spark根据不同的场景为该算子制定了不同的算法策略。

数据在一个一个的Plan中流转,然后每个plan里面表达式都会对数据进行处理,就相当于经过了一个个小函数的调用处理,这里面有大量的函数调用开销,可以把这些小函数内联一下,当成一个大函数。可以看到最终执行计划每个节点面前有个*号,说明整段代码生成被启用。

这篇关于大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101527

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo