本文主要是介绍Hive必知必会的优化细节和原理释义,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、 常用参数优化
参数名 | 参数释义和用法 |
---|---|
列裁剪和分区裁剪 | 列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。解析阶段对应的则是ColumnPruner逻辑优化器 |
hive.optimize.cp | True(默认) |
hive.optimize.pruner | True(默认) |
谓词下推 | 在关系型数据库如MySQL中,也有谓词下推(Predicate Pushdown,PPD)的概念。它就是将SQL语句中的where谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻辑优化器是PredicatePushDown |
hive.optimize.ppd | true |
group by配置调整 | |
hive.map.aggr(map端聚合) | 默认 true,并不是所有的聚合操作都需要在reduce部分进行,很多聚合操作都可以先在Map端进行部分聚合,然后reduce端得出最终结果。 对应的优化器为GroupByOptimizer |
hive.groupby.mapaggr.checkinterval | 设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000 |
hive.groupby.skewindata | false, 解释:在group by时启动两个MR job。第一个job会将map端数据随机输入reducer,每个reducer做部分聚合,相同的key就会分布在不同的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。 |
启用压缩 | 压缩job的中间结果数据和输出数据,可以用少量CPU时间节省很多空间。压缩方式一般选择Snappy,效率最高。 |
hive.exec.compress.intermediate | default is false |
hive.intermediate.compression.codec | org.apache.hadoop.io.compress.SnappyCodec |
hive.intermediate.compression.type | 可以选择对块(BLOCK)还是记录(RECORD)压缩,BLOCK的压缩率比较高。 |
hive.exec.compress.output=true | 输出压缩配置项 |
Join基础优化 | |
build table(小表)前置 | 小表叫build table,大表叫probe table Hive在解析带join的SQL语句时,会默认将最后一个表作为probe table,将前面的表作为build table并试图将它们读进内存。如果表顺序写反,probe table在前面,引发OOM的风险就高了 |
多表join时key相同 | 这种情况会将多个join合并为一个MR job来处理,负责这个的是相关性优化器CorrelationOptimizer,参考wiki |
利用map join特性 | map join特别适合大小表join的情况。Hive会将build table和probe table在map端直接完成join过程,消灭了reduce,效率很高。 |
hive.auto.convert.join | 默认值true,对应逻辑优化器是MapJoinProcessor。 |
hive.mapjoin.smalltable.filesize | 默认值25000000(25MB) |
hive.mapjoin.cache.numrows | 表示缓存build table的多少行数据到内存,默认值25000。 |
倾斜均衡配置项 | |
hive.optimize.skewjoin | False |
hive.skewjoin.key | 如果开启了,在join过程中Hive会将计数超过阈值hive.skewjoin.key (默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。通过hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个job的mapper数量,默认10000。 |
使用向量化查询 | 向量化查询执行通过一次性批量执行1024行而不是每次单行执行,从而提供扫描、聚合、筛选器和连接等操作的性能。在Hive 0.13中引入,此功能显着提高了查询执行时间,并可通过两个参数设置轻松启用: |
hive.vectorized.execution.enabled | true |
hive.vectorizedexecution.reduce.enabled | true |
CBO(cost based query optimization) | |
自动优化HQL中多个JOIN的顺序,并选择合适的JOIN算法 | hive.cbo.enable = true; hive.compute.query.using.stats = true; hive.stats.fetch.column.stats = true; hive.stats.fetch.partition.stats = true; |
推测执行 | |
Hadoop推测执行可以触发执行一些重复的任务,尽管因对重复的数据进行计算而导致消耗更多的计算资源,不过这个功能的目标是通过加快获取单个task的结果以侦测执行慢的TaskTracker加入到没名单的方式来提高整体的任务执行效率。Hadoop的推测执行功能由2个配置控制着,通过mapred-site.xml中配置 | mapred.map.tasks.speculative.execution=true mapred.reduce.tasks.speculative.execution=true |
二、SQL语法相关的优化
-
sort by代替order by
HiveSQL中的order by与其他SQL方言中的功能一样,就是将结果按某字段全局排序,这会导致所有map端数据都进入一个reducer中,在数据量大时可能会长时间计算不完。如果使用sort by,那么还是会视情况启动多个reducer进行排序,并且保证每个reducer内局部有序。为了控制map端数据分配到reducer的key,往往还要配合distribute by一同使用。如果不加distribute by的话,map端数据就会随机分配到reducer。
举个例子,假如要以UID为key,以上传时间倒序、记录类型倒序输出记录数据。select uid,upload_time,event_type,record_data from calendar_record_log where pt_date >= 20190201 and pt_date <= 20190224 distribute by uid sort by upload_time desc,event_type desc;
-
group by代替distinct
当要统计某一列的去重数时,如果数据量很大,count(distinct)就会非常慢,原因与order by类似,count(distinct)逻辑只会有很少的reducer来处理。这时可以用group by来改写:
select count(1) from (select uid from calendar_record_logwhere pt_date >= 20190101group by uid ) t;
这样写会启动两个MR job(单纯distinct只会启动一个),所以要确保数据量大到启动job的overhead远小于计算耗时,才考虑这种方法。
-
优化SQL处理join数据倾斜
空值或无意义值
这种情况很常见,比如当事实表是日志类数据时,往往会有一些项没有记录到,我们视情况会将它置为null,或者空字符串、-1等。如果缺失的项很多,在做join时这些空值就会非常集中,拖累进度。
因此,若不需要空值数据,就提前写where语句过滤掉。需要保留的话,将空值key用随机方式打散,例如将用户ID为null的记录随机改为负值:
select a.uid,a.event_type,b.nickname,b.age from ( select(case when uid is null then cast(rand()*-10240 as int) else uid end) as uid,event_type from calendar_record_log where pt_date >= 20190201 ) a left outer join ( select uid,nickname,age from user_info where status = 4 ) b on a.uid = b.uid;
单独处理倾斜key
这其实是上面处理空值方法的拓展,不过倾斜的key变成了有意义的。一般来讲倾斜的key都很少,我们可以将它们抽样出来,对应的行单独存入临时表中,然后打上一个较小的随机数前缀(比如0~9),最后再进行聚合。
不同数据类型
这种情况不太常见,主要出现在相同业务含义的列发生过逻辑上的变化时。 举个例子,假如我们有一旧一新两张日历记录表,旧表的记录类型字段是(event_type int),新表的是(event_type string)。为了兼容旧版记录,新表的event_type也会以字符串形式存储旧版的值,比如’17’。当这两张表join时,经常要耗费很长时间。其原因就是如果不转换类型,计算key的hash值时默认是以int型做的,这就导致所有“真正的”string型key都分配到一个reducer上。所以要注意类型转换:
select a.uid,a.event_type,b.record_data from calendar_record_log a left outer join ( select uid,event_type from calendar_record_log_2 where pt_date = 20190228 ) b on a.uid = b.uid and b.event_type = cast(a.event_type as string) where a.pt_date = 20190228;
build table过大
有时,build table会大到无法直接使用map join的地步,比如全量用户维度表,而使用普通join又有数据分布不均的问题。这时就要充分利用probe table的限制条件,削减build table的数据量,再使用map join解决。代价就是需要进行两次join。举个例子:
select /*+mapjoin(b)*/ a.uid,a.event_type,b.status,b.extra_info from calendar_record_log a left outer join ( select /*+mapjoin(s)*/ t.uid,t.status,t.extra_info from (select distinct uid from calendar_record_log where pt_date = 20190228) s inner join user_info t on s.uid = t.uid ) b on a.uid = b.uid where a.pt_date = 20190228;
-
MapReduce优化
调整mapper数
mapper数量与输入文件的split数息息相关,在Hadoop源码
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
类中可以看到split划分的具体逻辑。这里不贴代码,直接叙述mapper数是如何确定的。- 可以直接通过参数
mapred.map.tasks
(默认值2)来设定mapper数的期望值,但它不一定会生效,下面会提到。 - 设输入文件的总大小为
total_input_size
。HDFS中,一个块的大小由参数dfs.block.size
指定,默认值64MB或128MB。在默认情况下,mapper数就是:default_mapper_num = total_input_size / dfs.block.size
。 - 参数
mapred.min.split.size
(默认值1B)和mapred.max.split.size
(默认值64MB)分别用来指定split的最小和最大大小。split大小和split数计算规则是:split_size = MAX(mapred.min.split.size, MIN(mapred.max.split.size, dfs.block.size))
;split_num = total_input_size / split_size
。 - 得出mapper数:
mapper_num = MIN(split_num, MAX(default_num, mapred.map.tasks))
。
可见,如果想减少mapper数,就适当调高
mapred.min.split.size
,split数就减少了。如果想增大mapper数,除了降低mapred.min.split.size
之外,也可以调高mapred.map.tasks
。一般来讲,如果输入文件是少量大文件,就减少mapper数;如果输入文件是大量非小文件,就增大mapper数;至于大量小文件的情况,则需要合并小文件后再处理。
调整reducer数
reducer数量的确定方法比mapper简单得多。使用参数
mapred.reduce.tasks
可以直接设定reducer数量,不像mapper一样是期望值。但如果不设这个参数的话,Hive就会自行推测,逻辑如下:- 参数
hive.exec.reducers.bytes.per.reducer
用来设定每个reducer能够处理的最大数据量,默认值1G(1.2版本之前)或256M(1.2版本之后)。 - 参数
hive.exec.reducers.max
用来设定每个job的最大reducer数量,默认值999(1.2版本之前)或1009(1.2版本之后)。 - 得出reducer数:
reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)
。
reducer数量与输出文件的数量相关。如果reducer数太多,会产生大量小文件,对HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或者造成OOM。
合并小文件
- 输入阶段合并 需要更改Hive的输入文件格式,即参数
hive.input.format
,默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat
,我们改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
。 这样比起上面调整mapper数时,又会多出两个参数,分别是mapred.min.split.size.per.node
和mapred.min.split.size.per.rack
,含义是单节点和单机架上的最小split大小。如果发现有split大小小于这两个值(默认都是100MB),则会进行合并。具体逻辑可以参看Hive源码中的对应类。 - 输出阶段合并 直接将
hive.merge.mapfiles
和hive.merge.mapredfiles
都设为true即可,前者表示将map-only任务的输出合并,后者表示将map-reduce任务的输出合并。 另外,hive.merge.size.per.task
可以指定每个task输出后合并文件大小的期望值,hive.merge.size.smallfiles.avgsize
可以指定所有输出文件大小的均值阈值,默认值都是1GB。如果平均大小不足的话,就会另外启动一个任务来进行合并。
启用压缩
压缩job的中间结果数据和输出数据,可以用少量CPU时间节省很多空间。压缩方式一般选择Snappy,效率最高。
Job输出文件按照block以Gzip的方式进行压缩:
set mapreduce.output.fileoutputformat.compress=true -- 默认值是 false set mapreduce.output.fileoutputformat.compress.type=BLOCK -- 默认值是 Record --可以选择对块(BLOCK)还是记录(RECORD)压缩,BLOCK的压缩率比较高 set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.GzipCodec --默认值是 org.apache.hadoop.io.compress.DefaultCodec
Map输出结果也以Gzip进行压缩:
set mapred.map.output.compress=true set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec -- 默认值是 org.apache.hadoop.io.compress.DefaultCodec
对Hive输出结果和中间都进行压缩:
set hive.exec.compress.output=true --默认值是 false,不压缩 set hive.exec.compress.intermediate=true --默认值是 false,为 true 时 MR 设置的压缩才启用
JVM重用
在MR job中,默认是每执行一个task就启动一个JVM。如果task非常小而碎,那么JVM启动和关闭的耗时就会很长。可以通过调节参数
mapred.job.reuse.jvm.num.tasks
来重用。例如将这个参数设成5,那么就代表同一个MR job中顺序执行的5个task可以重复使用一个JVM,减少启动和关闭的开销。但它对不同MR job中的task无效。并行执行与本地模式
- 并行执行 Hive中互相没有依赖关系的job间是可以并行执行的,最典型的就是多个子查询union all。在集群资源相对充足的情况下,可以开启并行执行,即将参数
hive.exec.parallel
设为true。另外hive.exec.parallel.thread.number
可以设定并行执行的线程数,默认为8,一般都够用。 - 本地模式 Hive也可以不将任务提交到集群进行运算,而是直接在一台节点上处理。因为消除了提交到集群的overhead,所以比较适合数据量很小,且逻辑不复杂的任务。 设置
hive.exec.mode.local.auto
为true可以开启本地模式。但任务的输入数据总量必须小于hive.exec.mode.local.auto.inputbytes.max
(默认值128MB),且mapper数必须小于hive.exec.mode.local.auto.tasks.max
(默认值4),reducer数必须为0或1,才会真正用本地模式执行。
严格模式
所谓严格模式,就是强制不允许用户执行3种有风险的HiveSQL语句,一旦执行会直接失败。这3种语句是:
- 查询分区表时不限定分区列的语句;
- 两表join产生了笛卡尔积的语句;
- 用order by来排序但没有指定limit的语句。
要开启严格模式,需要将参数==
hive.mapred.mode
==设为strict。采用合适的存储格式
在HiveSQL的create table语句中,可以使用
stored as ...
指定表的存储格式。Hive表支持的存储格式有TextFile、SequenceFile、RCFile、Avro、ORC、Parquet等。 存储格式一般需要根据业务进行选择,在我们的实操中,绝大多数表都采用TextFile与Parquet两种存储格式之一。 TextFile是最简单的存储格式,它是纯文本记录,也是Hive的默认格式。虽然它的磁盘开销比较大,查询效率也低,但它更多地是作为跳板来使用。RCFile、ORC、Parquet等格式的表都不能由文件直接导入数据,必须由TextFile来做中转。 Parquet和ORC都是Apache旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量OLAP查询,并且也支持更好的压缩和编码。我们选择Parquet的原因主要是它支持Impala查询引擎,并且我们对update、delete和事务性操作需求很低。 这里就不展开讲它们的细节,可以参考各自的官网: https://parquet.apache.org/ https://orc.apache.org/ - 可以直接通过参数
-
单个大文件处理办法
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,
来使得每个map处理的数据量减少,从而提高任务的执行效率。Select data_desc,count(1),count(distinct id),sum(case when ...),sum(case when ...),sum(...)from a group by data_desc
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,
这样就可以用多个map任务去完成。set mapred.reduce.tasks=10;create table a_1 strored as orc as select * from a distribute by rand(123);
distribute by :用来控制map输出结果的分发,即map端如何拆分数据给reduce端。 会根据distribute by 后边定义的列,根据reduce的个数进行数据分发,默认是采用hash算法。当 distribute by 后边跟的列是:rand()函数时,即表示保证每个分区的数据量基本一致。
- cluster by: 对同一字段分桶并排序,不能和sort by连用;
- distribute by + sort by: 分桶,保证同一字段值只存在一个结果文件当中,结合sort by 保证每个reduceTask结果有序;
- sort by: 单机排序,单个reduce结果有序
- order by:全局排序,缺陷是只能使用一个reduce
扩展参考:https://blog.csdn.net/qq_40795214/article/details/82190827
-
优化in/exists语句
虽然经过测验,hive1.2.1也支持in/exists操作,但还是推荐使用hive的一个高效替代方案:left semi join
比如说:
select a.id, a.name from a where a.id in (select b.id from b); select a.id, a.name from a where exists (select id from b where a.id = b.id);
应该转换成:
select a.id, a.name from a left semi join b on a.id = b.id;
-
补充
三、原理释义
-
Map join 工作原理释义
上图是Hive MapJoin的原理图,从图中可以看出MapJoin分为两个阶段:
(1)通过MapReduce Local Task,将小表读入内存,生成内存HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。
(2)MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。也就是在map端进行join避免了shuffle。
Join操作在Map阶段完成,不再需要Reduce,有多少个Map Task,就有多少个结果文件。 -
Hive SQL Join的实现原理
select u.name, o.orderid from order o join user u on o.uid = u.uid;
在map的输出value中为不同表的数据打上tag标记,在reduce阶段根据tag判断数据来源。
-
Hive Group by 实现原理
select rank, isonline, count(*) from city group by rank, isonline;
将GroupBy的字段组合为map的输出key值,利用MapReduce的排序,在reduce阶段保存LastKey区分不同的key。MapReduce的过程如下(当然这里只是说明Reduce端的非Hash聚合过程)
-
Distinct的实现原理
select dealid, count(distinct uid) num from order group by dealid;
-
当只有一个distinct字段时,如果不考虑Map阶段的Hash GroupBy,只需要将GroupBy字段和Distinct字段组合为map输出key,利用mapreduce的排序,同时将GroupBy字段作为reduce的key,在reduce阶段保存LastKey即可完成去重.
-
如果有多个distinct字段呢,如下面的SQL
select dealid, count(distinct uid), count(distinct date) from order group by dealid;
实现方式有两种:
(1)如果仍然按照上面一个distinct字段的方法,即下图这种实现方式,无法跟据uid和date分别排序,也就无法通过LastKey去重,仍然需要在reduce阶段在内存中通过Hash去重。(2)第二种实现方式,可以对所有的distinct字段编号,每行数据生成n行数据,那么相同字段就会分别排序,这时只需要在reduce阶段记录LastKey即可去重。
这种实现方式很好的利用了MapReduce的排序,节省了reduce阶段去重的内存消耗,但是缺点是增加了shuffle的数据量。需要注意的是,在生成reduce value时,除第一个distinct字段所在行需要保留value值,其余distinct数据行value字段均可为空。
简单解释下,如上图,对多字段的去重是从打编号开始的,如 uid=0,date=1,然后每一行按照groupby 字段拆成两行如下:
<1001,0,1> <1001,1,1101> <1001,0,2> <1001,1,1101> <1001,0,2> <1001,1,1102>
-
-
Hive SQL转化为MapReduce的过程
整个编译过程分为六个阶段:
- Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象语法树AST Tree
- 遍历AST Tree,抽象出查询的基本组成单元QueryBlock
- 遍历QueryBlock,翻译为执行操作树OperatorTree
- 逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量
- 遍历OperatorTree,翻译为MapReduce任务
- 物理层优化器进行MapReduce任务的变换,生成最终的执行计划
-
MapReduce过程详解及其性能优化
参考:https://www.cnblogs.com/felixzh/p/8604188.html
为这个图的作者点个赞,制作精良,通俗易懂。简述如下,详情参考链接。
【map阶段】
1、读取数据HDFS [map数:输入文件数目,输入文件的大小,配置参数]
2、处理数据 (分区-> 环形缓冲区)
分区:默认是通过计算key的hash值后对Reduce task的数量取模获得
环形缓冲区:Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据
3、写数据 (Combiner -> spill -> 压缩 )
【reduce阶段】
1、拷贝 (Reduce任务通过HTTP向各个Map任务下载它所需要的数据(网络传输)
2、合并
1)内存到内存(memToMemMerger)
2)内存中Merge(inMemoryMerger)
3)磁盘上的Merge(onDiskMerger)具体包括两个:(一)Copy过程中磁盘合并(二)磁盘到磁盘
参考:
https://cloud.tencent.com/developer/article/1453464
https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html
https://www.cnblogs.com/swordfall/p/11037539.html
这篇关于Hive必知必会的优化细节和原理释义的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!