本文主要是介绍揭秘Iceberg:数据湖新版本的高级特性全面解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
1前言
2 特性说明
2.1 Branch and Tag
2.2 Puffin format
2.3 Statistics
2.4View
创建物化视图
查询物化视图
3应用案例
3.1 CDC数据入湖
3.2 多流数据拼接
3.3 异步索引和Z-order聚簇优化
3.4 A/B测试
3.5 多租户访问控制
1 前言
以下的讨论都基于iceberg1.1.0版本
2 特性说明
2.1 Branch and Tag
支持对表创建分支和标签,方便数据管理和访问。可以在特定快照上创建分支或标签,并设置保留时间。读写可以指定分支。
2.2 Puffin format
Puffin( ['pʌfɪn]) format是Iceberg引入的一种新的文件格式,主要用于存储那些无法直接存储在Iceberg清单(manifest)文件中的数据,如表的索引和统计信息。
Puffin format的设计目标是:
独立性:Puffin文件可以独立于数据文件和清单文件而存在,方便单独管理和访问。
灵活性:Puffin文件的内容和结构可以根据不同的需求而定制,如不同类型的索引(Bloom filter、B+树等)或不同粒度的统计信息(表级、分区级等)。
可扩展性:Puffin format支持动态添加新的内容类型,以支持未来的新特性和需求。
创建表时启用Puffin format
Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);PartitionSpec spec = PartitionSpec.unpartitioned();Map<String, String> properties = new HashMap<>();
properties.put(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
properties.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "134217728");// 启用Puffin format
properties.put(TableProperties.WRITE_NEW_FORMAT_ENABLED, "true");Table table = catalog.createTable(schema, spec, properties, "my_table");
假设我们有一个很大的Iceberg表,记录了某电商平台的用户订单信息,包括用户ID、商品ID、购买时间、金额等字段。现在我们要为这个表建立索引和统计信息,以优化查询性能。
- 索引:我们决定为用户ID和商品ID分别建立Bloom filter索引,以加速查询。我们可以在数据写入时动态生成这些索引,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:
Puffin File: user_id_index.puffin
Magic Number: PUFFIN
Version: 1
Index Type: BLOOM_FILTER
Column: user_id
Chunk 1: (offset: 0, length: 1024)
Chunk 2: (offset: 1024, length: 1024)
...
Bloom Filter Data: [...bytes...]
- 统计信息:我们还想为表的每个分区(如按天分区)记录一些统计信息,如行数、总金额等。这些统计信息可以帮助查询优化器生成更好的执行计划。我们可以在数据写入后异步生成这些统计信息,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:
Puffin File: partition_stats.puffin
Magic Number: PUFFIN
Version: 1
Stats Type: PARTITION_STATS
Partition 1: (name: "2022-01-01", rows: 1000000, total_amount: 1234567.89)
Partition 2: (name: "2022-01-02", rows: 1500000, total_amount: 2345678.90)
...
有了这些索引和统计信息,我们就可以更高效地查询数据。例如,如果我们要查询某个用户在某个时间段内的订单,可以先通过Bloom filter索引快速过滤出可能的数据文件,然后再根据分区统计信息选择最优的查询路径
2.3 Statistics
Statistics如果帮助提高CBO(Cost-Based Optimizer)
基数估计:
- 诸如行数、不同值计数和空值计数等统计信息有助于估计表和中间结果的基数。
- 基数估计对于预测查询结果的大小以及优化连接顺序、连接算法和数据访问路径至关重要。
- 准确的基数估计可以得出更精确的成本计算结果和更好的执行计划。
选择性估计:
- 列值的统计信息(如最小值、最大值和直方图)可以估计谓词的选择性。
- 选择性是指满足给定谓词或条件的行的比例。
- CBO使用选择性估计来确定过滤器、索引和连接条件的有效性。
- 更准确的选择性估计有助于CBO选择最有效的过滤器和访问路径。
数据倾斜检测
索引选择
分区裁剪
连接算法选择
资源估计
写入数据时生成索引和统计信息
DataFile dataFile = DataFiles.builder(spec).withPath("/path/to/data-1.parquet").withFileSizeInBytes(1234).withRecordCount(100).build();// 生成Bloom filter索引
BloomFilter bloomFilter = new BloomFilter(1000, 0.01);
bloomFilter.add(1);
bloomFilter.add(2);
// ...// 生成统计信息
Map<Integer, Long> columnSizes = new HashMap<>();
columnSizes.put(1, 100L);
columnSizes.put(2, 200L);
// ...StatisticsFile statisticsFile = StatisticsFiles.builder().withPath("/path/to/stats-1.puffin").withBloomFilter(1, bloomFilter).withColumnSizes(columnSizes).build();table.newAppend().appendFile(dataFile).appendStatistics(statisticsFile).commit();
查询数据时使用索引和统计信息
Table table = catalog.loadTable("my_table");// 读取索引和统计信息
StatisticsFile statisticsFile = table.currentSnapshot().statisticsFile();
BloomFilter bloomFilter = statisticsFile.bloomFilter(1);
Map<Integer, Long> columnSizes = statisticsFile.columnSizes();// 使用索引和统计信息进行查询优化
CloseableIterable<Record> reader = IcebergGenerics.read(table).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).filter(Expressions.equal("id", 1)).caseSensitive(true).useBloomFilter(1, bloomFilter).build()).build();
在查询数据时,可以通过table.currentSnapshot().statisticsFile()
读取当前快照对应的索引和统计信息文件,然后在创建CloseableIterable
读取器时,通过useBloomFilter
等方法应用这些索引和统计信息,以优化查询性能。
以上就是在Java中使用Iceberg的Puffin format和Statistics的基本流程。当然,具体的实现还涉及到更多的细节和优化,如索引和统计信息的增量更新、并发控制等,需要根据实际的应用场景来设计。
2.4View
view可以很好地支持物化视图场景,通过预先计算和存储数据来加速查询。
创建物化视图
// 定义基表
Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table baseTable = catalog.createTable(schema, spec, "base_table");// 定义物化视图
String viewQuery = "SELECT id, count(*) as cnt FROM base_table GROUP BY id";
Map<String, String> properties = new HashMap<>();
properties.put("format-version", "2");
properties.put("location", "/path/to/view");//将刷新模式设置为periodic(定期刷新),刷新间隔设置为1h(1小时)。Iceberg会在后台自动管理物化视图的刷新,无需手动调用refresh方法。
properties.put("iceberg.view.refresh.mode", "periodic");
properties.put("iceberg.view.refresh.period", "1h");
Table materializedView = catalog.createView(viewQuery, Collections.emptyList(), properties, "my_view");
这个例子中,我们首先创建了一个基表base_table
,然后定义了一个聚合查询语句作为物化视图my_view
的定义。注意,在创建视图时,我们需要设置一些属性,如format-version
和location
,以指定视图的存储格式和位置。
查询物化视图
// 查询物化视图
try (CloseableIterable<Record> reader = IcebergGenerics.read(materializedView).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).build()) {for (Record record : reader) {System.out.println(record);}
}
查询物化视图与查询普通表的方式类似,Iceberg会自动从视图的存储位置读取预先计算好的数据,从而提供更快的查询速度。
3应用案例
3.1 CDC数据入湖
// 创建主分支和变更分支
Branch mainBranch = table.mainBranch();
Branch changeBranch = mainBranch.createBranch("change_branch_" + System.currentTimeMillis());// 将CDC事件写入变更分支
List<Record> cdcEvents = Lists.newArrayList(new Record(1L, "update", "a", "a1"),new Record(2L, "insert", "b", null),new Record(3L, "delete", "c", null)
);try (CloseableIterable<Record> writer = IcebergGenerics.write(changeBranch).createWriterFunc(GenericParquetWriter::buildWriter).build()) {writer.forEach(cdcEvents::add);
}// 生成变更日志
Table changeTable = changeBranch.table();
Table changeLogTable = changeTable.newAppend().appendFile(changeTable.currentSnapshot().addedFiles()).commit();// 将变更分支快照合并到主分支
mainBranch.cherryPick(changeBranch.snapshot()).commit();
在这个例子中,我们首先创建了一个变更分支change_branch
,用于存储CDC事件。然后,我们将CDC事件写入变更分支,并基于变更分支生成了一个变更日志表changeLogTable
。最后,我们使用cherryPick
方法将变更分支的快照合并到主分支,从而完成了CDC数据入湖的过程。
3.2 多流数据拼接
-- 定义基表
CREATE TABLE my_table (key1 INTEGER,key2 INTEGER,b0_new INTEGER,c0_new INTEGER,d0_new INTEGER,e0_new INTEGER,c1_new INTEGER,d1_new INTEGER,c2_new INTEGER,d2_new INTEGER
);-- 创建主分支和两个增量分支
CREATE BRANCH my_table_main;
CREATE BRANCH my_table_inc1;
CREATE BRANCH my_table_inc2;-- 给增量分支1写入部分数据
MERGE INTO my_table my_table_inc1USING (SELECT 'A' AS key1, 'B' AS key2, 'b0_new' AS b0_new, 'c0_new' AS c0_new) AS inputON my_table_inc1.key1 = input.key1 AND my_table_inc1.key2 = input.key2 WHEN MATCHED THENUPDATE SET b0_new = input.b0_new, c0_new = input.c0_newWHEN NOT MATCHED THEN INSERT (key1, key2, b0_new, c0_new)VALUES (input.key1, input.key2, input.b0_new, input.c0_new);-- 给增量分支2写入部分数据
MERGE INTO my_table my_table_inc2USING (SELECT 'A' AS key1, 'D' AS key2, 'e0_new' AS e0_new) AS inputON my_table_inc2.key1 = input.key1 AND my_table_inc2.key2 = input.key2WHEN MATCHED THENUPDATE SET e0_new = input.e0_new WHEN NOT MATCHED THENINSERT (key1, key2, e0_new)VALUES (input.key1, input.key2, input.e0_new);-- 把增量数据合并到主分支,分区键和聚簇键保持不变
MERGE INTO my_table my_table_mainUSING my_table_inc1 ON my_table_main.key1 = my_table_inc1.key1 AND my_table_main.key2 = my_table_inc1.key2WHEN MATCHED AND my_table_inc1.b0_new IS NOT NULL THEN UPDATE SET b0_new = my_table_inc1.b0_new, c0_new = my_table_inc1.c0_newWHEN NOT MATCHED THENINSERT *;MERGE INTO my_table my_table_main USING my_table_inc2ON my_table_main.key1 = my_table_inc2.key1 AND my_table_main.key2 = my_table_inc2.key2WHEN MATCHED AND my_table_inc2.e0_new IS NOT NULL THENUPDATE SET e0_new = my_table_inc2.e0_new WHEN NOT MATCHED THENINSERT *;
-
首先定义了一个基表my_table,包含了所有需要的列。
-
然后创建了一个主分支和两个增量分支。
-
分别向两个增量分支写入部分变更数据。
-
最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。
-
首先定义了一个基表my_table,包含了所有需要的列。
-
然后创建了一个主分支和两个增量分支。
-
分别向两个增量分支写入部分变更数据。
-
最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。
3.3 异步索引和Z-order聚簇优化
使用异步索引和Z-order聚簇优化,在分支上验证效果,支持A/B测试
// 创建优化分支
Branch optimizedBranch = mainBranch.createBranch("optimized_branch_" + System.currentTimeMillis());// 异步构建Bloom filter索引
BloomFilterIndex bloomFilterIndex = BloomFilterIndex.builderFor(optimizedBranch).withColumnNames("id").withExpectedEntries(1000000).withFalsePositiveProbability(0.01).build();// 异步构建Z-order索引
ZOrderIndex zOrderIndex = ZOrderIndex.builderFor(optimizedBranch).withColumnNames("ts", "id").build();IndexTasks.Builder indexBuilder = IndexTasks.builderFor(optimizedBranch).addIndex(bloomFilterIndex).addIndex(zOrderIndex).withMaxDurationInMs(30 * 60 * 1000); // 30 minutes timeoutIndexTasks indexTasks = indexBuilder.build();
indexTasks.executeAsync();// 使用Z-order聚簇优化数据布局
ClusteringOrder clusteringOrder = ClusteringOrder.builderFor(optimizedBranch).asc("ts").asc("id").build();optimizedBranch.replaceSortOrder(clusteringOrder).commit();
在这个例子中,我们首先创建了一个优化分支optimized_branch
,用于尝试不同的优化策略。然后,我们异步构建了Bloom filter索引和Z-order索引,以加速查询。同时,我们还使用了Z-order聚簇来优化数据布局,使得相关数据在存储上更加紧凑。
3.4 A/B测试
// 在优化分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(optimizedBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).planWithBloomFilter(true).planWithZOrder(true).build()) {// ...
}// 在主分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(mainBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).build()) {// ...
}
为了验证优化策略的效果,我们可以在优化分支和主分支上分别执行相同的查询,并比较它们的性能指标(如查询延迟、资源消耗等)。这就是一种简单的A/B测试方法。如果优化分支的性能明显优于主分支,我们就可以将优化分支的变更合并到主分支。
3.5 多租户访问控制
// 创建部门视图
SQL.execute("CREATE VIEW dept_view_1 AS SELECT id, data FROM my_table WHERE dept = 'dept1'");
SQL.execute("CREATE VIEW dept_view_2 AS SELECT id, ts FROM my_table WHERE dept = 'dept2'");// 为不同部门授权视图
SQL.execute("GRANT SELECT ON dept_view_1 TO ROLE dept1_role");
SQL.execute("GRANT SELECT ON dept_view_2 TO ROLE dept2_role");
对于多租户表,不同部门可能关注不同的列。为了避免直接访问全表,我们可以为每个部门创建一个授权视图,只暴露部门关注的列。然后,我们可以使用Iceberg的SQL扩展来管理视图的权限,确保每个部门只能访问自己被授权的视图。
这篇关于揭秘Iceberg:数据湖新版本的高级特性全面解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!