Iceberg小文件合并

2024-02-28 19:28
文章标签 合并 iceberg

本文主要是介绍Iceberg小文件合并,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.用法–Flink

  Iceberg提供了Actions来进行小文件合并,需要手动调用执行

import org.apache.iceberg.flink.actions.Actions;TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table).rewriteDataFiles().execute();

  这个任务目前仅仅将小文件进行了合并生成大文件,但旧的文件并没有删除,也就是文件反而变多了
  功能一路调用,到RowDataRewriter类中的rewriteDataForTasks

public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {RewriteMap map =new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream().flatMap(Collection::stream).collect(Collectors.toList());
}

  这里构建Flink任务,核心是RewriteMap的算子,其map方法如下,本质是读取并写入数据

public List<DataFile> map(CombinedScanTask task) throws Exception {// Initialize the task writer.this.writer = taskWriterFactory.create();try (DataIterator<RowData> iterator =new DataIterator<>(rowDataReader, task, io, encryptionManager)) {while (iterator.hasNext()) {RowData rowData = iterator.next();writer.write(rowData);}return Lists.newArrayList(writer.dataFiles());

  注意,这边小文件合并任务是多并发的,目前flink的并发度这里不提供配置,所以用的是flink的parallelism.default的值

int size = combinedScanTasks.size();
int parallelism = Math.min(size, maxParallelism);
DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
RowDataRewriter rowDataRewriter =new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
try {return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);

2.Spark

Result result =SparkActions().rewriteDataFiles(table)// do not include any file based on bin pack file size configs.option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0").option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)).option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE)).option(BinPackStrategy.DELETE_FILE_THRESHOLD, "2").execute();

  Spark 3.x的核心RewriteDataFilesSparkAction,不是BaseRewriteDataFilesAction的子类,Flink1.15和Spark 2.x是其子类
  Spark3最后走到RewriteDataFilesSparkAction.doExecute,核心如下

try {rewriteTaskBuilder.run(fileGroup -> {rewrittenGroups.add(rewriteFiles(ctx, fileGroup));});

  在rewriteFiles当中,进行数据的重写

String desc = jobDesc(fileGroup, ctx);
Set<DataFile> addedFiles =withJobGroupInfo(newJobGroupInfo("REWRITE-DATA-FILES", desc),() -> strategy.rewriteFiles(fileGroup.fileScans()));

  strategy有三个实现:SparkBinPackStrategy、SparkSortStrategy、SparkZOrderStrategy(就是前面actions进行配置的)
  从SparkSortStrategy的注释看,最后调用到Iceberg里的SparkWrite.RewriteFiles,其他两个没注释,但应该是一样的

sortedDf.write().format("iceberg").option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID).option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()).option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false").mode("append") // This will only write files without modifying the table, see// SparkWrite.RewriteFiles.save(groupID);

2.1.实际写分析

  从Spark里的WriteToDataSourceV2Exec.writeWithV2追踪,有这样一段操作

