Spark中多分区写文件前可以不排序么

2024-02-16 13:52
文章标签 排序 分区 spark 中多

本文主要是介绍Spark中多分区写文件前可以不排序么,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

Spark 3.5.0
目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。

分析

这部分主要分为三个部分:
一个是V1Writes规则的重改;
另一个是FileFormatWriter中的dataWriter的选择;
还有一个是Spark中为什么会加上Sort
这三部分是需要结合在一起分析讨论的

V1Writes规则的重改

直接转到代码部分:

object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {import V1WritesUtils._override def apply(plan: LogicalPlan): LogicalPlan = {if (conf.plannedWriteEnabled) {plan.transformUp {case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>val newQuery = prepareQuery(write, write.query)val attrMap = AttributeMap(write.query.output.zip(newQuery.output))val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,write.bucketSpec, write.options, write.staticPartitions)val newChild = writeFiles.transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}newWrite}} else {plan}}

其中 prepareQuery是对满足条件的计划前加上Sort逻辑排序,其中prepareQuery关键的代码如下:

    val requiredOrdering = write.requiredOrdering.map(_.transform {case a: Attribute => attrMap.getOrElse(a, a)}.asInstanceOf[SortOrder])val outputOrdering = empty2NullPlan.outputOrderingval orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)if (orderingMatched) {empty2NullPlan} else {Sort(requiredOrdering, global = false, empty2NullPlan)}

write.requiredOrdering中涉及到的类为InsertIntoHadoopFsRelationCommandInsertIntoHiveTable,且这两个物理计划中的requiredOrdering实现都是:

V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)

getSortOrder方法关键代码如下:

    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {// Do not insert logical sort when concurrent output writers are enabled.Seq.empty} else {// We should first sort by dynamic partition columns, then bucket id, and finally sorting// columns.(dynamicPartitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns).map(SortOrder(_, Ascending))}

所以说 如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则会加上Sort逻辑计划,具体的实现可以参考SPARK-37287
如果spark.sql.maxConcurrentOutputFileWriters为0(默认值为0)且 sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则不会加上Sort逻辑计划

FileFormatWriter 中的dataWriter的选择

InsertIntoHadoopFsRelationCommandInsertIntoHiveTable 这两个物理计划中,最终写入文件/数据的时候,会调用到FileFormatWriter.write方法,这里有个concurrentOutputWriterSpecFunc函数变量的设置:

      val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)}val writeSpec = WriteFilesSpec(description = description,committer = committer,concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc)executeWrite(sparkSession, plan, writeSpec, job)

设置concurrentOutputWriterSpecFunc的代码如下:

  private def createConcurrentOutputWriterSpec(sparkSession: SparkSession,sortPlan: SortExec,sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWritersval concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmptyif (concurrentWritersEnabled) {Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))} else {None}}

如果 spark.sql.maxConcurrentOutputFileWriters为0(默认值为0),则ConcurrentOutputWriterSpec为None
如果 spark.sql.maxConcurrentOutputFileWriters大于0sortColumns为空(大部分情况下为空,除非建表是partition加上bucket),则为Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())

