SparkCore(15):Shuffle原理和优化

2024-05-24 11:38

本文主要是介绍SparkCore(15):Shuffle原理和优化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、总括

Shuffle是进行重新分区的过程,即上游RDD与下游RDD是宽依赖的关系。以下操作可能会引起Shuffle
(1)重新调整分区操作:repartiton,coalesce
(2)*ByKey:groupByKey,reduceByKey
(3)关联操作:join


二、shuffle Manager改进

1-》Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式
2-》1.1版本时参考Hadoop MapReduce的实现开始引入Sort Shuffle
3-》在1.5版本时开始Tungsten钨丝计划,引入UnSafe Shuffle优化内存及CPU的使用
4-》在1.6中将Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式
http://spark.apache.org/docs/1.6.0/configuration.html

5-》到最近的2.0版本,Hash Shuffle已被删除,所有Shuffle方式全部统一到Sort Shuffle一个实现中。下图是spark shuffle实现的一个版本演进。

三、Shuffle Manager具体分类

1.Spark Shuffle Manager:sort(2.0之后默认)

(1)默认执行过程

-》在该模式下,数据会先写入一个数据结构,reduceByKey 写入 Map,一边通过 Map 局部聚合,一遍写入内存。Join 算子写入 ArrayList 直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。

-》在溢写磁盘前:先根据 key 进行排序,排序过后的数据,会分批写入到磁盘文件中。默 认批次为 10000 条,数据会以每批一万条写入到磁盘文件。

-》在溢写磁盘过程中:写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个 Task 过程会产生多个临时文件。

-》最后:在每个 Task 中,将所有的临时文件合并,这就是 merge 过程。此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个 Task 的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。

 
(2)bypass SortShuffle

bypass SortShuffle机制触发条件:当task的数量小于200的时候,会自动启动by_pass模式(没有数据排序的操作,就是分区完之后直接写入磁盘)。配置参数:spark.shuffle.sort.bypassMergeThreshold:200(默认是200),执行by_pass过程

此时 task 会为每个 reduce 端的 task 都创建一个临时磁盘文件,并将数据按 key 进行hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

和hashsort对比:

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁 盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。
而该机制与普通 SortShuffleManager 运行机制的不同在于:不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省 掉了这部分的性能开销。

2.Spark Shuffle Manager:hash

这里我们先明确一个假设前提:每个 Executor 只有 1 个 CPU core,也就是说,无论这 个 Executor 上分配多少个 task 线程,同一时间都只能执行一个 task 线程。如果Executor有2个以上Cores,则类似,有部分补充,可以参考:https://www.jianshu.com/p/ef45ca960b5d

(1)使用场景

当应用中的数据不需要进行排序的时候(比如reduceByKey,只要分组就好了,不需要排序),可以直接考虑使用hash 

(2)原始的hash执行过程

3 个 Reducer,从 Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce 取模),分类出 3 个不同的类别,每个 Task 都分成 3 种类别的数据,注意:因为每个executor只有一个core,所以executor中的两个task是先后执行的。想把不同的数据汇聚然后计算出最终的结果,所以 Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每 1 个 Task 输出 3 份本地文件,这里有 4 个Mapper Tasks,所以总共输出了 4 个 Tasks x 3 个分类文件 = 12 个本地小文件。
  
即:最初上游maptask1000,下游reducetask1000,shuffle过程有1000000个小文件。所以,面临的问题:一个task就会有多个缓存,然后会有多个文件写出,当task数据量大,linux文件系统崩溃


(3)优化的hash执行过程:

优化的 HashShuffle 过程就是启用合并机制,合并机制就是复用 buffer,开启合并机制的配置是 spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。

这里还是有 4 个 Tasks,数据类别还是分成 3 种类型,因为 Hash 算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过 Task,都会把同样的 Key 放在同一个 Buffer里,然后把 Buffer 中的数据写入以 Core 数量为单位的本地文件中,(一个 Core 只有一种类 型的 Key 的数据),每 1 个 Task 所在的进程中,分别写入共同进程中的 3 份本地文件,这里有 4 个 Mapper Tasks,所以总共输出是 2 个 Cores x 3 个分类文件 = 6 个本地小文件。
 
    

    同一个executor中多个task中的buffer先合并,然后再写出文件。

    优化效果,即为原2个executor,每个2个map task,对应3个reduce task,则总共会有2(executor)*3(reduce task)=6个小文件,而不是2*3*3=18个。
    当使用hash shuffle manager的时候(当分区数比较多的),需要将参数:spark.shuffle.consolidateFiles设置为true,表示开启文件合并功能。见:http://spark.apache.org/docs/1.5.2/configuration.html,在1.6+以后已经没有这个参数,如下图

 

