本文主要是介绍HIVE及SparkSQL优化经验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
简介
针对高耗跑批时间长的作业,在公司近3个月做过一个优化专项;优化成效:综合cpu、内存、跑批耗时减少均在65%以上;
cpu和内存消耗指的是:vcoreseconds和memoryseconds
这里简单说下优化的一些思路,至于优化细节之前写过两篇,或可参考::
sparksql优化:https://blog.csdn.net/me_to_007/article/details/130916946
hive优化:https://blog.csdn.net/me_to_007/article/details/126921955
运行环境:yarn调度+hdfs存储(orc格式表)+hive2+spark2.4.3
大家有什么经验,也欢迎补充、指正
什么样的任务值得关注
一个作业运行时间很长就值得关注吗?不一定,如果一个用户把n段逻辑全都整到一个作业里,不是太长,跑批时耗在数仓规范范围内,可能也是可接受合理的;
这里将时间界限监控到stage层面,对应hive就是每一个job,在sparksql里就是每一个stage;一个作业跑批时间很长可能是合理的,也许是逻辑复杂由很多个stage构成;但是一个stage跑批时间很长,在分布式计算里,一般不正常;
hive的stage可以只有map逻辑,也可以是一段mr、union、limit语句;sparksql的stage则根据有无shuffle划分,遇到shuffle宽依赖则新划分一个stage;
这个时间界限可以根据经验及要求灵活设置,比如要尽可能地提升作业跑批时效和节省结算资源,那这个界限可以设置小点,比如一个小时;一般几十亿的数据join,没有倾斜、发散,一个小时也够了;
另外一方面是监控内存配置,hive里边的map和reduce内存配置,sparksql里的executor内置配置,这些参数都不应该设置过大;每个task的内存界限可以设置到4个g
hive里边map和reduce内存是单独设置的,sparksql单个task的内存约等于:单个executor内存/每个executor cpu核数 * 每个task跑批cpu核数(这个参数默认是1,一般不做额外设置)
hive-map内存参数:mapreduce.map.memory.mb
hive-reduce内存参数:mapreduce.reduce.memory.mb
sparksql-executor内存参数:spark.executor.memory
sparksql-executor核数:spark.executor.cores
sparksql-每task-cpu核数:spark.task.cpus
该参数默认为1,一般无需额外设置;
spark-driver的内存,如果有广播和cahe缓存,需要配置大些,可根据实际情况设置;
配置自动监控
这里可以根据yarn日志,编写定时调度脚本,解析作业日志,将符合异常规则的作业发邮件出来筛查(也可配置短信接口通知作业属主整改);
这里可以手工配置一个白名单,作业如果已经审查无太大优化空间,则加入白名单避开筛查;当然这个白名单也可以设置特定的规则,比如把时间和资源界限的上限配大点,而不是无限放大,避免白名单后界限无限放大;
内存监控则直接读取作业配置参数即可,如触发条件则发邮件到相关团队优化(在我们这是数据治理组)
耗时长有两种情况,一种的倾斜导致的,这种就解决倾斜;另外一种每个task处理数据量都很大,这类情况一般把task数量调大点,再多给点资源;日志解析也比较好处理,倾斜监控作业各个stage中最大task耗时相比一般水平差异,这个界限也可以按需配置;比如我们这里设定为最大task跑批时长-上四分位时长 > 3倍四分差
数据倾斜是一个相对的概念,准确地说,每个作业都有倾斜,但对于一些跑批时间比较短的作业,至于整个数仓盘面来说,优化提升不大;这里可以在监控里筛跳过;
当然至于个别业务团队,视重要性个别对待,有的作业对时效要求比较高,比如涉及到重大活动数据,那就做精细化优化;
考虑到yarn日志文件比较大,解析耗时,集群作业比较多,解析日志应有筛选地进行;那些跑批时间相对短的作业或可暂时搁置;同时配置白名单,防止日志重复读取解析;
怎么去定位问题SQL段
主要是结合执行计划定位,在hive里面执行计划map-tree里显示的表名,如果sql中有写别名,则显示的是别名;为防止筛查混乱,难以定位,sql中的表别名尽量不要使用相同的,没有关联逻辑情况,尽量不要使用别名;
比如下面一段sql:
select group_name,count(1) as cnt
from tablename -- 这里不写表别名
group by group_name
在hive中,直接使用explain + sql字符串
打印执行计划即可;看看日志里哪个stage异常,再根据执行计划去定位异常sql段落,进而进行作业分析;
explain select dt,count(1) as cnt from tablename group by dt
hive中如果有配置join倾斜参数(set hive.optimize.skewjoin=true
)或者groupby倾斜参数(set hive.groupby.skewindata=true
),如果作业stage倾斜会额外生成一个stage,这里在执行计划里是看不到的,这个stage紧跟运行在原来的stage后面;
这里需要注意的是,调度环境和我们查看执行计划的环境,引擎配置可能会不一样,我们通过执行计划查看的stage号可能跟实际作业跑批不一致;这里尽量在同一环境下去查看执行计划;
一方面,我们也可以通过yarn里map日志的syslog日志去查看map读取的表名,然后根据使用的表名大概定位到sql段(如果当前stage的数据源都是来自前依赖stage reduce的output则日志里看到的是临时文件名而不是表名)
sparksql,同样的,查看sparkui中stage对应的执行计划(stage二级页面),spark2.4.3中stage的执行计划不是很详细,可以结合数据量,算子然后在sparkui全局执行计划里去找,定位sql语句段;
如果是作业跑批失败,在执行计划中体现为上游依赖stage全跑完了,而这个stage没有打印执行数据(sparksql执行计划每个stage跑完都会显示跑批时间及输出数据)
大概的一些优化方向
-
优化sql
每一个作业就好比一个成型的积木,而成型的积木可由不同积木块不同方式拼凑而成,条条大路通罗马;使用怎样的积木块(表)以及怎样的拼凑方式是我们一个优化介入点;比如join顺序,使用不同的表,先聚合在join等(视具体场景,具体处理) -
增加stage的task个数
这类体现为某个stage跑批时间很长但是又没有倾斜,可以适当增加stage的reduce个数;这类现象一般出现在多维聚合、countdistinct数据膨胀,多个表关联包括开窗用的是一个mr(表关联join字段和开窗partitionby字段是同一个字段,计算会优化收拢到一个mr里,具体看执行计划),join数据发散多对多;
题外:数据量大,一定要reduce个数设置大些吗?
不一定,如果存在map预聚合,且预聚合能显著减少数据量的,有时候恰恰要手动把reduce配置少点;在hive里可以指定reduce个数mapred.reduce.tasks
,亦可结合每reduce处理数据量hive.exec.reducers.bytes.per.reducer
+hive.exec.reducers.max
动态配置reduce个数(但这个数据量是map逻辑前的数据量,如果有filter筛选,需适当增加.bytes.per.reducer参数,减少reduce个数);sparksql则直接设置spark.sql.shuffle.partitions
个数和动态分区参数spark.sql.adaptive.enabled
即可;
有的跑批时间长可能是自定义函数优化做的不到位,这种情况优先使用内置函数或者找相关开发优化自定义函数;字符串处理能不正则的尽量不要用正则 -
解决倾斜
如果遇到倾斜,则按常规倾斜处理即可;在具体实践中,我们发现hive倾斜参数机制,优化效果不一定比手动处理要好;比如groupby倾斜参数set hive.groupby.skewindata=true
大多时候并不比手工处理要好,这里可以手动打散试试(就是麻烦点,如果逻辑复杂,改起来会很耗时);hive的join的倾斜参数不适用于外连接,这里需要注意下;此外mapjoin不适用于外连接主表是小表的情况(比如小表 left join 大表) -
开启并行
这里主要是针对于hive计算,如果存在stage没有依赖(查看执行计划),可以开启并行执行参数set hive.exec.parallel=true
,无依赖的stage可以并行执行,比如union all上下逻辑子句,join中嵌套的子句等;
sparksql可以通过堆资源来减少跑批时间,但一般不建议设置的总cpu核数大于并发task数量的1/2;官方推荐是task的数据大于cpu总核数的三倍。比如有300个task,那cpu总核数尽量设置在150以下。
在数据体量很大时,我们发现sparksql的map stage耗时比较长,这里可以根据实际情况切换合适的计算引擎;
此外,基于整个作业链路去优化目标作业跑批时效,要基于关键链路去优化,非关键链路作业优化对结果作业产出时间没有影响;
countdistinct,每一个distinct数据都会膨胀一倍;如果countdistinct数量过多,但去重的都是同一个字段,只是条件不同时,改写下sql,shuffle数据量可以显著减少;
比如(假设flag是一个标签字段,字段值只有0和1):
selecet group_name,count(distinct case when flag1=1 then id end) as dis_cnt1,count(distinct case when flag2=1 then id end) as dis_cnt2-- ,...,count(distinct case when flagn=1 then id end) as dis_cntn
from tablename
group by group_name
这里的数据有n个countdistinct,数据膨胀n倍;但让若我们这么改写:
selecet group_name,count(case when flag1=1 then id end) as dis_cnt1,count(case when flag2=1 then id end) as dis_cnt2-- ,...,count(case when flagn=1 then id end) as dis_cntn
(select group_name,id,max(flag1) as flag1,max(flag2) as flag2--,..,max(flagn) as flagnfrom tablenamegroup by group_name,id
) t
group by group_name
如此,数据没有膨胀,只是多了一个mr成本而已;需注意的是,hive在开启优化后,该改写,可能会合并到一个mr里,具体看执行计划&需求改写;
这篇关于HIVE及SparkSQL优化经验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!