Hive On Tez小文件合并的技术调研

2024-03-18 20:40
文章标签 技术 合并 hive 调研 tez

本文主要是介绍Hive On Tez小文件合并的技术调研,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Hive On Tez小文件合并的技术调研

背景

在升级到CDP7.1.5之后,默认的运算引擎变成了Tez,之前这篇有讲过:

https://lizhiyong.blog.csdn.net/article/details/126688391

具体参考Cloudera的官方文档:https://docs.cloudera.com/cdp-private-cloud-base/7.1.3/hive-introduction/topics/hive-unsupported.html

并且只能用Tez,调度、血缘等重度依赖租来的阿里云DataPhin,那么最常用的离线跑批任务还是要使用HQL【也就是Hive On Tez】。HQL上手门槛极低,之前搞那种Oracle数据库开发的也可以一周内升任Sql Boy岗位,这是其一;PySpark任务或者用Java/Scala打Jar包的Spark任务由于平台的缺陷无法记录血缘,做溯源及下游影响分析时也是极不方便,只能怼人天靠人工,效率低下。HQL虽然低端并且性能不高,但是一时半会儿还不能被取缔。

原先在CDH5.16中,HQL任务是Hive On MapReduce。写在HQL最上方用于合并小文件的参数到了CDP就不能使用了:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

所以急需合并小文件,最好还是这种方式,就可以不用做太大的改动。

方式

直接set启用

既然之前的MapReduce任务有小文件合并的功能,那么找一找,还真就找到了更多的参数,在HiveConf这个Java类里。

在Apache Hive3.1.2的Java源码找到:

package org.apache.hadoop.hive.conf;public class HiveConf extends Configuration {
HIVEMERGEMAPFILES("hive.merge.mapfiles", true,"Merge small files at the end of a map-only job"),HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false,"Merge small files at the end of a map-reduce job"),HIVEMERGETEZFILES("hive.merge.tezfiles", false, "Merge small files at the end of a Tez DAG"),HIVEMERGESPARKFILES("hive.merge.sparkfiles", false, "Merge small files at the end of a Spark DAG Transformation"),HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 1000),"Size of merged files at the end of the job"),HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 1000 * 1000),"When the average output file size of a job is less than this number, Hive will start an additional \n" +"map-reduce job to merge the output files into bigger files. This is only done for map-only jobs \n" +"if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true."),HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,"When hive.merge.mapfiles, hive.merge.mapredfiles or hive.merge.tezfiles is enabled\n" +"while writing a table with ORC file format, enabling this config will do stripe-level\n" +"fast merge for small ORC files. Note that enabling this config will not honor the\n" +"padding tolerance config (hive.exec.orc.block.padding.tolerance)."),MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,"Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" +"table there is at most 1 matching row in the source table per SQL Specification.")}

参考:https://lizhiyong.blog.csdn.net/article/details/126688391

这篇有扒源码,看到了Hive最终会把AST解析成Tez或者Spark的DAG,那么此处的hive.merge.tezfileshive.merge.sparkfiles按照注释的字面意思,也就很容易看明白,是要在DAG的结尾处来一次merge。这当然就是最简单的方式。由于某些配置默认是false也就是停用状态,所以必须手动set启用才能生效。

所以只需要在HQL任务头上+这些参数即可:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.tezfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

使用Apache Hive自己搭建了Hive On Spark的难兄难弟们就可以照猫画虎:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.sparkfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

不过Hive的Calcite和CBO、RBO比起商业化DataBrick的Catalyst优势不明显,这种方式性能并不理想,使用SparkSQL的应该才是大多数。

验证

已经在prod环境充分验证,加入参数后敲:

hadoop fs -count
hadoop fs -du -s -h
hadoop fs -du -h

可以看到小文件情况明显改善。

到这里,肤浅的SQL Boy们就可以止步,已经够用了。

调整reducer个数

有时候,出于种种原因【数据量基本可以预知】,我们希望可以像Spark那样写死文件的个数。这样后续的任务调优、资源配额就比较方便。

Spark默认200个Task:

spark.sql.shuffle.partitions=200

只需要最后写文件或者sql跑insert overwrite前来一句免Shuffle高性能的:

df.coalesce(1)

或者肤浅的Sql Boy们比较能接受的Shuffle低性能的:

df1.repartition(1)

即可。Tez当然也是有办法实现这种写死文件个数的效果,那就是限制reducer个数,利用利用distribute by打散到reducer,每个reducer会写一个文件,最终文件个数就是写死的reducer个数。

验证

create external table if not exists test_small_file1(id int,message string
)
stored as parquet
location '这里写自己集群的即可'
;create external table if not exists test_small_file2(id int,message string
)
partitioned by(dt string
)
stored as parquet
location '这里写自己集群的即可'
;

作为结果表。

create external table if not exists test_data_source(id int,message string,dt
)
stored as parquet
location '这里写自己集群的即可'
;create external table if not exists test_data_source_100w(id int,message string,dt
)
stored as parquet
location '这里写自己集群的即可'
;create external table if not exists test_data_source_1000w(id int,message string,dt
)
stored as parquet
location '这里写自己集群的即可'
;

作为取数的数据源表。

然后insert数据产生小文件:

insert into test_data_source values(1,'a1',20230310);
...	--这里自己填充
insert into test_data_source values(20,'a20',20230310);insert into test_data_source values(21,'a21',20230311);
...
insert into test_data_source values(40,'a40',20230311);insert into test_data_source values(100,'a100',20230312);
...
insert into test_data_source values(199,'a199',20230312);insert into test_data_source values(200,'a200',20230313);
...
insert into test_data_source values(299,'a299',20230313);

