Iceberg Changelog

2024-03-05 04:04
文章标签 iceberg changelog

本文主要是介绍Iceberg Changelog,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

01 Iceberg Changelog使用

0101 Flink使用

CREATE CATALOG hive_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://xxxx:19083','clientimecol'='5','property-version'='1','warehouse'='hdfs://nameservice/spark'
);use CATALOG hive_catalog;CREATE TABLE test2(
id BIGINT COMMENT 'unique id',
data STRING,
primary key(id) not ENFORCED
);
ALTER TABLE test2 SET('format-version'='2');SET table.exec.iceberg.use-flip27-source = true;SELECT * FROM test2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s')*/ ;

Iceberg流式更新目前只支持Append的数据,不支持更新删除

0102 Spark使用

Spark不是使用正常的业务SQL语句,而是在表后面加一个.changes

SELECT * FROM test2.changes ;1       2022-11-20      INSERT  0       3417478502276420225
5       2022-12-13      INSERT  16      8632592143956424357
1       2022-11-12      DELETE  4       8089565695180123096
1       2022-01-01      DELETE  13      7376527066811164734
1       2022-12-12      DELETE  8       1562898119085686311
7       2022-12-13      INSERT  18      7329729628749942729

02 Flink流式更新

Iceberg Flink FLIP-27实现当中介绍了流式更新时Iceberg相关的接口和流程,主要涉及分片相关的内容,就是获取文件列表的过程,流程中只读取了Append的数据文件

此章节结合FlinkSQL ChangeLog相关内容,看GenericRowData的RowKind产生过程

0201 初始化数据类型

参考kafka,追踪IcebergSourceRecordEmitter,发现没有做数据转换,直接做了数据转发

public void emitRecord(RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {output.collect(element.record());split.updatePosition(element.fileOffset(), element.recordOffset());
}

数据格式的构建在更前面读数据的时候就完成了,读数据的核心逻辑在DataIterator

private void updateCurrentIterator() {try {while (!currentIterator.hasNext() && tasks.hasNext()) {currentIterator.close();currentIterator = openTaskIterator(tasks.next());fileOffset += 1;recordOffset = 0L;}} catch (IOException e) {throw new UncheckedIOException(e);}}
}

主要的功能类就是currentIterator,实现类为RowDataFileScanTaskReader,最终调用下一层iterator,下一层的实现类根据文件类型不同,parquet的实现类为ParquetReader,在next中读取数据

public T next() {if (valuesRead >= nextRowGroupStart) {advance();}if (reuseContainers) {this.last = model.read(last);} else {this.last = model.read(null);}valuesRead += 1;return last;
}

model实现类为ParquetValueReaders

