This article walks through small-file compaction in Iceberg and how it is implemented, in the hope that it offers a useful reference for developers who need it.
1. Usage – Flink
Iceberg provides Actions to compact small files; the action has to be invoked manually:
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .execute();
Note that this action only merges the small files into larger ones. The old files are not deleted; they merely stop being referenced by the new snapshot, so the total number of files in storage actually grows.
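To actually reclaim space, the old files have to be removed in a separate step, typically by expiring old snapshots so that the rewritten files are no longer reachable and become eligible for physical deletion. Below is a minimal sketch using the core Iceberg Java API; the one-day retention cutoff and the class/method names are illustrative choices, not part of the rewrite action itself.

import java.util.concurrent.TimeUnit;

import org.apache.iceberg.Table;

public class ExpireAfterCompaction {

  // Sketch: expire snapshots older than one day so that data files which are no
  // longer referenced by any remaining snapshot can be physically deleted.
  public static void expire(Table table) {
    long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
    table.expireSnapshots()
        .expireOlderThan(cutoff)
        .commit();
  }
}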
Following the call chain, the work ends up in rewriteDataForTasks of the RowDataRewriter class:
public List<DataFile> rewriteDataForTasks(
    DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
  RewriteMap map =
      new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
  DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);
  return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream()
      .flatMap(Collection::stream)
      .collect(Collectors.toList());
}
This builds a Flink job whose core operator is RewriteMap. Its map method, shown below, essentially reads every row of a scan task and writes it back out:
public List<DataFile> map(CombinedScanTask task) throws Exception {
  // Initialize the task writer.
  this.writer = taskWriterFactory.create();
  try (DataIterator<RowData> iterator =
      new DataIterator<>(rowDataReader, task, io, encryptionManager)) {
    while (iterator.hasNext()) {
      RowData rowData = iterator.next();
      writer.write(rowData);
    }
    return Lists.newArrayList(writer.dataFiles());
  }
  // ... (exception handling omitted in this excerpt)
}
Note that the compaction runs with multiple parallel subtasks. The parallelism is currently not configurable on the action itself, so Flink's parallelism.default value is used, capped by the number of scan tasks (see the configuration sketch after the excerpt below):
int size = combinedScanTasks.size();
int parallelism = Math.min(size, maxParallelism);
DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
RowDataRewriter rowDataRewriter =
    new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
try {
  return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
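The snippet below is a hedged sketch of how that default parallelism can be raised before running the action. It only uses standard Flink configuration APIs; how the Actions entry point obtains its StreamExecutionEnvironment (and therefore whether it sees this value) depends on the Iceberg/Flink version, so treat this as an assumption rather than the action's own API.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CompactionParallelism {

  // Sketch: set parallelism.default programmatically; the rewrite action falls
  // back to this value because it exposes no parallelism option of its own.
  public static StreamExecutionEnvironment buildEnv() {
    Configuration conf = new Configuration();
    conf.set(CoreOptions.DEFAULT_PARALLELISM, 4); // illustrative value
    return StreamExecutionEnvironment.getExecutionEnvironment(conf);
  }
}

The same knob can also be set cluster-wide via parallelism.default in flink-conf.yaml.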
2. Spark
On the Spark side, compaction goes through SparkActions, for example:

Result result = SparkActions.get()
    .rewriteDataFiles(table)
    // do not include any file based on bin pack file size configs
    .option(BinPackStrategy.MIN_FILE_SIZE_BYTES, "0")
    .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
    .option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE))
    .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "2")
    .execute();
The core class in Spark 3.x is RewriteDataFilesSparkAction, which is not a subclass of BaseRewriteDataFilesAction; the Flink 1.15 and Spark 2.x actions, by contrast, do extend BaseRewriteDataFilesAction.
In Spark 3 the call eventually reaches RewriteDataFilesSparkAction.doExecute, whose core is:
try {
  rewriteTaskBuilder.run(fileGroup -> {
    rewrittenGroups.add(rewriteFiles(ctx, fileGroup));
  });
The actual data rewrite happens inside rewriteFiles:
String desc = jobDesc(fileGroup, ctx);
Set<DataFile> addedFiles =
    withJobGroupInfo(
        newJobGroupInfo("REWRITE-DATA-FILES", desc),
        () -> strategy.rewriteFiles(fileGroup.fileScans()));
strategy has three implementations: SparkBinPackStrategy, SparkSortStrategy and SparkZOrderStrategy (these are what the action options shown earlier select; a selection sketch follows).
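For reference, here is a hedged sketch of choosing each strategy through the public SparkActions API; the SparkSession handle and the z-order column names "c1", "c2" are illustrative assumptions.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class StrategySelection {
  public static void compact(SparkSession spark, Table table) {
    // Bin-pack (the default strategy).
    RewriteDataFiles.Result binPacked =
        SparkActions.get(spark).rewriteDataFiles(table).binPack().execute();

    // Sort-based rewrite (assumes the table defines a sort order).
    RewriteDataFiles.Result sorted =
        SparkActions.get(spark).rewriteDataFiles(table).sort().execute();

    // Z-order rewrite on specific columns ("c1", "c2" are illustrative names).
    RewriteDataFiles.Result zOrdered =
        SparkActions.get(spark).rewriteDataFiles(table).zOrder("c1", "c2").execute();
  }
}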
Judging from the comments in SparkSortStrategy, the rewrite finally goes through SparkWrite.RewriteFiles inside Iceberg; the other two strategies carry no such comment, but they presumably take the same path:
sortedDf
    .write()
    .format("iceberg")
    .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
    .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
    .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
    .mode("append") // This will only write files without modifying the table, see
    // SparkWrite.RewriteFiles.
    .save(groupID);
2.1. The actual write path
Tracing from WriteToDataSourceV2Exec.writeWithV2 in Spark, we find the following:
try {
  sparkContext.runJob(
    rdd,
    (context: TaskContext, iter: Iterator[InternalRow]) =>
      DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator,
        writeMetrics),
This submits a Spark job. The run method is the actual processing function and contains the write loop:
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
  while (iter.hasNext) {
    if (count % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {
      CustomMetrics.updateMetrics(dataWriter.currentMetricsValues, customMetrics)
    }

    // Count is here.
    count += 1
    dataWriter.write(iter.next())
  }
The key piece is this dataWriter, which is built from the writerFactory handed to writeWithV2; it is created as follows:
val writerFactory = batchWrite.createBatchWriterFactory(PhysicalWriteInfoImpl(rdd.getNumPartitions))
Tracing the code further, this is a table write; the upstream caller should be writeToTable in WriteToDataSourceV2Exec.scala:
private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {
  protected def writeToTable(
The core call is shown below. Iceberg's SparkTable implements SupportsWrite, so it should take this branch:
Utils.tryWithSafeFinallyAndFailureCallbacks({
  table match {
    case table: SupportsWrite =>
      val info = LogicalWriteInfoImpl(
        queryId = UUID.randomUUID().toString,
        query.schema,
        writeOptions)
      val writeBuilder = table.newWriteBuilder(info)

      val write = writeBuilder.build()
      val writtenRows = write match {
        case v1: V1Write => writeWithV1(v1.toInsertableRelation)
        case v2 => writeWithV2(v2.toBatch)
      }
Following the call chain, v2.toBatch ends up in Iceberg's SparkWriteBuilder. Since the rewrite options have set rewrittenFileSetId, the first branch is taken:
public BatchWrite toBatch() {
  if (rewrittenFileSetId != null) {
    return asRewrite(rewrittenFileSetId);
  } else if (overwriteByFilter) {
    return asOverwriteByFilter(overwriteExpr);
Calling createBatchWriterFactory then lands in Iceberg's SparkWrite:
private WriterFactory createWriterFactory() {
  // broadcast the table metadata as the writer factory will be sent to executors
  Broadcast<Table> tableBroadcast =
      sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
  return new WriterFactory(
      tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
}
Its createWriter then goes back to Iceberg's standard write path, creating either an UnpartitionedDataWriter or a PartitionedDataWriter depending on whether the table is partitioned; a rough sketch of that dispatch follows.
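As a non-authoritative illustration of that dispatch, the sketch below only checks the table's partition spec; the class and method names are invented for the example, and the real writers are internal to SparkWrite and also take writer factories, file IO handles and size targets.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

// Sketch only: mirrors the branching described above.
public class WriterDispatchSketch {
  public static String writerKindFor(Table table) {
    PartitionSpec spec = table.spec();
    // Unpartitioned tables get the simple rolling writer; partitioned tables
    // get the partition-aware writer (optionally fanout).
    return spec.isUnpartitioned() ? "UnpartitionedDataWriter" : "PartitionedDataWriter";
  }
}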
That is all for this walkthrough of Iceberg small-file compaction; hopefully the article is of some help to fellow developers.