Hive On Tez小文件合并的技术调研

2024-03-18 20:40
文章标签 技术 合并 hive 调研 tez

本文主要是介绍Hive On Tez小文件合并的技术调研,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Hive On Tez小文件合并的技术调研

背景

在升级到CDP7.1.5之后,默认的运算引擎变成了Tez,之前这篇有讲过:

https://lizhiyong.blog.csdn.net/article/details/126688391

具体参考Cloudera的官方文档:https://docs.cloudera.com/cdp-private-cloud-base/7.1.3/hive-introduction/topics/hive-unsupported.html

并且只能用Tez,调度、血缘等重度依赖租来的阿里云DataPhin,那么最常用的离线跑批任务还是要使用HQL【也就是Hive On Tez】。HQL上手门槛极低,之前搞那种Oracle数据库开发的也可以一周内升任Sql Boy岗位,这是其一;PySpark任务或者用Java/Scala打Jar包的Spark任务由于平台的缺陷无法记录血缘,做溯源及下游影响分析时也是极不方便,只能怼人天靠人工,效率低下。HQL虽然低端并且性能不高,但是一时半会儿还不能被取缔。

原先在CDH5.16中,HQL任务是Hive On MapReduce。写在HQL最上方用于合并小文件的参数到了CDP就不能使用了:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

所以急需合并小文件,最好还是这种方式,就可以不用做太大的改动。

方式

直接set启用

既然之前的MapReduce任务有小文件合并的功能,那么找一找,还真就找到了更多的参数,在HiveConf这个Java类里。

在Apache Hive3.1.2的Java源码找到:

package org.apache.hadoop.hive.conf;public class HiveConf extends Configuration {
HIVEMERGEMAPFILES("hive.merge.mapfiles", true,"Merge small files at the end of a map-only job"),HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false,"Merge small files at the end of a map-reduce job"),HIVEMERGETEZFILES("hive.merge.tezfiles", false, "Merge small files at the end of a Tez DAG"),HIVEMERGESPARKFILES("hive.merge.sparkfiles", false, "Merge small files at the end of a Spark DAG Transformation"),HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 1000),"Size of merged files at the end of the job"),HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 1000 * 1000),"When the average output file size of a job is less than this number, Hive will start an additional \n" +"map-reduce job to merge the output files into bigger files. This is only done for map-only jobs \n" +"if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true."),HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,"When hive.merge.mapfiles, hive.merge.mapredfiles or hive.merge.tezfiles is enabled\n" +"while writing a table with ORC file format, enabling this config will do stripe-level\n" +"fast merge for small ORC files. Note that enabling this config will not honor the\n" +"padding tolerance config (hive.exec.orc.block.padding.tolerance)."),MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,"Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" +"table there is at most 1 matching row in the source table per SQL Specification.")}

参考:https://lizhiyong.blog.csdn.net/article/details/126688391

这篇有扒源码,看到了Hive最终会把AST解析成Tez或者Spark的DAG,那么此处的hive.merge.tezfileshive.merge.sparkfiles按照注释的字面意思,也就很容易看明白,是要在DAG的结尾处来一次merge。这当然就是最简单的方式。由于某些配置默认是false也就是停用状态,所以必须手动set启用才能生效。

所以只需要在HQL任务头上+这些参数即可:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.tezfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

使用Apache Hive自己搭建了Hive On Spark的难兄难弟们就可以照猫画虎:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.sparkfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

不过Hive的Calcite和CBO、RBO比起商业化DataBrick的Catalyst优势不明显,这种方式性能并不理想,使用SparkSQL的应该才是大多数。

验证

已经在prod环境充分验证,加入参数后敲:

hadoop fs -count
hadoop fs -du -s -h
hadoop fs -du -h

可以看到小文件情况明显改善。

到这里,肤浅的SQL Boy们就可以止步,已经够用了。

调整reducer个数

有时候,出于种种原因【数据量基本可以预知】,我们希望可以像Spark那样写死文件的个数。这样后续的任务调优、资源配额就比较方便。

Spark默认200个Task:

spark.sql.shuffle.partitions=200

只需要最后写文件或者sql跑insert overwrite前来一句免Shuffle高性能的:

df.coalesce(1)

或者肤浅的Sql Boy们比较能接受的Shuffle低性能的:

df1.repartition(1)

即可。Tez当然也是有办法实现这种写死文件个数的效果,那就是限制reducer个数,利用利用distribute by打散到reducer,每个reducer会写一个文件,最终文件个数就是写死的reducer个数。

验证

create external table if not exists test_small_file1(id int,message string
)
stored as parquet
location '这里写自己集群的即可'
;create external table if not exists test_small_file2(id int,message string
)
partitioned by(dt string
)
stored as parquet
location '这里写自己集群的即可'
;

作为结果表。

create external table if not exists test_data_source(id int,message string,dt
)
stored as parquet
location '这里写自己集群的即可'
;create external table if not exists test_data_source_100w(id int,message string,dt
)
stored as parquet
location '这里写自己集群的即可'
;create external table if not exists test_data_source_1000w(id int,message string,dt
)
stored as parquet
location '这里写自己集群的即可'
;

作为取数的数据源表。

然后insert数据产生小文件:

insert into test_data_source values(1,'a1',20230310);
...	--这里自己填充
insert into test_data_source values(20,'a20',20230310);insert into test_data_source values(21,'a21',20230311);
...
insert into test_data_source values(40,'a40',20230311);insert into test_data_source values(100,'a100',20230312);
...
insert into test_data_source values(199,'a199',20230312);insert into test_data_source values(200,'a200',20230313);
...
insert into test_data_source values(299,'a299',20230313);

此时在Impala执行:

show files in db_name.test_data_source;

或者直接HDFS查看:

hadoop fs -du -h

都可以看到240个小文件。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source;

可以看到结果是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source;

可以看到是6个小文件。

这点数据就会产生小文件。

Mock出百万级别的数据量:

insert overwrite table test_data_source_100w
select * from test_date_source;
;
insert overwrite table test_data_source_100w
select * from test_data_source_100w;
;

多次执行,直到数据量超过100w。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source_100w;

可以看到结果还是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w;

可以看到是8个小文件。

重头戏来了:

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file1
select id,message from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

打散为10个小文件。

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

由于4个日期,所以打散成40个小文件。

之后Mock出千万级别的也类似。直接说结果:

最终test_small_file1表有5个小文件,test_small_file2有20个小文件。

但是使用参数后:

最终test_small_file1表有10个小文件,test_small_file2有40个小文件。

所以可以看出,distribute by cast(rand() * 100 as int)这个操作利用了Hash Partitioner,结果散步到了Reduce Task。最后文件的个数=Reduce Task个数。

分区表是每个Partition的结果再次Hash Partitioner打散,所以每个分区路径的parquet文件个数都是=Reduce Task个数。

由于hive.exec.reducers.bytes.per.reduce可以设置的很大,那么只需要修改mapreduce.job.reduces的值,就可以让Tez跑HQL任务时写固定个数的文件。

ACID表的ORC小文件

由于Sql Boy们比较顽固,总是想着把Hive当RDBMS来用,于是。。。说多了都是泪。。。

Hive的ACID表会产生大量的小文件,阈值配置的不合适时,触发合并的次数就很少,导致小文件越来越多。这有点像Hudi的MOR【merge on read】。当下游HQL任务从这些ACID表读数据时,就会由于Map Task过多,出现极其严重的性能问题,这个故事慢慢讲。于是Sql Boy们养成了手动合并小文件的好习惯:

alter table tb_name compact 'major';
alter table tb_name compact 'minor';

可以通过:

show compactions;

查看提交到Yarn的合并任务的记录,自己把unix的timestamp换算成人类能看懂的时间,就知道近期的合并情况。。。任务跑得慢,想起来了就手动合并一下。。。

总结

Hive On Tez主要就是这3种合并小文件的方式。

在这里插入图片描述
转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/129511318

这篇关于Hive On Tez小文件合并的技术调研的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/823585

相关文章

Python使用python-can实现合并BLF文件

《Python使用python-can实现合并BLF文件》python-can库是Python生态中专注于CAN总线通信与数据处理的强大工具,本文将使用python-can为BLF文件合并提供高效灵活... 目录一、python-can 库:CAN 数据处理的利器二、BLF 文件合并核心代码解析1. 基础合

Qt如何实现文本编辑器光标高亮技术

《Qt如何实现文本编辑器光标高亮技术》这篇文章主要为大家详细介绍了Qt如何实现文本编辑器光标高亮技术,文中的示例代码讲解详细,具有一定的借鉴价值,有需要的小伙伴可以了解下... 目录实现代码函数作用概述代码详解 + 注释使用 QTextEdit 的高亮技术(重点)总结用到的关键技术点应用场景举例示例优化建议

Java中的登录技术保姆级详细教程

《Java中的登录技术保姆级详细教程》:本文主要介绍Java中登录技术保姆级详细教程的相关资料,在Java中我们可以使用各种技术和框架来实现这些功能,文中通过代码介绍的非常详细,需要的朋友可以参考... 目录1.登录思路2.登录标记1.会话技术2.会话跟踪1.Cookie技术2.Session技术3.令牌技

Python中合并列表(list)的六种方法小结

《Python中合并列表(list)的六种方法小结》本文主要介绍了Python中合并列表(list)的六种方法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录一、直接用 + 合并列表二、用 extend() js方法三、用 zip() 函数交叉合并四、用

Web技术与Nginx网站环境部署教程

《Web技术与Nginx网站环境部署教程》:本文主要介绍Web技术与Nginx网站环境部署教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、Web基础1.域名系统DNS2.Hosts文件3.DNS4.域名注册二.网页与html1.网页概述2.HTML概述3.

利用Python实现Excel文件智能合并工具

《利用Python实现Excel文件智能合并工具》有时候,我们需要将多个Excel文件按照特定顺序合并成一个文件,这样可以更方便地进行后续的数据处理和分析,下面我们看看如何使用Python实现Exce... 目录运行结果为什么需要这个工具技术实现工具的核心功能代码解析使用示例工具优化与扩展有时候,我们需要将

Python实现获取带合并单元格的表格数据

《Python实现获取带合并单元格的表格数据》由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,所以本文我们就来聊聊如何使用Python实现获取带合并单元格的表格数据吧... 由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,现将将封装成类,并通过调用list_exc

Java使用WebView实现桌面程序的技术指南

《Java使用WebView实现桌面程序的技术指南》在现代软件开发中,许多应用需要在桌面程序中嵌入Web页面,例如,你可能需要在Java桌面应用中嵌入一部分Web前端,或者加载一个HTML5界面以增强... 目录1、简述2、WebView 特点3、搭建 WebView 示例3.1 添加 JavaFX 依赖3

SpringBoot3实现Gzip压缩优化的技术指南

《SpringBoot3实现Gzip压缩优化的技术指南》随着Web应用的用户量和数据量增加,网络带宽和页面加载速度逐渐成为瓶颈,为了减少数据传输量,提高用户体验,我们可以使用Gzip压缩HTTP响应,... 目录1、简述2、配置2.1 添加依赖2.2 配置 Gzip 压缩3、服务端应用4、前端应用4.1 N

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4