四、Shuffle优化

1. 开启shuffle压缩机制

(1)使用场景:磁盘IO使用很满,则shuffle过程使用压缩,通过cpu资源换取整个应用效率
(2)配置参数:spark.shuffle.compress默认true启动。
 
(3)配置参数:spark.shuffle.spill.compress,如果内存不够用,溢写到磁盘, 

 
(4)默认压缩格式:spark.io.compression.codec,1.6前是snappy,2.0后是lz4.

五、参考

1. Spark Shuffle原理及相关调优
https://blog.csdn.net/u010886217/article/details/83409322

2.Spark Shuffle之Hash Shuffle

https://www.jianshu.com/p/ef45ca960b5d

 

这篇关于SparkCore(15):Shuffle原理和优化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/998306

相关文章

Python如何使用__slots__实现节省内存和性能优化

《Python如何使用__slots__实现节省内存和性能优化》你有想过,一个小小的__slots__能让你的Python类内存消耗直接减半吗,没错,今天咱们要聊的就是这个让人眼前一亮的技巧,感兴趣的... 目录背景:内存吃得满满的类__slots__:你的内存管理小助手举个大概的例子:看看效果如何?1.

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

一文详解SpringBoot响应压缩功能的配置与优化

《一文详解SpringBoot响应压缩功能的配置与优化》SpringBoot的响应压缩功能基于智能协商机制,需同时满足很多条件,本文主要为大家详细介绍了SpringBoot响应压缩功能的配置与优化,需... 目录一、核心工作机制1.1 自动协商触发条件1.2 压缩处理流程二、配置方案详解2.1 基础YAML

Java的IO模型、Netty原理解析

《Java的IO模型、Netty原理解析》Java的I/O是以流的方式进行数据输入输出的,Java的类库涉及很多领域的IO内容:标准的输入输出,文件的操作、网络上的数据传输流、字符串流、对象流等,这篇... 目录1.什么是IO2.同步与异步、阻塞与非阻塞3.三种IO模型BIO(blocking I/O)NI

MySQL中慢SQL优化的不同方式介绍

《MySQL中慢SQL优化的不同方式介绍》慢SQL的优化,主要从两个方面考虑,SQL语句本身的优化,以及数据库设计的优化,下面小编就来给大家介绍一下有哪些方式可以优化慢SQL吧... 目录避免不必要的列分页优化索引优化JOIN 的优化排序优化UNION 优化慢 SQL 的优化,主要从两个方面考虑,SQL 语

MySQL中慢SQL优化方法的完整指南

《MySQL中慢SQL优化方法的完整指南》当数据库响应时间超过500ms时,系统将面临三大灾难链式反应,所以本文将为大家介绍一下MySQL中慢SQL优化的常用方法,有需要的小伙伴可以了解下... 目录一、慢SQL的致命影响二、精准定位问题SQL1. 启用慢查询日志2. 诊断黄金三件套三、六大核心优化方案方案

Redis中高并发读写性能的深度解析与优化

《Redis中高并发读写性能的深度解析与优化》Redis作为一款高性能的内存数据库,广泛应用于缓存、消息队列、实时统计等场景,本文将深入探讨Redis的读写并发能力,感兴趣的小伙伴可以了解下... 目录引言一、Redis 并发能力概述1.1 Redis 的读写性能1.2 影响 Redis 并发能力的因素二、

使用国内镜像源优化pip install下载的方法步骤

《使用国内镜像源优化pipinstall下载的方法步骤》在Python开发中,pip是一个不可或缺的工具,用于安装和管理Python包,然而,由于默认的PyPI服务器位于国外,国内用户在安装依赖时可... 目录引言1. 为什么需要国内镜像源?2. 常用的国内镜像源3. 临时使用国内镜像源4. 永久配置国内镜

C#原型模式之如何通过克隆对象来优化创建过程

《C#原型模式之如何通过克隆对象来优化创建过程》原型模式是一种创建型设计模式,通过克隆现有对象来创建新对象,避免重复的创建成本和复杂的初始化过程,它适用于对象创建过程复杂、需要大量相似对象或避免重复初... 目录什么是原型模式?原型模式的工作原理C#中如何实现原型模式?1. 定义原型接口2. 实现原型接口3

JAVA封装多线程实现的方式及原理

《JAVA封装多线程实现的方式及原理》:本文主要介绍Java中封装多线程的原理和常见方式,通过封装可以简化多线程的使用,提高安全性,并增强代码的可维护性和可扩展性,需要的朋友可以参考下... 目录前言一、封装的目标二、常见的封装方式及原理总结前言在 Java 中,封装多线程的原理主要围绕着将多线程相关的操