本文主要是介绍【Spark系列8】Spark Shuffle FetchFailedException报错解决方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前半部分来源:http://blog.csdn.net/lsshlsw/article/details/51213610
后半部分是我的优化方案供大家参考。
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
SparkSQL shuffle操作带来的报错
org.apache.spark.shuffle.MetadataFetchFailedException:
Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException:
Failed to connect to hostname/192.168.xx.xxx:50268
WARN TaskSetManager: Lost task 17.1 in stage 4.1 (TID 1386, spark050013): java.io.FileNotFoundException: /data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/2f/temp_shuffle_e22e013a-5392-4edb-9874-a196a1dad97c
FetchFailed(BlockManagerId(6083b277-119a-49e8-8a49-3539690a2a3f-S155, spark050013, 8533), shuffleId=1, mapId=143, reduceId=3, message=
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/0e/shuffle_1_143_0.data, offset=997061, length=112503}
(笔者按:shuffle的原理可以参考我的另一篇总结:http://blog.csdn.net/zongzhiyuan/article/details/77676662)
下面, 主要从shuffle的数据量和处理shuffle数据的分区数两个角度入手。
1. 减少shuffle数据
思考是否可以使用map side join
或是broadcast join
来规避shuffle的产生。
将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。
2. SparkSQL和DataFrame的join,group by等操作(提供shuffle并发度)通过spark.sql.shuffle.partitions
控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。
3. Rdd的join,groupBy,reduceByKey等操作
通过spark.default.parallelism
控制shuffle read与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。
4. 提高executor的内存
通过spark.executor.memory
适当提高executor的memory值
5. 是否存在数据倾斜的问题
空值是否已经过滤?某个key是否可以单独处理?考虑改变数据的分区规则。
以上内容来源于http://blog.csdn.net/lsshlsw/article/details/5121361
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
我遇到的场景:
大数据:17亿条日志
约束:某些字段为空值,不能丢弃日志;200个核,每个核20个G内存,已经无法增加资源。
问题排查:
1. 由于dataframe中取出的字段较多,某些字段是很长的字符串,导致数据量很大。
2. 针对3个字段使用reduceByKey进行多个统计聚合,最后需要转为dataframe进行原数据与统计数据的join,共3次join
3. 在3次join过程中,其中一次join有一个key会发生数据倾斜问题。
解决方案:
1. 将需要做join操作的字段单独提取出来,不需做join并且字段值比较大的字段单独处理,防止每次shuffle都产生无用的大量数据;
2. 在我的场景下,中间的统计结果主要用于后面的规则判断,以筛选出有问题的账号,因此,期间可以做预先过滤,即如果聚合统计的中间结果值本身小于n(后续规则的阈值一定会大于n),则直接丢弃该统计中间结果,不进入后面join的shuffle阶段,以进一步减少数据量;
3. 针对某个join的key出现数据倾斜的问题,将原始表分为3份,使用randomSpilt操作符,针对每个小部分原始表做3次join,最后将3个结果进行unionAll关联操作。
经过以上3步,我的问题已经得到解决。当然,解决方案根据场景和每个人的习惯不同会有很多。其他解决数据倾斜的方案可以参考我另外的总结:http://blog.csdn.net/zongzhiyuan/article/details/77676614
这篇关于【Spark系列8】Spark Shuffle FetchFailedException报错解决方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!