public final T read(T reuse) {I intermediate = newStructData(reuse);for (int i = 0; i < readers.length; i += 1) {set(intermediate, i, readers[i].read(get(intermediate, i)));// setters[i].set(intermediate, i, get(intermediate, i));}return buildStruct(intermediate);
}

newStructData构建数据,创建了GenericRowData

protected GenericRowData newStructData(RowData reuse) {if (reuse instanceof GenericRowData) {return (GenericRowData) reuse;} else {return new GenericRowData(numFields);}
}

0202 支持更新类型

FlinkSQL ChangeLog中明确了数据源支持类型由ScanTableSource中定义,Iceberg的实现类是IcebergTableSource,支持支insert,因此不走Flink的ChangelogNormalize流程

public ChangelogMode getChangelogMode() {return ChangelogMode.insertOnly();
}

03 Spark Changelog

0301 SparkChangelogTable

Spark的changelog有一个专门的处理类SparkChangelogTable,根据第一节中的用法,它需要对应在表名后面加上.changes

public class SparkChangelogTable implements Table, SupportsRead, SupportsMetadataColumns {public static final String TABLE_NAME = "changes";

创建了一个Changelog的TableScan

public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {if (refreshEagerly) {icebergTable.refresh();}return new SparkScanBuilder(spark(), icebergTable, changelogSchema(), options) {@Overridepublic Scan build() {return buildChangelogScan();}};
}

buildChangelogScan当中创建了SparkChangelogScan

IncrementalChangelogScan scan =table.newIncrementalChangelogScan().caseSensitive(caseSensitive).filter(filterExpression()).project(expectedSchema);return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);

IncrementalChangelogScan是其中的核心,BaseTable提供了创建接口;BaseIncrementalChangelogScan是Spark调用的,IncrementalAppendScan是Flink调用的(参看Iceberg Flink FLIP-27)

public IncrementalAppendScan newIncrementalAppendScan() {return new BaseIncrementalAppendScan(this, schema(), ImmutableTableScanContext.builder().metricsReporter(reporter).build());
}@Override
public IncrementalChangelogScan newIncrementalChangelogScan() {return new BaseIncrementalChangelogScan(this);
}

0302 BaseIncrementalChangelogScan

略过其他调用流程,其构建changelog分片的核心是doPlanFiles接口,接口输入是起始和结尾的SnapshotId

protected CloseableIterable<ChangelogScanTask> doPlanFiles(Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
  • snapshot排序

首先是对Snapshot进行排序,基本处理就是遍历,遍历的过程中判断如果有delete类型的就异常

private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl) {Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {if (!snapshot.operation().equals(DataOperations.REPLACE)) {if (snapshot.deleteManifests(table().io()).size() > 0) {throw new UnsupportedOperationException("Delete files are currently not supported in changelog scans");}changelogSnapshots.addFirst(snapshot);}}return changelogSnapshots;
}

deleteManifests就是删除类型的Manifests,由ManifestFile的content字段决定

Types.NestedField MANIFEST_CONTENT =optional(517, "content", Types.IntegerType.get(), "Contents of the manifest: 0=data, 1=deletes");

insert into、overwrite、delete三种操作出来的ManifestFile的字段值都是0也就是data(这应该是读写模式的原因,默认是copy-on-write)

  • ManifestFile列表

之后根据snapshot范围获取ManifestFile的文件列表,这一步并没有什么特殊的地方

Set<ManifestFile> newDataManifests =FluentIterable.from(changelogSnapshots).transformAndConcat(snapshot -> snapshot.dataManifests(table().io())).filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())).toSet();
  • ManifestGroup

这一步也没有什么特殊的,就是根据文件分片的通用接口进行构建和扫描

ManifestGroup manifestGroup =new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()).specsById(table().specs()).caseSensitive(isCaseSensitive()).select(scanColumns()).filterData(filter()).filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())).ignoreExisting();return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));

0303 输出格式

输出结果如上,输出的字段由schema定义,在SparkChangelogTable当中

public StructType schema() {if (lazyTableSparkType == null) {this.lazyTableSparkType = SparkSchemaUtil.convert(changelogSchema());}return lazyTableSparkType;
}

最终到ChangelogUtil当中定义,就是数据字段和CHANGELOG_METADATA联合

public static Schema changelogSchema(Schema tableSchema) {return TypeUtil.join(tableSchema, CHANGELOG_METADATA);
}

CHANGELOG_METADATA的定义为

private static final Schema CHANGELOG_METADATA =new Schema(CHANGE_TYPE, CHANGE_ORDINAL, COMMIT_SNAPSHOT_ID);

各个字段的意义为

public static final NestedField CHANGE_TYPE =NestedField.required(Integer.MAX_VALUE - 104,"_change_type",Types.StringType.get(),"Record type in changelog");
public static final NestedField CHANGE_ORDINAL =NestedField.optional(Integer.MAX_VALUE - 105,"_change_ordinal",Types.IntegerType.get(),"Change ordinal in changelog");
public static final NestedField COMMIT_SNAPSHOT_ID =NestedField.optional(Integer.MAX_VALUE - 106,"_commit_snapshot_id",Types.LongType.get(),"Commit snapshot ID");

操作示例

insert into sampleSpark values (1, 'name1');
INSERT OVERWRITE sampleSpark values (1, 'name2');
delete from sampleSpark  where id=1;

结果示例