try {sparkContext.runJob(rdd,(context: TaskContext, iter: Iterator[InternalRow]) =>DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator,writeMetrics),

  这里提交了一个Spark作业,run方法是实际的处理函数,有写数据的方法

Utils.tryWithSafeFinallyAndFailureCallbacks(block = {while (iter.hasNext) {if (count % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {CustomMetrics.updateMetrics(dataWriter.currentMetricsValues, customMetrics)}// Count is here.count += 1dataWriter.write(iter.next())}

  核心就在这个dataWriter,这是由writeWithV2接口传入的writerFactory构建的,具体创建如下

val writerFactory = batchWrite.createBatchWriterFactory(PhysicalWriteInfoImpl(rdd.getNumPartitions))

  追踪代码,是写表,上游调用应该是writeToTable,在WriteToDataSourceV2Exec.scala当中的

private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {protected def writeToTable(

  核心调用如下,Iceberg中的SparkTable是SupportsWrite 的子类,应该走这个分支

Utils.tryWithSafeFinallyAndFailureCallbacks({table match {case table: SupportsWrite =>val info = LogicalWriteInfoImpl(queryId = UUID.randomUUID().toString,query.schema,writeOptions)val writeBuilder = table.newWriteBuilder(info)val write = writeBuilder.build()val writtenRows = write match {case v1: V1Write => writeWithV1(v1.toInsertableRelation)case v2 => writeWithV2(v2.toBatch)}

  根据调用链,v2.toBatch最后调用的是SparkWriteBuilder里的内容,参数被设置,走第一个分支

public BatchWrite toBatch() {if (rewrittenFileSetId != null) {return asRewrite(rewrittenFileSetId);} else if (overwriteByFilter) {return asOverwriteByFilter(overwriteExpr);

  调用createBatchWriterFactory,就到了Iceberg的SparkWrite里

private WriterFactory createWriterFactory() {// broadcast the table metadata as the writer factory will be sent to executorsBroadcast<Table> tableBroadcast =sparkContext.broadcast(SerializableTableWithSize.copyOf(table));return new WriterFactory(tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
}

  其createWriter就走回了Iceberg的基本流程,创建UnpartitionedDataWriter或PartitionedDataWriter

这篇关于Iceberg小文件合并的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/756394

相关文章

Python自动化办公之合并多个Excel

《Python自动化办公之合并多个Excel》在日常的办公自动化工作中,尤其是处理大量数据时,合并多个Excel表格是一个常见且繁琐的任务,下面小编就来为大家介绍一下如何使用Python轻松实现合... 目录为什么选择 python 自动化目标使用 Python 合并多个 Excel 文件安装所需库示例代码

使用Python合并 Excel单元格指定行列或单元格范围

《使用Python合并Excel单元格指定行列或单元格范围》合并Excel单元格是Excel数据处理和表格设计中的一项常用操作,本文将介绍如何通过Python合并Excel中的指定行列或单... 目录python Excel库安装Python合并Excel 中的指定行Python合并Excel 中的指定列P

基于C#实现PDF文件合并工具

《基于C#实现PDF文件合并工具》这篇文章主要为大家详细介绍了如何基于C#实现一个简单的PDF文件合并工具,文中的示例代码简洁易懂,有需要的小伙伴可以跟随小编一起学习一下... 界面主要用于发票PDF文件的合并。经常出差要报销的很有用。代码using System;using System.Col

Python视频剪辑合并操作的实现示例

《Python视频剪辑合并操作的实现示例》很多人在创作视频时都需要进行剪辑,本文主要介绍了Python视频剪辑合并操作的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习... 目录介绍安装FFmpegWindowsMACOS安装MoviePy剪切视频合并视频转换视频结论介绍

不删数据还能合并磁盘? 让电脑C盘D盘合并并保留数据的技巧

《不删数据还能合并磁盘?让电脑C盘D盘合并并保留数据的技巧》在Windows操作系统中,合并C盘和D盘是一个相对复杂的任务,尤其是当你不希望删除其中的数据时,幸运的是,有几种方法可以实现这一目标且在... 在电脑生产时,制造商常为C盘分配较小的磁盘空间,以确保软件在运行过程中不会出现磁盘空间不足的问题。但在

在C#中合并和解析相对路径方式

《在C#中合并和解析相对路径方式》Path类提供了几个用于操作文件路径的静态方法,其中包括Combine方法和GetFullPath方法,Combine方法将两个路径合并在一起,但不会解析包含相对元素... 目录C#合并和解析相对路径System.IO.Path类幸运的是总结C#合并和解析相对路径对于 C

hdu2241(二分+合并数组)

题意:判断是否存在a+b+c = x,a,b,c分别属于集合A,B,C 如果用暴力会超时,所以这里用到了数组合并,将b,c数组合并成d,d数组存的是b,c数组元素的和,然后对d数组进行二分就可以了 代码如下(附注释): #include<iostream>#include<algorithm>#include<cstring>#include<stack>#include<que

day-51 合并零之间的节点

思路 直接遍历链表即可,遇到val=0跳过,val非零则加在一起,最后返回即可 解题过程 返回链表可以有头结点,方便插入,返回head.next Code /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}*

【每日一题】LeetCode 2181.合并零之间的节点(链表、模拟)

【每日一题】LeetCode 2181.合并零之间的节点(链表、模拟) 题目描述 给定一个链表,链表中的每个节点代表一个整数。链表中的整数由 0 分隔开,表示不同的区间。链表的开始和结束节点的值都为 0。任务是将每两个相邻的 0 之间的所有节点合并成一个节点,新节点的值为原区间内所有节点值的和。合并后,需要移除所有的 0,并返回修改后的链表头节点。 思路分析 初始化:创建一个虚拟头节点

【Python从入门到进阶】64、Pandas如何实现数据的Concat合并

接上篇《63.Pandas如何实现数据的Merge》 上一篇我们学习了Pandas如何实现数据的Merge,本篇我们来继续学习Pandas如何实现数据的Concat合并。 一、引言 在数据处理过程中,经常需要将多个数据集合并为一个统一的数据集,以便进行进一步的分析或建模。这种需求在多种场景下都非常常见,比如合并不同来源的数据集以获取更全面的信息、将时间序列数据按时间顺序拼接起来以观察长期趋势等