Spark | SparkSql Insert Overwrite 小文件过多

2023-10-28 16:58

本文主要是介绍Spark | SparkSql Insert Overwrite 小文件过多,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

SparkSql在执行Hive Insert Overwrite Table 操作时 ,默认文件生成数和表文件存储的个数有关,但一般上游表存储个数并非下游能控制的,这样的话得考虑处理小文件问题。

小文件产生原因:
spark.sql.shuffle.partitions=200 ,spark sql默认shuffle分区是200个,如果数据量比较小时,写hdfs时会产生200个小文件。

可尝试通过以下操作来解决小文件过多问题,如下

Shuffle Partition 

--conf spark.sql.shuffle.partitions=10 \
--conf spark.default.parallelism=10 \

目前SparkSql中reduce阶段的task个数取决于固定参数 spark.sql.shuffle.partition(默认值 200),注意,若一个作业一旦设置了该参数,它运行过程中的所有阶段的reduce个数都是同一个值。

而对于不同的作业,以及同一个作业内的不同reduce阶段,实际的数据量大小可能相差很大,例如reduce阶段要处理的数据可能是10MB,也有可能是100GB, 如果使用同一个值对实际运行效率会产生很大影响,例如10MB的数据一个task就可以解决,如果 spark.sql.shuffle.partition使用默认值200的话,那么10MB的数据就要被分成200个task处理,增加了调度开销,影响运行效率。

而spark.default.parallelism 该参数用于设置每个stage的默认task数量。

值得注意的是,

spark.default.parallelism只有在处理RDD时才会起作用,对SparkSql无效。

spark.sql.shuffle.partitions则是对sparks SQL专用的设置。

 

coalesce & repartition

rdd.toDF("name", "age", "address").coalesce(3).createOrReplaceTempView("tmp_table")
spark.sql("insert overwrite table t1 select name,age,address from tmp_table")df.repartition(10).createOrReplaceTempView("tmp_table")
spark.sql("insert overwrite table t1 select name,age,address from tmp_table")

coalesce 和 repartition 都是对 RDD 重新分区。 coalesce 操作使用HashPartitioner 进行重分区,第一个参数为重分区的数目,第二个为是否进行 shuffle,默认情况下是 false。
repartition 操作是 coalesce 函数第二个参数为 true 的实现。
使用 coalesce 操作 RDD 时需注意:
1)重新分区的数目需要小于原分区数据。
2)重新分区数目大于原来的分区数,那么必须制定 shuffle 参数为 true,否则分区数不变。

 

Distribute by 方式

distribute by是控制在map端如何拆分数据给reduce端的。hive会根据distribute by后面列,对应reduce的个数进行分发,默认是采用hash算法。大部分情况都用于解决Map输出的文件大小不均,Reduce输出文件大小不均,小文件过多,文件数过大等情况.

distribute by 控制map输出结果的分发,相同字段的map输出会发到一个reduce节点去处理;sort by为每一个reducer产生一个排序文件。cluster by = distribute by + sort by,默认只能是升序。

 select name,age,address from tmp_table Distribute by name;

 

SparkSql自适应调整

 --conf spark.sql.adaptive.enabled=true \--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128000000 \

SparkSQL自适应框架可以通过设置shuffle partition的上下限区间,在这个区间内对不同作业不同阶段的reduce个数进行动态调整。通过区间的设置,一方面可以大大减少调优的成本(不需要找到一个固定值),另一方面同一个作业内部不同reduce阶段的reduce个数也能动态调整。

属性名称默认值备注
spark.sql.adaptive.enabledfalse自适应执行框架的开关。
spark.sql.adaptive.minNumPostShufflePartitions1reduce个数区间最小值。
spark.sql.adaptive.maxNumPostShufflePartitions500reduce个数区间最大值。
spark.sql.adaptive.shuffle.targetPostShuffleInputSize67108864动态调整reduce个数的 partition 大小依据,如设置64MB,则reduce 阶段每个task最少处理 64MB的数据。
spark.sql.adaptive.shuffle.targetPostShuffleRowCount20000000动态调整reduce个数的partition条数依据,如设置20000000则reduce阶段每个task最少处理 20000000条的数据。

 

数据倾斜

Join中会经常碰到数据倾斜的场景,导致某些task处理的数据过多,出现很严重的长尾。目前SparkSQL没有对倾斜的数据进行相关的优化处理。

SparkSQL自适应框架可以根据预先的配置在作业运行过程中自动检测是否出现倾斜,并对检测到的倾斜进行优化处理。