1       name2   INSERT  1       6345233160542557791
1       name2   DELETE  2       6627122686657309022
1       name1   DELETE  1       6345233160542557791
1       name1   INSERT  0       3035871833168296964

0304 SparkChangelogScan

文件扫描落在SparkChangelogScan,创建SparkBatch

public Batch toBatch() {return new SparkBatch(sparkContext,table,readConf,EMPTY_GROUPING_KEY_TYPE,taskGroups(),expectedSchema,hashCode());
}

SparkBatch当中创建PartitionReaderFactory

public PartitionReaderFactory createReaderFactory() {if (useParquetBatchReads()) {int batchSize = readConf.parquetBatchSize();return new SparkColumnarReaderFactory(batchSize);} else if (useOrcBatchReads()) {int batchSize = readConf.orcBatchSize();return new SparkColumnarReaderFactory(batchSize);} else {return new SparkRowReaderFactory();}
}

前两者都是要启用对应文件读取器的向量化功能,普通设置走最后的分支

// conditions for using Parquet batch reads:
// - Parquet vectorization is enabled
// - at least one column is projected
// - only primitives are projected
// - all tasks are of FileScanTask type and read only Parquet files
private boolean useParquetBatchReads() {return readConf.parquetVectorizationEnabled()&& expectedSchema.columns().size() > 0&& expectedSchema.columns().stream().allMatch(c -> c.type().isPrimitiveType())&& taskGroups.stream().allMatch(this::supportsParquetBatchReads);
}

在PartitionReaderFactory当中创建对应的PartitionReader,分支走changelog

} else if (partition.allTasksOfType(ChangelogScanTask.class)) {return new ChangelogRowReader(partition);}

0305 ChangelogRowReader

ChangelogRowReader是实际的读取类,由数据读取器BaseReader调用,对应就是下面的open调用

this.currentIterator.close();
this.currentTask = taskT;
this.currentIterator = open(currentTask);
return true;

数据上设置类型(insert、delete)就是在open当中设置的,这里出来的数据就是带上insert和delete的

protected CloseableIterator<InternalRow> open(ChangelogScanTask task) {JoinedRow cdcRow = new JoinedRow();cdcRow.withRight(changelogMetadata(task));CloseableIterable<InternalRow> rows = openChangelogScanTask(task);CloseableIterable<InternalRow> cdcRows = CloseableIterable.transform(rows, cdcRow::withLeft);return cdcRows.iterator();
}

具体的设置在cdcRow.withRight(changelogMetadata(task));这一行,changelogMetadata如下,类型由task.operation()确定

private static InternalRow changelogMetadata(ChangelogScanTask task) {InternalRow metadataRow = new GenericInternalRow(3);metadataRow.update(0, UTF8String.fromString(task.operation().name()));metadataRow.update(1, task.changeOrdinal());metadataRow.update(2, task.commitSnapshotId());return metadataRow;
}

根本上来说,入参ChangelogScanTask决定了数据是什么类型,有效的类型是AddedRowsScanTask、DeletedDataFileScanTask;对应insert和delete

private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask task) {if (task instanceof AddedRowsScanTask) {return openAddedRowsScanTask((AddedRowsScanTask) task);} else if (task instanceof DeletedRowsScanTask) {throw new UnsupportedOperationException("Deleted rows scan task is not supported yet");} else if (task instanceof DeletedDataFileScanTask) {return openDeletedDataFileScanTask((DeletedDataFileScanTask) task);} else {throw new IllegalArgumentException("Unsupported changelog scan task type: " + task.getClass().getName());}
}

这篇关于Iceberg Changelog的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/775280

相关文章

数据湖解决方案关键一环,IceBerg会不会脱颖而出?

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 小编在之前的详细讲解过关于数据湖的发展历程和现状,《我看好数据湖的未来,但不看好数据湖的现在》 ,在最后一部分中提到了当前数据湖的解决方案中,目前跳的最凶的三巨头包括:Delta、Apache Iceberg 和 Apache Hudi。 本文中将详细的介绍一下其中的IceBerg,看一下IceBerg会不会最终脱颖而出。 发展历

