spark shuffle的演进过程

2024-06-13 12:08
文章标签 过程 spark 演进 shuffle

本文主要是介绍spark shuffle的演进过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

spark各版本shuffle的变化

  • Spark 0.8及以前 Hash Based Shuffle
  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
  • Spark 1.4 引入Tungsten-Sort Based Shuffle
  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle
  • Spark 2.0 Hash Based Shuffle退出历史舞台

接下来详细研究版本演化的驱动因素

Hash Based Shuffle

最开始的时候使用的是 Hash Based Shuffle, 这时候每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是MR ,其中M是MapTask的个数,R是ReduceTask的个数。这样会产生大量的小文件,对文件系统压力很大,而且也不利于IO吞吐量。如下图
在这里插入图片描述
后面做了优化,把在同一个core上运行的多个Mapper task 输出合并到同一个文件,这样文件数目就变成了 cores
R 个了
在这里插入图片描述

Sort Based Shuffle

经过FileConsolidation之后,同一个core上会产出

SortShuffleManager代码解析
override def registerShuffle[K, V, C](shuffleId: Int,numMaps: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't// need map-side aggregation, then write numPartitions files directly and just concatenate// them at the end. This avoids doing serialization and deserialization twice to merge// together the spilled files, which would happen with the normal code path. The downside is// having multiple files open at a time and thus more memory allocated to buffers.new BypassMergeSortShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:new SerializedShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else {// Otherwise, buffer map outputs in a deserialized form:new BaseShuffleHandle(shuffleId, numMaps, dependency)}}

在这里插入图片描述

SortShuffleWriter代码解析

SortShuffleWriter使用ExternalSorter,write的方法接收的参数为Iterator[Product2[K, V]],一个KEY、VALUE的集合,经过ExternalSorter排序之后,向

/** Write a bunch of records to this task's output */override def write(records: Iterator[Product2[K, V]]): Unit = {sorter = if (dep.mapSideCombine) {require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't// care whether the keys get sorted in each partition; that will be done on the reduce side// if the operation being run is sortByKey.new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}sorter.insertAll(records)// Don't bother including the time to open the merged output file in the shuffle write time,// because it just opens a single file, so is typically too fast to measure accurately// (see SPARK-3570).val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)val tmp = Utils.tempFileWith(output)try {val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)val partitionLengths = sorter.writePartitionedFile(blockId, tmp)shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)} finally {if (tmp.exists() && !tmp.delete()) {logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")}}}

这篇关于spark shuffle的演进过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1057262

相关文章

java中使用POI生成Excel并导出过程

《java中使用POI生成Excel并导出过程》:本文主要介绍java中使用POI生成Excel并导出过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录需求说明及实现方式需求完成通用代码版本1版本2结果展示type参数为atype参数为b总结注:本文章中代码均为

SpringCloud之LoadBalancer负载均衡服务调用过程

《SpringCloud之LoadBalancer负载均衡服务调用过程》:本文主要介绍SpringCloud之LoadBalancer负载均衡服务调用过程,具有很好的参考价值,希望对大家有所帮助,... 目录前言一、LoadBalancer是什么?二、使用步骤1、启动consul2、客户端加入依赖3、以服务

Oracle存储过程里操作BLOB的字节数据的办法

《Oracle存储过程里操作BLOB的字节数据的办法》该篇文章介绍了如何在Oracle存储过程中操作BLOB的字节数据,作者研究了如何获取BLOB的字节长度、如何使用DBMS_LOB包进行BLOB操作... 目录一、缘由二、办法2.1 基本操作2.2 DBMS_LOB包2.3 字节级操作与RAW数据类型2.

C#原型模式之如何通过克隆对象来优化创建过程

《C#原型模式之如何通过克隆对象来优化创建过程》原型模式是一种创建型设计模式,通过克隆现有对象来创建新对象,避免重复的创建成本和复杂的初始化过程,它适用于对象创建过程复杂、需要大量相似对象或避免重复初... 目录什么是原型模式?原型模式的工作原理C#中如何实现原型模式?1. 定义原型接口2. 实现原型接口3

Spring Security注解方式权限控制过程

《SpringSecurity注解方式权限控制过程》:本文主要介绍SpringSecurity注解方式权限控制过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、摘要二、实现步骤2.1 在配置类中添加权限注解的支持2.2 创建Controller类2.3 Us

Spring AI集成DeepSeek三步搞定Java智能应用的详细过程

《SpringAI集成DeepSeek三步搞定Java智能应用的详细过程》本文介绍了如何使用SpringAI集成DeepSeek,一个国内顶尖的多模态大模型,SpringAI提供了一套统一的接口,简... 目录DeepSeek 介绍Spring AI 是什么?Spring AI 的主要功能包括1、环境准备2

SpringBoot集成图片验证码框架easy-captcha的详细过程

《SpringBoot集成图片验证码框架easy-captcha的详细过程》本文介绍了如何将Easy-Captcha框架集成到SpringBoot项目中,实现图片验证码功能,Easy-Captcha是... 目录SpringBoot集成图片验证码框架easy-captcha一、引言二、依赖三、代码1. Ea

pycharm远程连接服务器运行pytorch的过程详解

《pycharm远程连接服务器运行pytorch的过程详解》:本文主要介绍在Linux环境下使用Anaconda管理不同版本的Python环境,并通过PyCharm远程连接服务器来运行PyTorc... 目录linux部署pytorch背景介绍Anaconda安装Linux安装pytorch虚拟环境安装cu

SpringBoot项目注入 traceId 追踪整个请求的日志链路(过程详解)

《SpringBoot项目注入traceId追踪整个请求的日志链路(过程详解)》本文介绍了如何在单体SpringBoot项目中通过手动实现过滤器或拦截器来注入traceId,以追踪整个请求的日志链... SpringBoot项目注入 traceId 来追踪整个请求的日志链路,有了 traceId, 我们在排

Spring Boot 3 整合 Spring Cloud Gateway实践过程

《SpringBoot3整合SpringCloudGateway实践过程》本文介绍了如何使用SpringCloudAlibaba2023.0.0.0版本构建一个微服务网关,包括统一路由、限... 目录引子为什么需要微服务网关实践1.统一路由2.限流防刷3.登录鉴权小结引子当前微服务架构已成为中大型系统的标