Iceberg小文件合并

2024-02-28 19:28
文章标签 合并 iceberg

本文主要是介绍Iceberg小文件合并,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.用法–Flink

  Iceberg提供了Actions来进行小文件合并,需要手动调用执行

import org.apache.iceberg.flink.actions.Actions;TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table).rewriteDataFiles().execute();

  这个任务目前仅仅将小文件进行了合并生成大文件,但旧的文件并没有删除,也就是文件反而变多了
  功能一路调用,到RowDataRewriter类中的rewriteDataForTasks

public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {RewriteMap map =new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream().flatMap(Collection::stream).collect(Collectors.toList());
}

  这里构建Flink任务,核心是RewriteMap的算子,其map方法如下,本质是读取并写入数据

public List<DataFile> map(CombinedScanTask task) throws Exception {// Initialize the task writer.this.writer = taskWriterFactory.create();try (DataIterator<RowData> iterator =new DataIterator<>(rowDataReader, task, io, encryptionManager)) {while (iterator.hasNext()) {RowData rowData = iterator.next();writer.write(rowData);}return Lists.newArrayList(writer.dataFiles());

  注意,这边小文件合并任务是多并发的,目前flink的并发度这里不提供配置,所以用的是flink的parallelism.default的值

int size = combinedScanTasks.size();
int parallelism = Math.min(size, maxParallelism);
DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
RowDataRewriter rowDataRewriter =new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
try {return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);

2.Spark

Result result =SparkActions().rewriteDataFiles(table)// do not include any file based on bin pack file size configs.option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0").option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)).option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE)).option(BinPackStrategy.DELETE_FILE_THRESHOLD, "2").execute();

  Spark 3.x的核心RewriteDataFilesSparkAction,不是BaseRewriteDataFilesAction的子类,Flink1.15和Spark 2.x是其子类
  Spark3最后走到RewriteDataFilesSparkAction.doExecute,核心如下

try {rewriteTaskBuilder.run(fileGroup -> {rewrittenGroups.add(rewriteFiles(ctx, fileGroup));});

  在rewriteFiles当中,进行数据的重写

String desc = jobDesc(fileGroup, ctx);
Set<DataFile> addedFiles =withJobGroupInfo(newJobGroupInfo("REWRITE-DATA-FILES", desc),() -> strategy.rewriteFiles(fileGroup.fileScans()));

  strategy有三个实现:SparkBinPackStrategy、SparkSortStrategy、SparkZOrderStrategy(就是前面actions进行配置的)
  从SparkSortStrategy的注释看,最后调用到Iceberg里的SparkWrite.RewriteFiles,其他两个没注释,但应该是一样的

sortedDf.write().format("iceberg").option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID).option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()).option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false").mode("append") // This will only write files without modifying the table, see// SparkWrite.RewriteFiles.save(groupID);

2.1.实际写分析

  从Spark里的WriteToDataSourceV2Exec.writeWithV2追踪,有这样一段操作

