spark shuffle的演进过程

2024-06-13 12:08
文章标签 过程 spark 演进 shuffle

本文主要是介绍spark shuffle的演进过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

spark各版本shuffle的变化

  • Spark 0.8及以前 Hash Based Shuffle
  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
  • Spark 1.4 引入Tungsten-Sort Based Shuffle
  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle
  • Spark 2.0 Hash Based Shuffle退出历史舞台

接下来详细研究版本演化的驱动因素

Hash Based Shuffle

最开始的时候使用的是 Hash Based Shuffle, 这时候每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是MR ,其中M是MapTask的个数,R是ReduceTask的个数。这样会产生大量的小文件,对文件系统压力很大,而且也不利于IO吞吐量。如下图
在这里插入图片描述
后面做了优化,把在同一个core上运行的多个Mapper task 输出合并到同一个文件,这样文件数目就变成了 cores
R 个了
在这里插入图片描述

Sort Based Shuffle

经过FileConsolidation之后,同一个core上会产出

SortShuffleManager代码解析
override def registerShuffle[K, V, C](shuffleId: Int,numMaps: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't// need map-side aggregation, then write numPartitions files directly and just concatenate// them at the end. This avoids doing serialization and deserialization twice to merge// together the spilled files, which would happen with the normal code path. The downside is// having multiple files open at a time and thus more memory allocated to buffers.new BypassMergeSortShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:new SerializedShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else {// Otherwise, buffer map outputs in a deserialized form:new BaseShuffleHandle(shuffleId, numMaps, dependency)}}

在这里插入图片描述

SortShuffleWriter代码解析

SortShuffleWriter使用ExternalSorter,write的方法接收的参数为Iterator[Product2[K, V]],一个KEY、VALUE的集合,经过ExternalSorter排序之后,向

/** Write a bunch of records to this task's output */override def write(records: Iterator[Product2[K, V]]): Unit = {sorter = if (dep.mapSideCombine) {require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't// care whether the keys get sorted in each partition; that will be done on the reduce side// if the operation being run is sortByKey.new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}sorter.insertAll(records)// Don't bother including the time to open the merged output file in the shuffle write time,// because it just opens a single file, so is typically too fast to measure accurately// (see SPARK-3570).val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)val tmp = Utils.tempFileWith(output)try {val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)val partitionLengths = sorter.writePartitionedFile(blockId, tmp)shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)} finally {if (tmp.exists() && !tmp.delete()) {logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")}}}

这篇关于spark shuffle的演进过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1057262

相关文章

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

redis群集简单部署过程

《redis群集简单部署过程》文章介绍了Redis,一个高性能的键值存储系统,其支持多种数据结构和命令,它还讨论了Redis的服务器端架构、数据存储和获取、协议和命令、高可用性方案、缓存机制以及监控和... 目录Redis介绍1. 基本概念2. 服务器端3. 存储和获取数据4. 协议和命令5. 高可用性6.

PLsql Oracle 下载安装图文过程详解

《PLsqlOracle下载安装图文过程详解》PL/SQLDeveloper是一款用于开发Oracle数据库的集成开发环境,可以通过官网下载安装配置,并通过配置tnsnames.ora文件及环境变... 目录一、PL/SQL Developer 简介二、PL/SQL Developer 安装及配置详解1.下

在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程

《在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程》本文介绍了在Java中使用ModelMapper库简化Shapefile属性转JavaBean的过程,对比... 目录前言一、原始的处理办法1、使用Set方法来转换2、使用构造方法转换二、基于ModelMapper

springboot启动流程过程

《springboot启动流程过程》SpringBoot简化了Spring框架的使用,通过创建`SpringApplication`对象,判断应用类型并设置初始化器和监听器,在`run`方法中,读取配... 目录springboot启动流程springboot程序启动入口1.创建SpringApplicat

本地搭建DeepSeek-R1、WebUI的完整过程及访问

《本地搭建DeepSeek-R1、WebUI的完整过程及访问》:本文主要介绍本地搭建DeepSeek-R1、WebUI的完整过程及访问的相关资料,DeepSeek-R1是一个开源的人工智能平台,主... 目录背景       搭建准备基础概念搭建过程访问对话测试总结背景       最近几年,人工智能技术

Linux部署jar包过程

《Linux部署jar包过程》文章介绍了在Linux系统上部署Java(jar)包时需要注意的几个关键点,包括统一JDK版本、添加打包插件、修改数据库密码以及正确执行jar包的方法... 目录linux部署jar包1.统一jdk版本2.打包插件依赖3.修改密码4.执行jar包总结Linux部署jar包部署

SpringBoot 整合 Grizzly的过程

《SpringBoot整合Grizzly的过程》Grizzly是一个高性能的、异步的、非阻塞的HTTP服务器框架,它可以与SpringBoot一起提供比传统的Tomcat或Jet... 目录为什么选择 Grizzly?Spring Boot + Grizzly 整合的优势添加依赖自定义 Grizzly 作为

mysql-8.0.30压缩包版安装和配置MySQL环境过程

《mysql-8.0.30压缩包版安装和配置MySQL环境过程》该文章介绍了如何在Windows系统中下载、安装和配置MySQL数据库,包括下载地址、解压文件、创建和配置my.ini文件、设置环境变量... 目录压缩包安装配置下载配置环境变量下载和初始化总结压缩包安装配置下载下载地址:https://d

springboot整合gateway的详细过程

《springboot整合gateway的详细过程》本文介绍了如何配置和使用SpringCloudGateway构建一个API网关,通过实例代码介绍了springboot整合gateway的过程,需要... 目录1. 添加依赖2. 配置网关路由3. 启用Eureka客户端(可选)4. 创建主应用类5. 自定