Iceberg小文件合并

2024-02-28 19:28
文章标签 合并 iceberg

本文主要是介绍Iceberg小文件合并,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.用法–Flink

  Iceberg提供了Actions来进行小文件合并,需要手动调用执行

import org.apache.iceberg.flink.actions.Actions;TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table).rewriteDataFiles().execute();

  这个任务目前仅仅将小文件进行了合并生成大文件,但旧的文件并没有删除,也就是文件反而变多了
  功能一路调用,到RowDataRewriter类中的rewriteDataForTasks

public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {RewriteMap map =new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream().flatMap(Collection::stream).collect(Collectors.toList());
}

  这里构建Flink任务,核心是RewriteMap的算子,其map方法如下,本质是读取并写入数据

public List<DataFile> map(CombinedScanTask task) throws Exception {// Initialize the task writer.this.writer = taskWriterFactory.create();try (DataIterator<RowData> iterator =new DataIterator<>(rowDataReader, task, io, encryptionManager)) {while (iterator.hasNext()) {RowData rowData = iterator.next();writer.write(rowData);}return Lists.newArrayList(writer.dataFiles());

  注意,这边小文件合并任务是多并发的,目前flink的并发度这里不提供配置,所以用的是flink的parallelism.default的值

int size = combinedScanTasks.size();
int parallelism = Math.min(size, maxParallelism);
DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
RowDataRewriter rowDataRewriter =new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
try {return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);

2.Spark

Result result =SparkActions().rewriteDataFiles(table)// do not include any file based on bin pack file size configs.option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0").option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)).option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE)).option(BinPackStrategy.DELETE_FILE_THRESHOLD, "2").execute();

  Spark 3.x的核心RewriteDataFilesSparkAction,不是BaseRewriteDataFilesAction的子类,Flink1.15和Spark 2.x是其子类
  Spark3最后走到RewriteDataFilesSparkAction.doExecute,核心如下

try {rewriteTaskBuilder.run(fileGroup -> {rewrittenGroups.add(rewriteFiles(ctx, fileGroup));});

  在rewriteFiles当中,进行数据的重写

String desc = jobDesc(fileGroup, ctx);
Set<DataFile> addedFiles =withJobGroupInfo(newJobGroupInfo("REWRITE-DATA-FILES", desc),() -> strategy.rewriteFiles(fileGroup.fileScans()));

  strategy有三个实现:SparkBinPackStrategy、SparkSortStrategy、SparkZOrderStrategy(就是前面actions进行配置的)
  从SparkSortStrategy的注释看,最后调用到Iceberg里的SparkWrite.RewriteFiles,其他两个没注释,但应该是一样的

sortedDf.write().format("iceberg").option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID).option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()).option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false").mode("append") // This will only write files without modifying the table, see// SparkWrite.RewriteFiles.save(groupID);

2.1.实际写分析

  从Spark里的WriteToDataSourceV2Exec.writeWithV2追踪,有这样一段操作