前端项目代码自动生成changelog文件的几种方法

在前端开发项目中自动生成 CHANGELOG.md 文件可以通过多种方式实现。以下是几种常见的方法: 方法一:使用 conventional-changelog conventional-changelog 是一个流行的工具,可以根据 Git 提交信息自动生成 CHANGELOG.md 文件。 安装 conventional-changelog-cli 和 conventional-cha

项目启动报错:liquibase.lockservice:? - Waiting for changelog lock....

异常报错: 原因 工作流表部分日志表被锁,可能上次未正常终止程序导致的异常。 处理 登录mysql指定项目对应数据库 SELECT * FROM DATABASECHANGELOGLOCK; UPDATE DATABASECHANGELOGLOCK SET locked=0, lockgranted=null, lockedby=null WHERE id=1;

揭秘Iceberg:数据湖新版本的高级特性全面解析

目录 1前言 2 特性说明 2.1 Branch and Tag 2.2 Puffin format 2.3 Statistics  2.4View 创建物化视图 查询物化视图 3应用案例 3.1 CDC数据入湖 3.2 多流数据拼接 3.3 异步索引和Z-order聚簇优化 3.4 A/B测试 3.5 多租户访问控制 1 前言 以下的讨论都基于iceberg1

如何为自己的项目生成changelog

背景 在github上看到人家的更新日志感觉很cool,怎么能给自己项目来一套呢 环境信息 tds@tdsdeMacBook-Pro demo-doc % node -vv14.18.1tds@tdsdeMacBook-Pro demo-doc % npm -v6.14.15 硬件信息 型号名称:MacBook Pro版本: 12.6.9芯片: Apple M1 编码实

使用 Iceberg、Tabular 和 MinIO 构建现代数据架构

现代数据环境需要一种新型的基础架构,即无缝集成结构化和非结构化数据、轻松扩展并支持高效的 AI/ML 工作负载的基础架构。这就是现代数据湖的用武之地,它为您的所有数据需求提供了一个中心枢纽。然而,构建和管理有效的数据湖可能很复杂。 这篇博文深入探讨了三个强大的工具,它们可以优化您当前的方法:Apache Iceberg、Tabular 和 MinIO。以下步骤将引导您了解这些服务如何无缝组合,

iceberg gradle项目转maven

iceberg gradle项目转maven 通过versions.props集中进行版本管理 iceberg github上源码是用gradle做依赖管理的,下面记录踩的一些坑: 通过versions.props集中进行版本管理 其各dependency的version是集中在versions.props文件中进行管理的,在build.gradle通过dependencyRe

分钟级延迟kafka和iceberg+hdfs方案成本对比

基于kafka的实时数仓可以达到秒级别延迟(多层,如果是单层可达到ms级别延迟),但是kafka的成本太高,如果要做到近实时的数仓,可用iceberg+hdfs替代kafka。 以上这段是很多公司用iceberg替换kafka的原因,通过下面两个问题问清楚成本高在哪 Q1:存放同样大小1pb的数据,kafka成本为什么比hdfs高? A1:kafka是按消息队列设计的,为了满足

Flink + Iceberg 如何解决数据入湖面临的挑战

本文来自4月17日 Apache Flink x Iceberg Meetup 上海站胡争老师的分享,文末有视频回顾和PPT资源下载~ 欢迎关注公众号,一起探讨交流! 【PPT下载】 https://files.alicdn.com/tpsservice/b201e20d578e1f3c7d

数据湖Iceberg | 实时数据仓库的发展、架构和趋势

数据处理现状:当前基于Hive的离线数据仓库已经非常成熟,数据中台体系也基本上是围绕离线数仓进行建设。但是随着实时计算引擎的不断发展以及业务对于实时报表的产出需求不断膨胀,业界最近几年就一直聚焦并探索于两个相关的热点问题:实时数仓建设和大数据架构的批流一体建设。 1 实时数仓建设:实时数仓1.0 传统意义上我们通常将数据处理分为离线数据处理和实时数据处理。对于实时处理场景,我们一般又可以分为两类