此时在Impala执行:

show files in db_name.test_data_source;

或者直接HDFS查看:

hadoop fs -du -h

都可以看到240个小文件。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source;

可以看到结果是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source;

可以看到是6个小文件。

这点数据就会产生小文件。

Mock出百万级别的数据量:

insert overwrite table test_data_source_100w
select * from test_date_source;
;
insert overwrite table test_data_source_100w
select * from test_data_source_100w;
;

多次执行,直到数据量超过100w。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source_100w;

可以看到结果还是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w;

可以看到是8个小文件。

重头戏来了:

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file1
select id,message from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

打散为10个小文件。

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

由于4个日期,所以打散成40个小文件。

之后Mock出千万级别的也类似。直接说结果:

最终test_small_file1表有5个小文件,test_small_file2有20个小文件。

但是使用参数后:

最终test_small_file1表有10个小文件,test_small_file2有40个小文件。

所以可以看出,distribute by cast(rand() * 100 as int)这个操作利用了Hash Partitioner,结果散步到了Reduce Task。最后文件的个数=Reduce Task个数。

分区表是每个Partition的结果再次Hash Partitioner打散,所以每个分区路径的parquet文件个数都是=Reduce Task个数。

由于hive.exec.reducers.bytes.per.reduce可以设置的很大,那么只需要修改mapreduce.job.reduces的值,就可以让Tez跑HQL任务时写固定个数的文件。

ACID表的ORC小文件

由于Sql Boy们比较顽固,总是想着把Hive当RDBMS来用,于是。。。说多了都是泪。。。

Hive的ACID表会产生大量的小文件,阈值配置的不合适时,触发合并的次数就很少,导致小文件越来越多。这有点像Hudi的MOR【merge on read】。当下游HQL任务从这些ACID表读数据时,就会由于Map Task过多,出现极其严重的性能问题,这个故事慢慢讲。于是Sql Boy们养成了手动合并小文件的好习惯:

alter table tb_name compact 'major';
alter table tb_name compact 'minor';

可以通过:

show compactions;

查看提交到Yarn的合并任务的记录,自己把unix的timestamp换算成人类能看懂的时间,就知道近期的合并情况。。。任务跑得慢,想起来了就手动合并一下。。。

总结

Hive On Tez主要就是这3种合并小文件的方式。

在这里插入图片描述
转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/129511318

这篇关于Hive On Tez小文件合并的技术调研的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/823585

相关文章

SpringBoot3实现Gzip压缩优化的技术指南

《SpringBoot3实现Gzip压缩优化的技术指南》随着Web应用的用户量和数据量增加,网络带宽和页面加载速度逐渐成为瓶颈,为了减少数据传输量,提高用户体验,我们可以使用Gzip压缩HTTP响应,... 目录1、简述2、配置2.1 添加依赖2.2 配置 Gzip 压缩3、服务端应用4、前端应用4.1 N

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

Python实现合并与拆分多个PDF文档中的指定页

《Python实现合并与拆分多个PDF文档中的指定页》这篇文章主要为大家详细介绍了如何使用Python实现将多个PDF文档中的指定页合并生成新的PDF以及拆分PDF,感兴趣的小伙伴可以参考一下... 安装所需要的库pip install PyPDF2 -i https://pypi.tuna.tsingh

使用Apache POI在Java中实现Excel单元格的合并

《使用ApachePOI在Java中实现Excel单元格的合并》在日常工作中,Excel是一个不可或缺的工具,尤其是在处理大量数据时,本文将介绍如何使用ApachePOI库在Java中实现Excel... 目录工具类介绍工具类代码调用示例依赖配置总结在日常工作中,Excel 是一个不可或缺的工http://

使用Python创建一个能够筛选文件的PDF合并工具

《使用Python创建一个能够筛选文件的PDF合并工具》这篇文章主要为大家详细介绍了如何使用Python创建一个能够筛选文件的PDF合并工具,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录背景主要功能全部代码代码解析1. 初始化 wx.Frame 窗口2. 创建工具栏3. 创建布局和界面控件4

Python自动化办公之合并多个Excel

《Python自动化办公之合并多个Excel》在日常的办公自动化工作中,尤其是处理大量数据时,合并多个Excel表格是一个常见且繁琐的任务,下面小编就来为大家介绍一下如何使用Python轻松实现合... 目录为什么选择 python 自动化目标使用 Python 合并多个 Excel 文件安装所需库示例代码

java如何通过Kerberos认证方式连接hive

《java如何通过Kerberos认证方式连接hive》该文主要介绍了如何在数据源管理功能中适配不同数据源(如MySQL、PostgreSQL和Hive),特别是如何在SpringBoot3框架下通过... 目录Java实现Kerberos认证主要方法依赖示例续期连接hive遇到的问题分析解决方式扩展思考总

使用Python合并 Excel单元格指定行列或单元格范围

《使用Python合并Excel单元格指定行列或单元格范围》合并Excel单元格是Excel数据处理和表格设计中的一项常用操作,本文将介绍如何通过Python合并Excel中的指定行列或单... 目录python Excel库安装Python合并Excel 中的指定行Python合并Excel 中的指定列P

基于C#实现PDF文件合并工具

《基于C#实现PDF文件合并工具》这篇文章主要为大家详细介绍了如何基于C#实现一个简单的PDF文件合并工具,文中的示例代码简洁易懂,有需要的小伙伴可以跟随小编一起学习一下... 界面主要用于发票PDF文件的合并。经常出差要报销的很有用。代码using System;using System.Col