try {sparkContext.runJob(rdd,(context: TaskContext, iter: Iterator[InternalRow]) =>DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator,writeMetrics),

  这里提交了一个Spark作业,run方法是实际的处理函数,有写数据的方法

Utils.tryWithSafeFinallyAndFailureCallbacks(block = {while (iter.hasNext) {if (count % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {CustomMetrics.updateMetrics(dataWriter.currentMetricsValues, customMetrics)}// Count is here.count += 1dataWriter.write(iter.next())}

  核心就在这个dataWriter,这是由writeWithV2接口传入的writerFactory构建的,具体创建如下

val writerFactory = batchWrite.createBatchWriterFactory(PhysicalWriteInfoImpl(rdd.getNumPartitions))

  追踪代码,是写表,上游调用应该是writeToTable,在WriteToDataSourceV2Exec.scala当中的

private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {protected def writeToTable(

  核心调用如下,Iceberg中的SparkTable是SupportsWrite 的子类,应该走这个分支

Utils.tryWithSafeFinallyAndFailureCallbacks({table match {case table: SupportsWrite =>val info = LogicalWriteInfoImpl(queryId = UUID.randomUUID().toString,query.schema,writeOptions)val writeBuilder = table.newWriteBuilder(info)val write = writeBuilder.build()val writtenRows = write match {case v1: V1Write => writeWithV1(v1.toInsertableRelation)case v2 => writeWithV2(v2.toBatch)}

  根据调用链,v2.toBatch最后调用的是SparkWriteBuilder里的内容,参数被设置,走第一个分支

public BatchWrite toBatch() {if (rewrittenFileSetId != null) {return asRewrite(rewrittenFileSetId);} else if (overwriteByFilter) {return asOverwriteByFilter(overwriteExpr);

  调用createBatchWriterFactory,就到了Iceberg的SparkWrite里

private WriterFactory createWriterFactory() {// broadcast the table metadata as the writer factory will be sent to executorsBroadcast<Table> tableBroadcast =sparkContext.broadcast(SerializableTableWithSize.copyOf(table));return new WriterFactory(tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
}

  其createWriter就走回了Iceberg的基本流程,创建UnpartitionedDataWriter或PartitionedDataWriter

这篇关于Iceberg小文件合并的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/756394

相关文章

不删数据还能合并磁盘? 让电脑C盘D盘合并并保留数据的技巧

《不删数据还能合并磁盘?让电脑C盘D盘合并并保留数据的技巧》在Windows操作系统中,合并C盘和D盘是一个相对复杂的任务,尤其是当你不希望删除其中的数据时,幸运的是,有几种方法可以实现这一目标且在... 在电脑生产时,制造商常为C盘分配较小的磁盘空间,以确保软件在运行过程中不会出现磁盘空间不足的问题。但在

在C#中合并和解析相对路径方式

《在C#中合并和解析相对路径方式》Path类提供了几个用于操作文件路径的静态方法,其中包括Combine方法和GetFullPath方法,Combine方法将两个路径合并在一起,但不会解析包含相对元素... 目录C#合并和解析相对路径System.IO.Path类幸运的是总结C#合并和解析相对路径对于 C

hdu2241(二分+合并数组)

题意:判断是否存在a+b+c = x,a,b,c分别属于集合A,B,C 如果用暴力会超时,所以这里用到了数组合并,将b,c数组合并成d,d数组存的是b,c数组元素的和,然后对d数组进行二分就可以了 代码如下(附注释): #include<iostream>#include<algorithm>#include<cstring>#include<stack>#include<que

day-51 合并零之间的节点

思路 直接遍历链表即可,遇到val=0跳过,val非零则加在一起,最后返回即可 解题过程 返回链表可以有头结点,方便插入,返回head.next Code /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}*

【每日一题】LeetCode 2181.合并零之间的节点(链表、模拟)

【每日一题】LeetCode 2181.合并零之间的节点(链表、模拟) 题目描述 给定一个链表,链表中的每个节点代表一个整数。链表中的整数由 0 分隔开,表示不同的区间。链表的开始和结束节点的值都为 0。任务是将每两个相邻的 0 之间的所有节点合并成一个节点,新节点的值为原区间内所有节点值的和。合并后,需要移除所有的 0,并返回修改后的链表头节点。 思路分析 初始化:创建一个虚拟头节点

【Python从入门到进阶】64、Pandas如何实现数据的Concat合并

接上篇《63.Pandas如何实现数据的Merge》 上一篇我们学习了Pandas如何实现数据的Merge,本篇我们来继续学习Pandas如何实现数据的Concat合并。 一、引言 在数据处理过程中,经常需要将多个数据集合并为一个统一的数据集,以便进行进一步的分析或建模。这种需求在多种场景下都非常常见,比如合并不同来源的数据集以获取更全面的信息、将时间序列数据按时间顺序拼接起来以观察长期趋势等

线性表中顺序表的合并

对两个顺序表进行合并,算法的复杂度为O(La.size+Lb.size)。 已知: 顺序线性表La和Lb的元素按值非递减排列 归并La和Lb得到的顺序线性表Lc,Lc的元素也按值非递减排列。 代码定义: void mergeList(SeqList *La,SeqList *Lb,SeqList *Lc){Lc->capacity = La->size + Lb->size;Lc->b

为libpng不同架构创建构建目录、编译、安装以及合并库文件的所有步骤。

好的。既然你已经有了 libpng 的源代码,并且当前处在它的目录下,我们可以简化脚本,不再需要下载和解压源代码这一步。以下是修改后的脚本:```sh#!/bin/bash# 当前目录即 libpng 源代码目录LIBPNG_SRC_DIR=$(pwd)# 设置工作目录WORK_DIR=$(pwd)/libpng_buildBUILD_DIR_X86_64="$WORK_DIR/build

listview与复选框的合并使用

在使用listview的过程中,我们常常需要使用复选框,实现一些批处理功能。这时候我们需使用自定义的adapter,实现相关复选框的事件响应。      首先在adapter定义一个哈希表,用于存放复选框的选中情况:      如private static HashMap<String,Boolean> isSelected,private static HashMap<Inter

ZOJ 3324 Machine(线段树区间合并)

这道题网上很多代码是错误的,由于后台数据水,他们可以AC。 比如这组数据 10 3 p 0 9 r 0 5 r 6 9 输出应该是 0 1 1 所以有的人直接记录该区间是否被覆盖过的方法是错误的 正确方法应该是记录这段区间的最小高度(就是最接近初始位置的高度),和最小高度对应的最长左区间和右区间 开一个sum记录这段区间最小高度的块数,min_v 记录该区间最小高度 cover