This article introduces Iceberg Changelog and aims to serve as a useful reference for developers working with it.
01 Using Iceberg Changelog
0101 Flink Usage
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://xxxx:19083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nameservice/spark'
);

USE CATALOG hive_catalog;

CREATE TABLE test2 (
  id BIGINT COMMENT 'unique id',
  data STRING,
  PRIMARY KEY(id) NOT ENFORCED
);

ALTER TABLE test2 SET ('format-version'='2');

SET table.exec.iceberg.use-flip27-source = true;

SELECT * FROM test2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;
Iceberg streaming reads currently only support appended data; updates and deletes are not supported.
0102 Spark Usage
Spark does not query the table with a regular business SQL statement; instead, a .changes suffix is appended to the table name:
SELECT * FROM test2.changes;

id  data        _change_type  _change_ordinal  _commit_snapshot_id
1   2022-11-20  INSERT        0                3417478502276420225
5   2022-12-13  INSERT        16               8632592143956424357
1   2022-11-12  DELETE        4                8089565695180123096
1   2022-01-01  DELETE        13               7376527066811164734
1   2022-12-12  DELETE        8                1562898119085686311
7   2022-12-13  INSERT        18               7329729628749942729
02 Flink Streaming Updates
The earlier article on the Iceberg Flink FLIP-27 implementation covered the Iceberg interfaces and flow involved in streaming reads, mainly the split-planning part, i.e. the process of obtaining the file list; in that flow only appended data files are read.
This section, building on the FlinkSQL ChangeLog material, looks at how the RowKind of GenericRowData is produced.
0201 Initializing the Data Type
Using the Kafka connector as a reference point and tracing IcebergSourceRecordEmitter, we find that it performs no data conversion and simply forwards the record:
public void emitRecord(
    RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
  output.collect(element.record());
  split.updatePosition(element.fileOffset(), element.recordOffset());
}
The row data is already constructed earlier, when the data is read. The core read logic is in DataIterator:
private void updateCurrentIterator() {
  try {
    while (!currentIterator.hasNext() && tasks.hasNext()) {
      currentIterator.close();
      currentIterator = openTaskIterator(tasks.next());
      fileOffset += 1;
      recordOffset = 0L;
    }
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}
The main functional class here is currentIterator, whose implementation is RowDataFileScanTaskReader. It ultimately delegates to a lower-level iterator whose implementation depends on the file format; for Parquet it is ParquetReader, which reads data in next():
public T next() {
  if (valuesRead >= nextRowGroupStart) {
    advance();
  }
  if (reuseContainers) {
    this.last = model.read(last);
  } else {
    this.last = model.read(null);
  }
  valuesRead += 1;
  return last;
}
The model implementation is ParquetValueReaders:
public final T read(T reuse) {
  I intermediate = newStructData(reuse);
  for (int i = 0; i < readers.length; i += 1) {
    set(intermediate, i, readers[i].read(get(intermediate, i)));
    // setters[i].set(intermediate, i, get(intermediate, i));
  }
  return buildStruct(intermediate);
}
newStructData builds the row data and creates a GenericRowData:
protected GenericRowData newStructData(RowData reuse) {
  if (reuse instanceof GenericRowData) {
    return (GenericRowData) reuse;
  } else {
    return new GenericRowData(numFields);
  }
}
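This is why all streamed rows behave as inserts: a GenericRowData constructed with only an arity defaults to RowKind.INSERT, and the reader shown above never changes it. A minimal sketch of that Flink API behavior (the class name and values are illustrative, not Iceberg code):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

public class RowKindDefaultDemo {
  public static void main(String[] args) {
    // Constructed the same way as in newStructData(): only the field count is given.
    GenericRowData row = new GenericRowData(2);
    row.setField(0, 1L);
    row.setField(1, StringData.fromString("data"));
    // No setRowKind() call is made, so the row keeps the default kind.
    System.out.println(row.getRowKind()); // prints INSERT
  }
}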
0202 Supported Change Types
As described in FlinkSQL ChangeLog, the change types a source supports are declared by its ScanTableSource. Iceberg's implementation is IcebergTableSource, which only supports insert, so Flink's ChangelogNormalize flow is not triggered:
public ChangelogMode getChangelogMode() {
  return ChangelogMode.insertOnly();
}
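For contrast, here is a hedged sketch of what a ScanTableSource would declare if it did emit a full changelog, using the standard Flink ChangelogMode builder (this is not what IcebergTableSource does today):

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class ChangelogModeSketch {
  // What IcebergTableSource currently returns.
  static ChangelogMode icebergMode() {
    return ChangelogMode.insertOnly();
  }

  // What a hypothetical full-changelog source would declare instead.
  static ChangelogMode fullChangelogMode() {
    return ChangelogMode.newBuilder()
        .addContainedKind(RowKind.INSERT)
        .addContainedKind(RowKind.UPDATE_BEFORE)
        .addContainedKind(RowKind.UPDATE_AFTER)
        .addContainedKind(RowKind.DELETE)
        .build();
  }
}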
03 Spark Changelog
0301 SparkChangelogTable
Spark's changelog has a dedicated handler class, SparkChangelogTable. Consistent with the usage in section 01, it corresponds to appending .changes to the table name:
public class SparkChangelogTable implements Table, SupportsRead, SupportsMetadataColumns {
  public static final String TABLE_NAME = "changes";
It creates a changelog TableScan:
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
  if (refreshEagerly) {
    icebergTable.refresh();
  }

  return new SparkScanBuilder(spark(), icebergTable, changelogSchema(), options) {
    @Override
    public Scan build() {
      return buildChangelogScan();
    }
  };
}
buildChangelogScan then creates a SparkChangelogScan:
IncrementalChangelogScan scan =
    table
        .newIncrementalChangelogScan()
        .caseSensitive(caseSensitive)
        .filter(filterExpression())
        .project(expectedSchema);

return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
IncrementalChangelogScan is the core here, and BaseTable provides the creation interfaces: BaseIncrementalChangelogScan is what Spark calls, while IncrementalAppendScan is what Flink calls (see the Iceberg Flink FLIP-27 article):
public IncrementalAppendScan newIncrementalAppendScan() {
  return new BaseIncrementalAppendScan(
      this, schema(), ImmutableTableScanContext.builder().metricsReporter(reporter).build());
}

@Override
public IncrementalChangelogScan newIncrementalChangelogScan() {
  return new BaseIncrementalChangelogScan(this);
}
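To make the entry point concrete, here is a hedged sketch of driving this scan directly through the Iceberg Java API; the loaded table and the snapshot IDs fromId/toId are assumed inputs, and the from/to method names follow the IncrementalScan interface and may differ slightly between versions:

import org.apache.iceberg.ChangelogScanTask;
import org.apache.iceberg.IncrementalChangelogScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class ChangelogScanSketch {
  static void planChangelog(Table table, long fromId, long toId) throws Exception {
    IncrementalChangelogScan scan =
        table.newIncrementalChangelogScan()
            .fromSnapshotExclusive(fromId) // start snapshot (exclusive)
            .toSnapshot(toId);             // end snapshot (inclusive)

    try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
      for (ChangelogScanTask task : tasks) {
        // ChangelogScanTask carries the same metadata Spark surfaces as
        // _change_type / _change_ordinal / _commit_snapshot_id.
        System.out.printf("%s ordinal=%d snapshot=%d%n",
            task.operation(), task.changeOrdinal(), task.commitSnapshotId());
      }
    }
  }
}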
0302 BaseIncrementalChangelogScan
Skipping the rest of the call chain, the core of building changelog splits is the doPlanFiles method, whose inputs are the start and end snapshot IDs:
protected CloseableIterable<ChangelogScanTask> doPlanFiles(
    Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
- Snapshot ordering
The snapshots are first ordered. The processing is a simple traversal, and if any snapshot carries delete manifests, an exception is thrown:
private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl) {
  Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();

  for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
    if (!snapshot.operation().equals(DataOperations.REPLACE)) {
      if (snapshot.deleteManifests(table().io()).size() > 0) {
        throw new UnsupportedOperationException(
            "Delete files are currently not supported in changelog scans");
      }

      changelogSnapshots.addFirst(snapshot);
    }
  }

  return changelogSnapshots;
}
deleteManifests are the delete-type manifests, determined by the ManifestFile content field:
Types.NestedField MANIFEST_CONTENT =
    optional(517, "content", Types.IntegerType.get(), "Contents of the manifest: 0=data, 1=deletes");
For all three operations (insert into, overwrite, delete) the ManifestFile content value is 0, i.e. data. This is presumably due to the row-level write mode, which defaults to copy-on-write.
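As a hedged illustration of that write-mode knob, the row-level operation modes are controlled by standard table properties (names from TableProperties; defaults can vary by Iceberg version). Switching them to merge-on-read makes deletes and updates write delete files, which the changelog scan above currently rejects:

import org.apache.iceberg.Table;

public class WriteModeSketch {
  // Assumes an already-loaded Iceberg Table. With the default copy-on-write modes,
  // row-level deletes rewrite data files, so only data manifests (content=0) appear.
  static void useMergeOnRead(Table table) {
    table.updateProperties()
        .set("write.delete.mode", "merge-on-read")
        .set("write.update.mode", "merge-on-read")
        .set("write.merge.mode", "merge-on-read")
        .commit();
  }
}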
- ManifestFile list
Next, the list of ManifestFiles is obtained based on the snapshot range; there is nothing special about this step:
Set<ManifestFile> newDataManifests =
    FluentIterable.from(changelogSnapshots)
        .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
        .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
        .toSet();
- ManifestGroup
This step is also unremarkable: the generic file-split interfaces are used to build and scan:
ManifestGroup manifestGroup =
    new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
        .specsById(table().specs())
        .caseSensitive(isCaseSensitive())
        .select(scanColumns())
        .filterData(filter())
        .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
        .ignoreExisting();

return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
0303 Output Format
The output looks like the example in section 01. The output columns are defined by the schema in SparkChangelogTable:
public StructType schema() {
  if (lazyTableSparkType == null) {
    this.lazyTableSparkType = SparkSchemaUtil.convert(changelogSchema());
  }

  return lazyTableSparkType;
}
Ultimately the schema is defined in ChangelogUtil: the data columns joined with CHANGELOG_METADATA:
public static Schema changelogSchema(Schema tableSchema) {
  return TypeUtil.join(tableSchema, CHANGELOG_METADATA);
}
CHANGELOG_METADATA is defined as:
private static final Schema CHANGELOG_METADATA =
    new Schema(CHANGE_TYPE, CHANGE_ORDINAL, COMMIT_SNAPSHOT_ID);
The meaning of each field:
public static final NestedField CHANGE_TYPE =
    NestedField.required(
        Integer.MAX_VALUE - 104, "_change_type", Types.StringType.get(), "Record type in changelog");
public static final NestedField CHANGE_ORDINAL =
    NestedField.optional(
        Integer.MAX_VALUE - 105, "_change_ordinal", Types.IntegerType.get(), "Change ordinal in changelog");
public static final NestedField COMMIT_SNAPSHOT_ID =
    NestedField.optional(
        Integer.MAX_VALUE - 106, "_commit_snapshot_id", Types.LongType.get(), "Commit snapshot ID");
Example operations:
insert into sampleSpark values (1, 'name1');
INSERT OVERWRITE sampleSpark values (1, 'name2');
delete from sampleSpark where id=1;
Example result:

id  data   _change_type  _change_ordinal  _commit_snapshot_id
1   name2  INSERT        1                6345233160542557791
1   name2  DELETE        2                6627122686657309022
1   name1  DELETE        1                6345233160542557791
1   name1  INSERT        0                3035871833168296964
0304 SparkChangelogScan
File scanning lands in SparkChangelogScan, which creates a SparkBatch:
public Batch toBatch() {
  return new SparkBatch(
      sparkContext, table, readConf, EMPTY_GROUPING_KEY_TYPE, taskGroups(), expectedSchema, hashCode());
}
SparkBatch then creates the PartitionReaderFactory:
public PartitionReaderFactory createReaderFactory() {
  if (useParquetBatchReads()) {
    int batchSize = readConf.parquetBatchSize();
    return new SparkColumnarReaderFactory(batchSize);

  } else if (useOrcBatchReads()) {
    int batchSize = readConf.orcBatchSize();
    return new SparkColumnarReaderFactory(batchSize);

  } else {
    return new SparkRowReaderFactory();
  }
}
The first two branches require the corresponding file format's vectorized reads to be enabled; with ordinary settings the last branch is taken.
// conditions for using Parquet batch reads:
// - Parquet vectorization is enabled
// - at least one column is projected
// - only primitives are projected
// - all tasks are of FileScanTask type and read only Parquet files
private boolean useParquetBatchReads() {
  return readConf.parquetVectorizationEnabled()
      && expectedSchema.columns().size() > 0
      && expectedSchema.columns().stream().allMatch(c -> c.type().isPrimitiveType())
      && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
}
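As a hedged example of toggling that first branch from the read side, Iceberg's Spark reads accept a per-read option that overrides the table property read.parquet.vectorization.enabled; the catalog, database, and table names below are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class VectorizationSketch {
  static Dataset<Row> readChanges(SparkSession spark) {
    // "vectorization-enabled" overrides read.parquet.vectorization.enabled for this read only;
    // when it is off (or the other conditions fail), SparkRowReaderFactory is used.
    return spark.read()
        .format("iceberg")
        .option("vectorization-enabled", "true")
        .load("hive_catalog.db.test2.changes"); // placeholder identifier
  }
}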
The PartitionReaderFactory then creates the corresponding PartitionReader; for changelog tasks, this branch is taken:
} else if (partition.allTasksOfType(ChangelogScanTask.class)) {
  return new ChangelogRowReader(partition);
}
0305 ChangelogRowReader
ChangelogRowReader is the actual reader class, invoked by the data reader BaseReader; this corresponds to the open call below:
this.currentIterator.close();
this.currentTask = taskT;
this.currentIterator = open(currentTask);
return true;
The change type (insert, delete) is attached to the data inside open, so the rows coming out of it already carry insert/delete:
protected CloseableIterator<InternalRow> open(ChangelogScanTask task) {
  JoinedRow cdcRow = new JoinedRow();

  cdcRow.withRight(changelogMetadata(task));

  CloseableIterable<InternalRow> rows = openChangelogScanTask(task);
  CloseableIterable<InternalRow> cdcRows = CloseableIterable.transform(rows, cdcRow::withLeft);

  return cdcRows.iterator();
}
The actual assignment happens on the line cdcRow.withRight(changelogMetadata(task)); changelogMetadata is shown below, and the type is determined by task.operation():
private static InternalRow changelogMetadata(ChangelogScanTask task) {
  InternalRow metadataRow = new GenericInternalRow(3);
  metadataRow.update(0, UTF8String.fromString(task.operation().name()));
  metadataRow.update(1, task.changeOrdinal());
  metadataRow.update(2, task.commitSnapshotId());
  return metadataRow;
}
Fundamentally, the ChangelogScanTask argument determines the change type. The supported task types are AddedRowsScanTask and DeletedDataFileScanTask, corresponding to insert and delete:
private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask task) {
  if (task instanceof AddedRowsScanTask) {
    return openAddedRowsScanTask((AddedRowsScanTask) task);

  } else if (task instanceof DeletedRowsScanTask) {
    throw new UnsupportedOperationException("Deleted rows scan task is not supported yet");

  } else if (task instanceof DeletedDataFileScanTask) {
    return openDeletedDataFileScanTask((DeletedDataFileScanTask) task);

  } else {
    throw new IllegalArgumentException(
        "Unsupported changelog scan task type: " + task.getClass().getName());
  }
}
That concludes this article on Iceberg Changelog; hopefully it is helpful to developers.