其中executeWrite会调用WriteFilesExec.doExecuteWrite方法,从而调用FileFormatWriter.executeTask,这里就涉及到dataWriter选择:

    val dataWriter =if (sparkPartitionId != 0 && !iterator.hasNext) {// In case of empty job, leave first partition to save meta for file format like parquet.new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {new SingleDirectoryDataWriter(description, taskAttemptContext, committer)} else {concurrentOutputWriterSpec match {case Some(spec) =>new DynamicPartitionDataConcurrentWriter(description, taskAttemptContext, committer, spec)case _ =>new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)}}

这里其实会根据 concurrentOutputWriterSpec来选择不同的dataWriter,默认情况下为DynamicPartitionDataSingleWriter
否则就会为DynamicPartitionDataConcurrentWriter
这两者的区别,见下文

Spark中为什么会加上Sort

至于Spark在写入文件的时候会加上Sort,这个是跟写入的实现有关的,也就是DynamicPartitionDataSingleWriterDynamicPartitionDataConcurrentWriter的区别:

  • DynamicPartitionDataSingleWriter 在任何时刻,只有一个writer在写文件,这能保证写入的稳定性,不会在写入文件的时候消耗大量的内存,但是速度会慢
  • DynamicPartitionDataConcurrentWriter 会有多个 writer 同时写文件,能加快写入文件的速度,但是因为多个文件的同时写入,可能会导致OOM

对于DynamicPartitionDataSingleWriter 会根据partition或者bucket作为最细粒度来作为writer的标准,如果相邻的两条记录所属不同的partition或者bucket,则会切换writer,所以说如果不根据partition或者bucket排序的话,会导致writer频繁的切换,这会大大降低文件的写入速度。所以说需要根据partition或者bucket进行排序。

参考

  1. [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
  2. [SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort

这篇关于Spark中多分区写文件前可以不排序么的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/714721

相关文章

【数据结构】——原来排序算法搞懂这些就行,轻松拿捏

前言:快速排序的实现最重要的是找基准值,下面让我们来了解如何实现找基准值 基准值的注释:在快排的过程中,每一次我们要取一个元素作为枢纽值,以这个数字来将序列划分为两部分。 在此我们采用三数取中法,也就是取左端、中间、右端三个数,然后进行排序,将中间数作为枢纽值。 快速排序实现主框架: //快速排序 void QuickSort(int* arr, int left, int rig

usaco 1.3 Mixing Milk (结构体排序 qsort) and hdu 2020(sort)

到了这题学会了结构体排序 于是回去修改了 1.2 milking cows 的算法~ 结构体排序核心: 1.结构体定义 struct Milk{int price;int milks;}milk[5000]; 2.自定义的比较函数,若返回值为正,qsort 函数判定a>b ;为负,a<b;为0,a==b; int milkcmp(const void *va,c

hdu 1285(拓扑排序)

题意: 给各个队间的胜负关系,让排名次,名词相同按从小到大排。 解析: 拓扑排序是应用于有向无回路图(Direct Acyclic Graph,简称DAG)上的一种排序方式,对一个有向无回路图进行拓扑排序后,所有的顶点形成一个序列,对所有边(u,v),满足u 在v 的前面。该序列说明了顶点表示的事件或状态发生的整体顺序。比较经典的是在工程活动上,某些工程完成后,另一些工程才能继续,此时

《数据结构(C语言版)第二版》第八章-排序(8.3-交换排序、8.4-选择排序)

8.3 交换排序 8.3.1 冒泡排序 【算法特点】 (1) 稳定排序。 (2) 可用于链式存储结构。 (3) 移动记录次数较多,算法平均时间性能比直接插入排序差。当初始记录无序,n较大时, 此算法不宜采用。 #include <stdio.h>#include <stdlib.h>#define MAXSIZE 26typedef int KeyType;typedef char In

【软考】希尔排序算法分析

目录 1. c代码2. 运行截图3. 运行解析 1. c代码 #include <stdio.h>#include <stdlib.h> void shellSort(int data[], int n){// 划分的数组,例如8个数则为[4, 2, 1]int *delta;int k;// i控制delta的轮次int i;// 临时变量,换值int temp;in

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

学习记录:js算法(二十八):删除排序链表中的重复元素、删除排序链表中的重复元素II

文章目录 删除排序链表中的重复元素我的思路解法一:循环解法二:递归 网上思路 删除排序链表中的重复元素 II我的思路网上思路 总结 删除排序链表中的重复元素 给定一个已排序的链表的头 head , 删除所有重复的元素,使每个元素只出现一次 。返回 已排序的链表 。 图一 图二 示例 1:(图一)输入:head = [1,1,2]输出:[1,2]示例 2:(图

鸡尾酒排序算法

目录 引言 一、概念 二、算法思想 三、图例解释 1.采用冒泡排序:   2.采用鸡尾酒排序:  3.对比总结 四、算法实现  1.代码实现  2.运行结果 3.代码解释   五、总结 引言 鸡尾酒排序(Cocktail Sort),也被称为双向冒泡排序,是一种改进的冒泡排序算法。它在冒泡排序的基础上进行了优化,通过双向遍历来减少排序时间。今天我们将学习如何在C

快速排序(java代码实现)

简介: 1.采用“分治”的思想,对于一组数据,选择一个基准元素,这里选择中间元素mid 2.通过第一轮扫描,比mid小的元素都在mid左边,比mid大的元素都在mid右边 3.然后使用递归排序这两部分,直到序列中所有数据均有序为止。 public class csdnTest {public static void main(String[] args){int[] arr = {3,

O(n)时间内对[0..n^-1]之间的n个数排序

题目 如何在O(n)时间内,对0到n^2-1之间的n个整数进行排序 思路 把整数转换为n进制再排序,每个数有两位,每位的取值范围是[0..n-1],再进行基数排序 代码 #include <iostream>#include <cmath>using namespace std;int n, radix, length_A, digit = 2;void Print(int *A,