本文主要是介绍Hive性能调优与实战节选,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
此文来自于《Hive性能调优与实战》,写的真不错,如有需要自行购买
京东 在这里仅用以笔记备忘,侵删!
一、什么是mapreduce的shuffle ?
shuffle的过程应该是从mapper的map方式输出到Reduce方法输入的过程. 非常关键的一个环节,制约了性能,保证了可以在廉价机器上可靠运行的一个环节。
在Mapper的map方法中,context.write 会讲数据计算所在的分区后写入到内存缓冲区(100mb)达到阈值0.8 也就是当写到80mb的时候开始启动新线程写入hdfs临时目录。目的是避免单行数据频繁写,以减轻磁盘负载。
在写到hdfs的过程中,为了下游的reducer任务顺序拉取数据,会讲数据排序后在写入到临时文件,当整个map执行介绍后会讲临时文件归并成一个文件。试想如果不对文件进行排序归并,那么下游的reducer任务拉取数据的时候就会频繁搜索磁盘,使得顺序读变为了随机读,极大的影响IO负载。
早期的spark shuffle 默认采用Hash based shuffle 会产生大量的临时文件,以至于下游读取需要很多的文件句柄,性能低下且容易oom。 因此spark 1.1开始引入Sort based shuffle才得意改善,且在spark2.0 正式废弃 Hash Based Shuffle. 事实上 Sort based shuffle也是借鉴了hasoop shuffle的思想。
继续说reduce,reduce任务启动后回启动拉取数据的线程,从HDFS拉取所需要的数据。
试想为啥不直接map完了直接推到对应的reduce节点上呢,这样可以节省写入磁盘的操作,性能更高?
因为采用缓存到hdfs,当一个reducer任务因为某些原因导致异常结束的时候,再启用一个新的依然可以读取到正确的结果。
从hdfs拉取数据回先缓存到内存缓冲区,当数据达到一定的阈值才会讲内存中的数据写入到内存或者磁盘文件中。文件数据量达到一定的量级还需要进行归并。(常见的表连接阶段)
二、发现并优化hive中的性能问题
收集表的元数据:
1、收集表的元数据
analyze table 表名 compute statistics
2、收集字段的元数据
analyze table 表名 compute statistics for columns
3、收集所有分区的元数据 (可能超时)
analyze table 表名 partition(分区列) compute statistics;
4、收集指定分区元数据
analyze table 表名 partition(分区列=分区值) compute;
5、收集所有分区列的元数据
analyze table 表名 partition(分区列) compute statistics for columns;
使用元数据做监控:
(1) 监控普通表存储的文件的平均大小 (2倍文件块大小)
SELECT
TBL_NAME,avgfilesize'fileSize(Mb)'
FROM (
SELECT
tp.totalSize/(1024*1024)/numFiles avgfilessize,TBL_NAME
FROM DBS d
INNER join TBLS t on d.DB_ID = t.DB_ID
left join (SELECT TBL_ID,MAX(case PARAM_KEY when 'numFiles' then PARAM_VALUE ELSE 0 END) numFiles,MAX(case (PARAM_KEY when 'totalSize' then PARAM_VALUE ELSE 0 END ) totalSizefrom TABLE_PARAMSGROUP by TBL_ID
) tp on t.TBL_ID = tp.TBL_ID
where d.`NAME` = 'database name'
and tp.numFiles is not NULL
and tp.numFiles > 0
) a where avgfileszie > hdfs文件块大小*2;
(2)监控分区存储的文件的大小
SELECT
TBL_NAME,part_name,avgfilesize'fileSize(Mb)'
FROM (
SELECT
pp.totalSize/(1024*1024)/numFiles avgfilesize,TBL_NAME,part.PART_NAME
FROM DBS d
INNER join TBLS t on d.DB_ID = t.DB_ID
INNER join PARTITIONS part on t.TBL_ID = part.TBL_ID
left join (SELECT PART_ID,-- 每个表存储的文件个数MAX(case PARAM_KEY when 'numFiles' then PARAM_VALUE ELSE 0 END) numFiles,-- 文件存储的大小MAX(case (PARAM_KEY when 'totalSize' then PARAM_VALUE ELSE 0 END ) totalSizefrom PARTITION_PARAMSGROUP by PART_ID
) pp on part.PART_ID = pp.PART_ID
where d.`NAME` = 'database name'
and pp.numFiles is not NULL
and pp.numFiles > 0
) a where avgfileszie > hdfs文件块大小*2;
(3)监控大表不分区的的表
select t.TBL_NAME
FROM DBS d
inner join TBLS t on d.`DB_ID` = t.`DB_ID`
inner join (select `TBL_ID`,max(case `PARAM_KEY` when 'totalSize' then `PARAM_VALUE` else 0 end) totalSizefrom `TABLE_PARAMS`group by `TBL_ID`) tp on t.`TBL_ID` = tp.`TBL_ID`
left join
(select distinct `TBL_ID` from PARTITIONING
) part on t.`TBL_ID` = part.`TBL_ID`
where d.`NAME` = '监控的库名'
and part.`TBL_ID` is null
and totalSize/1024/1024/1024 > 30
(4)监控hive表的分区数
-- 监控hive的分区数
SELECT
t.TBL_NAME '表名',d.`NAME` '库名', COUNT(part.PART_NAME) '分区数'
FROM
DBS d
INNER JOIN TBLS t on d.DB_ID = t.DB_ID
INNER join `PARTITIONS` part on part.TBL_ID = t.TBL_ID
WHERE d.`NAME` = '需要监控的库名'
GROUP by t.TBL_NAME,d.`NAME`;
三、监控当前集群状态
1、获取集群的状态信息
get http://xx:8088/ws/v1/cluster
2、获取集群任务的整体信息
get http://xx:8088/ws/v1/cluster/metrics
3、获取集群任务的运行信息
get http://xx:8088/ws/v1/cluster/apps
4、单个任务的运行信息
get http://xx:8088/ws/v1/cluster/apps/任务ID
5、获取资源调度分配的信息
get http://xx:8088/ws/v1/cluster/scheduler
四、聊聊parquet 和orc存储格式
列式存储格式普遍具有如下特点:
- 高效的压缩编码,用于降低存储成本
- 高效的读取能力,用于支撑快速查询
parquet具有如下特点;
- 列式存储
- 自带Schema
- 具备Predicate Filter特性
实践证明:使用orc存储可以有效借助于元数据快速筛掉不需要的数据,在查询时候锁消耗 的集群资源比parquet少。
具体参考:https://zhuanlan.zhihu.com/p/35622907
五、数据倾斜解决方案
-
不可拆分大文件引发的倾斜
解决:写入的时候随机打散分多份,或者使用bzip2,zip等支持读取时分割的压缩算法。 -
业务无关的数据引发的数据倾斜
解决:没啥好说的,过滤吧 -
多维聚合计算引发的数据倾斜
比如:rollup 可以尝试将其拆分成普通的分组聚合。
-
无法削减中间结果的数据量引发的数据倾斜
解决:一般情况下开启 hive.groupby.skewindata,但是对于collect_list的聚合函数没有效果,因为该函数要求全量操作数据的中间结果,明显起不到作用,反而会因为引入新的作业增加了磁盘和网络IO开销,从而导致性能更为低下。解决办法,调整reduce执行内存大小,使用mapreduce.reduce.memory.mb。 但是如果hive客户端连接的hiveserver2服务一次性需要返回处理的数据量很大,超过了hiveserver2设置的最大JAVA堆也是会导致内存溢出的,所以请评估好在用。 -
两个hive表join时候引起的数据倾斜
hive的做法是,启动两个作业,第一个处理么有倾斜的数据,第二个处理倾斜的数据将其存到分布式缓存中,分发到各个map任务所在节点,在map阶段完成join 即mapjoin ,避免shuffle ,从而避免数据倾斜。
这篇关于Hive性能调优与实战节选的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!