try {sparkContext.runJob(rdd,(context: TaskContext, iter: Iterator[InternalRow]) =>DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator,writeMetrics),

  这里提交了一个Spark作业,run方法是实际的处理函数,有写数据的方法

Utils.tryWithSafeFinallyAndFailureCallbacks(block = {while (iter.hasNext) {if (count % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {CustomMetrics.updateMetrics(dataWriter.currentMetricsValues, customMetrics)}// Count is here.count += 1dataWriter.write(iter.next())}

  核心就在这个dataWriter,这是由writeWithV2接口传入的writerFactory构建的,具体创建如下

val writerFactory = batchWrite.createBatchWriterFactory(PhysicalWriteInfoImpl(rdd.getNumPartitions))

  追踪代码,是写表,上游调用应该是writeToTable,在WriteToDataSourceV2Exec.scala当中的

private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {protected def writeToTable(

  核心调用如下,Iceberg中的SparkTable是SupportsWrite 的子类,应该走这个分支

Utils.tryWithSafeFinallyAndFailureCallbacks({table match {case table: SupportsWrite =>val info = LogicalWriteInfoImpl(queryId = UUID.randomUUID().toString,query.schema,writeOptions)val writeBuilder = table.newWriteBuilder(info)val write = writeBuilder.build()val writtenRows = write match {case v1: V1Write => writeWithV1(v1.toInsertableRelation)case v2 => writeWithV2(v2.toBatch)}

  根据调用链,v2.toBatch最后调用的是SparkWriteBuilder里的内容,参数被设置,走第一个分支

public BatchWrite toBatch() {if (rewrittenFileSetId != null) {return asRewrite(rewrittenFileSetId);} else if (overwriteByFilter) {return asOverwriteByFilter(overwriteExpr);

  调用createBatchWriterFactory,就到了Iceberg的SparkWrite里

private WriterFactory createWriterFactory() {// broadcast the table metadata as the writer factory will be sent to executorsBroadcast<Table> tableBroadcast =sparkContext.broadcast(SerializableTableWithSize.copyOf(table));return new WriterFactory(tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
}

  其createWriter就走回了Iceberg的基本流程,创建UnpartitionedDataWriter或PartitionedDataWriter

这篇关于Iceberg小文件合并的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/756394

相关文章

Python使用python-can实现合并BLF文件

《Python使用python-can实现合并BLF文件》python-can库是Python生态中专注于CAN总线通信与数据处理的强大工具,本文将使用python-can为BLF文件合并提供高效灵活... 目录一、python-can 库:CAN 数据处理的利器二、BLF 文件合并核心代码解析1. 基础合

Python中合并列表(list)的六种方法小结

《Python中合并列表(list)的六种方法小结》本文主要介绍了Python中合并列表(list)的六种方法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录一、直接用 + 合并列表二、用 extend() js方法三、用 zip() 函数交叉合并四、用

利用Python实现Excel文件智能合并工具

《利用Python实现Excel文件智能合并工具》有时候,我们需要将多个Excel文件按照特定顺序合并成一个文件,这样可以更方便地进行后续的数据处理和分析,下面我们看看如何使用Python实现Exce... 目录运行结果为什么需要这个工具技术实现工具的核心功能代码解析使用示例工具优化与扩展有时候,我们需要将

Python实现获取带合并单元格的表格数据

《Python实现获取带合并单元格的表格数据》由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,所以本文我们就来聊聊如何使用Python实现获取带合并单元格的表格数据吧... 由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,现将将封装成类,并通过调用list_exc

Python实现合并与拆分多个PDF文档中的指定页

《Python实现合并与拆分多个PDF文档中的指定页》这篇文章主要为大家详细介绍了如何使用Python实现将多个PDF文档中的指定页合并生成新的PDF以及拆分PDF,感兴趣的小伙伴可以参考一下... 安装所需要的库pip install PyPDF2 -i https://pypi.tuna.tsingh

使用Apache POI在Java中实现Excel单元格的合并

《使用ApachePOI在Java中实现Excel单元格的合并》在日常工作中,Excel是一个不可或缺的工具,尤其是在处理大量数据时,本文将介绍如何使用ApachePOI库在Java中实现Excel... 目录工具类介绍工具类代码调用示例依赖配置总结在日常工作中,Excel 是一个不可或缺的工http://

使用Python创建一个能够筛选文件的PDF合并工具

《使用Python创建一个能够筛选文件的PDF合并工具》这篇文章主要为大家详细介绍了如何使用Python创建一个能够筛选文件的PDF合并工具,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录背景主要功能全部代码代码解析1. 初始化 wx.Frame 窗口2. 创建工具栏3. 创建布局和界面控件4

Python自动化办公之合并多个Excel

《Python自动化办公之合并多个Excel》在日常的办公自动化工作中,尤其是处理大量数据时,合并多个Excel表格是一个常见且繁琐的任务,下面小编就来为大家介绍一下如何使用Python轻松实现合... 目录为什么选择 python 自动化目标使用 Python 合并多个 Excel 文件安装所需库示例代码

使用Python合并 Excel单元格指定行列或单元格范围

《使用Python合并Excel单元格指定行列或单元格范围》合并Excel单元格是Excel数据处理和表格设计中的一项常用操作,本文将介绍如何通过Python合并Excel中的指定行列或单... 目录python Excel库安装Python合并Excel 中的指定行Python合并Excel 中的指定列P

基于C#实现PDF文件合并工具

《基于C#实现PDF文件合并工具》这篇文章主要为大家详细介绍了如何基于C#实现一个简单的PDF文件合并工具,文中的示例代码简洁易懂,有需要的小伙伴可以跟随小编一起学习一下... 界面主要用于发票PDF文件的合并。经常出差要报销的很有用。代码using System;using System.Col