优化的主要逻辑是对倾斜的partition进行拆分由多个task来进行处理,最后通过union进行结果合并。

支持的 Join 类型:

join类型备注
Inner左/右表均可处理倾斜。
Cross左/右表均可处理倾斜。
LeftSemi只对左表处理倾斜。
LeftAnti只对左表处理倾斜。
LeftOuter只对左表处理倾斜。
RightOuter只对右表处理倾斜。

参数设置:

spark.sql.adaptive.enabledfalse自适应执行框架的开关。
spark.sql.adaptive.skewedJoin.enabledfalse倾斜处理开关。
spark.sql.adaptive.skewedPartitionFactor10当一个 partition的size大小或大于该值(所有parititon大小的中位数) 且大于spark.sql.adaptive.skewedPartitionSizeThreshold,或者parition的条数大于该值(所有parititon条数的中位数)且大于 spark.sql.adaptive.skewedPartitionRowCountThreshold, 才会被当做倾斜的partition进行相应的处理。
spark.sql.adaptive.skewedPartitionSizeThreshold67108864倾斜的partition大小不能小于该值。
spark.sql.adaptive.skewedPartitionRowCountThreshold10000000倾斜的partition条数不能小于该值。

 

Runtime执行计划优化

SparkSQL的Catalyst优化器会将sql语句转换成物理执行计划,然后真正运行物理执行计划。但是Catalyst转换物理执行计划的过程中,由于缺少Statistics统计信息,或者Statistics统计信息不准等原因,实际转换的物理执行计划可能并不是最优的,例如转换为SortMergeJoinExec,但实际BroadcastJoin更合适。

SparkSQL自适应执行框架会在物理执行计划真正运行的过程中,动态的根据shuffle阶段shuffle write的实际数据大小,来调整是否可以用 BroadcastJoin来代替SortMergeJoin,提高运行效率。

参数:

属性名称默认值备注
spark.sql.adaptive.enabledfalse自适应执行框架的开关。
spark.sql.adaptive.join.enabledtrue开关。
spark.sql.adaptiveBroadcastJoinThreshold等于spark.sql.autoBroadcastJoinThreshold

运行过程中用于判断是否满足BroadcastJoin 条件

 


关于SparkSql 自适应调整最近官方正式发布了Spark3.0,新特性中有涉及到SparkSql自适应查询执行计划的优化。

更多学习:

SparkSql 自适应: https://my.oschina.net/hblt147/blog/3006406

Spark官网:http://spark.apache.org/docs/latest/configuration.html

Spark3.0: https://www.iteblog.com/archives/9754.html

 

这篇关于Spark | SparkSql Insert Overwrite 小文件过多的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/294622

相关文章

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

linux 下Time_wait过多问题解决

转自:http://blog.csdn.net/jaylong35/article/details/6605077 问题起因: 自己开发了一个服务器和客户端,通过短连接的方式来进行通讯,由于过于频繁的创建连接,导致系统连接数量被占用,不能及时释放。看了一下18888,当时吓到了。 现象: 1、外部机器不能正常连接SSH 2、内向外不能够正常的ping通过,域名也不能正常解析。

Hibernate插入数据时,报错:org.springframework.dao.DataIntegrityViolationException: could not insert: [cn.itc

在用junit测试:插入数据时,报一下错误: 错误原因: package junit;import org.junit.Test;import cn.itcast.crm.container.ServiceProvinder;import cn.itcast.crm.dao.ISysUserDao;import cn.itcast.crm.domain.SysRole;

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e

Spark数据介绍

从趋势上看,DataFrame 和 Dataset 更加流行。 示例场景 数据仓库和 BI 工具集成: 如果你需要处理存储在数据仓库中的结构化数据,并且希望与 BI 工具集成,那么 DataFrame 和 Dataset 是首选。 机器学习流水线: 在构建机器学习流水线时,使用 DataFrame 和 Dataset 可以更好地管理数据流,并且可以方便地与 MLlib 集成。 实时数据处理:

Mac搭建华为云平台Hadoop+spark步骤

1、安装终端和文件传输软件 下载、安装、配置 详戳数据平台搭建文件夹 Transmit 用于文件传输 iTerm2    用于终端 2、连接与登录 mac 使用iTerm2快捷登录远程服务器 Mac Transmit连接 (密码不可复制,手动输入) 3、安装jdk 4、修改主机名 Linux系统下如何修改主机名 4、安装配置hadoop

Spark-在集群上运行Spark

Spark-在集群上运行Spark

Spark—数据读取和保存

Spark—数据读取和保存