本文主要是介绍Spark中多分区写文件前可以不排序么,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
背景
Spark 3.5.0
目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0
来避免排序。
分析
这部分主要分为三个部分:
一个是V1Writes规则的重改
;
另一个是FileFormatWriter中的dataWriter的选择
;
还有一个是Spark中为什么会加上Sort
这三部分是需要结合在一起分析讨论的
V1Writes规则的重改
直接转到代码部分:
object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {import V1WritesUtils._override def apply(plan: LogicalPlan): LogicalPlan = {if (conf.plannedWriteEnabled) {plan.transformUp {case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>val newQuery = prepareQuery(write, write.query)val attrMap = AttributeMap(write.query.output.zip(newQuery.output))val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,write.bucketSpec, write.options, write.staticPartitions)val newChild = writeFiles.transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {case a: Attribute if attrMap.contains(a) =>a.withExprId(attrMap(a).exprId)}newWrite}} else {plan}}
其中 prepareQuery
是对满足条件的计划前加上Sort
逻辑排序,其中prepareQuery关键的代码
如下:
val requiredOrdering = write.requiredOrdering.map(_.transform {case a: Attribute => attrMap.getOrElse(a, a)}.asInstanceOf[SortOrder])val outputOrdering = empty2NullPlan.outputOrderingval orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)if (orderingMatched) {empty2NullPlan} else {Sort(requiredOrdering, global = false, empty2NullPlan)}
write.requiredOrdering
中涉及到的类为InsertIntoHadoopFsRelationCommand
和InsertIntoHiveTable
,且这两个物理计划中的requiredOrdering
实现都是:
V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)
getSortOrder
方法关键代码如下:
val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {// Do not insert logical sort when concurrent output writers are enabled.Seq.empty} else {// We should first sort by dynamic partition columns, then bucket id, and finally sorting// columns.(dynamicPartitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns).map(SortOrder(_, Ascending))}
所以说 如果 spark.sql.maxConcurrentOutputFileWriters为0
(默认值为0),则会加上Sort逻辑计划
,具体的实现可以参考SPARK-37287
如果spark.sql.maxConcurrentOutputFileWriters为0
(默认值为0)且 sortColumns为空
(大部分情况下为空,除非建表是partition加上bucket),则不会加上Sort逻辑计划
FileFormatWriter 中的dataWriter的选择
在 InsertIntoHadoopFsRelationCommand
和InsertIntoHiveTable
这两个物理计划中,最终写入文件/数据的时候,会调用到FileFormatWriter.write
方法,这里有个concurrentOutputWriterSpecFunc
函数变量的设置:
val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)}val writeSpec = WriteFilesSpec(description = description,committer = committer,concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc)executeWrite(sparkSession, plan, writeSpec, job)
设置concurrentOutputWriterSpecFunc
的代码如下:
private def createConcurrentOutputWriterSpec(sparkSession: SparkSession,sortPlan: SortExec,sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWritersval concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmptyif (concurrentWritersEnabled) {Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))} else {None}}
如果 spark.sql.maxConcurrentOutputFileWriters为0
(默认值为0),则ConcurrentOutputWriterSpec
为None
如果 spark.sql.maxConcurrentOutputFileWriters大于0
且 sortColumns为空
(大部分情况下为空,除非建表是partition加上bucket),则为Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())
其中executeWrite
会调用WriteFilesExec.doExecuteWrite
方法,从而调用FileFormatWriter.executeTask
,这里就涉及到dataWriter
选择:
val dataWriter =if (sparkPartitionId != 0 && !iterator.hasNext) {// In case of empty job, leave first partition to save meta for file format like parquet.new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {new SingleDirectoryDataWriter(description, taskAttemptContext, committer)} else {concurrentOutputWriterSpec match {case Some(spec) =>new DynamicPartitionDataConcurrentWriter(description, taskAttemptContext, committer, spec)case _ =>new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)}}
这里其实会根据 concurrentOutputWriterSpec来选择不同的dataWriter
,默认情况下为DynamicPartitionDataSingleWriter
否则就会为DynamicPartitionDataConcurrentWriter
这两者的区别,见下文
Spark中为什么会加上Sort
至于Spark在写入文件的时候会加上Sort,这个是跟写入的实现有关的,也就是DynamicPartitionDataSingleWriter
和DynamicPartitionDataConcurrentWriter
的区别:
- DynamicPartitionDataSingleWriter 在任何时刻,只有一个writer在写文件,这能保证写入的稳定性,不会在写入文件的时候消耗大量的内存,但是速度会慢
- DynamicPartitionDataConcurrentWriter 会有多个 writer 同时写文件,能加快写入文件的速度,但是因为多个文件的同时写入,可能会导致OOM
对于DynamicPartitionDataSingleWriter
会根据partition或者bucket作为最细粒度来作为writer的标准,如果相邻的两条记录所属不同的partition或者bucket,则会切换writer,所以说如果不根据partition或者bucket排序的话,会导致writer
频繁的切换,这会大大降低文件的写入速度。所以说需要根据partition或者bucket进行排序。
参考
- [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
- [SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort
这篇关于Spark中多分区写文件前可以不排序么的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!