揭秘Iceberg:数据湖新版本的高级特性全面解析

2024-08-21 04:12

本文主要是介绍揭秘Iceberg:数据湖新版本的高级特性全面解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1前言

2 特性说明

2.1 Branch and Tag

2.2 Puffin format

2.3 Statistics 

2.4View

创建物化视图

查询物化视图

3应用案例

3.1 CDC数据入湖

3.2 多流数据拼接

3.3 异步索引和Z-order聚簇优化

3.4 A/B测试

3.5 多租户访问控制


1 前言

以下的讨论都基于iceberg1.1.0版本

2 特性说明

2.1 Branch and Tag

支持对表创建分支和标签,方便数据管理和访问。可以在特定快照上创建分支或标签,并设置保留时间。读写可以指定分支。

2.2 Puffin format

Puffin( ['pʌfɪn])  format是Iceberg引入的一种新的文件格式,主要用于存储那些无法直接存储在Iceberg清单(manifest)文件中的数据,如表的索引和统计信息。 

Puffin format的设计目标是:

  1. 独立性:Puffin文件可以独立于数据文件和清单文件而存在,方便单独管理和访问。

  2. 灵活性:Puffin文件的内容和结构可以根据不同的需求而定制,如不同类型的索引(Bloom filter、B+树等)或不同粒度的统计信息(表级、分区级等)。

  3. 可扩展性:Puffin format支持动态添加新的内容类型,以支持未来的新特性和需求。

 创建表时启用Puffin format

Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);PartitionSpec spec = PartitionSpec.unpartitioned();Map<String, String> properties = new HashMap<>();
properties.put(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
properties.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "134217728");// 启用Puffin format
properties.put(TableProperties.WRITE_NEW_FORMAT_ENABLED, "true");Table table = catalog.createTable(schema, spec, properties, "my_table");
Puffin Statics社区说明

假设我们有一个很大的Iceberg表,记录了某电商平台的用户订单信息,包括用户ID、商品ID、购买时间、金额等字段。现在我们要为这个表建立索引和统计信息,以优化查询性能。

  • 索引:我们决定为用户ID和商品ID分别建立Bloom filter索引,以加速查询。我们可以在数据写入时动态生成这些索引,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:

Puffin File: user_id_index.puffin
Magic Number: PUFFIN
Version: 1
Index Type: BLOOM_FILTER
Column: user_id
Chunk 1: (offset: 0, length: 1024)
Chunk 2: (offset: 1024, length: 1024)
...
Bloom Filter Data: [...bytes...]

  • 统计信息:我们还想为表的每个分区(如按天分区)记录一些统计信息,如行数、总金额等。这些统计信息可以帮助查询优化器生成更好的执行计划。我们可以在数据写入后异步生成这些统计信息,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:

Puffin File: partition_stats.puffin
Magic Number: PUFFIN
Version: 1
Stats Type: PARTITION_STATS
Partition 1: (name: "2022-01-01", rows: 1000000, total_amount: 1234567.89)
Partition 2: (name: "2022-01-02", rows: 1500000, total_amount: 2345678.90)
...

有了这些索引和统计信息,我们就可以更高效地查询数据。例如,如果我们要查询某个用户在某个时间段内的订单,可以先通过Bloom filter索引快速过滤出可能的数据文件,然后再根据分区统计信息选择最优的查询路径

2.3 Statistics 

Statistics如果帮助提高CBO(Cost-Based Optimizer)

  1. 基数估计:

    • 诸如行数、不同值计数和空值计数等统计信息有助于估计表和中间结果的基数。
    • 基数估计对于预测查询结果的大小以及优化连接顺序、连接算法和数据访问路径至关重要。
    • 准确的基数估计可以得出更精确的成本计算结果和更好的执行计划。
  2. 选择性估计:

    • 列值的统计信息(如最小值、最大值和直方图)可以估计谓词的选择性。
    • 选择性是指满足给定谓词或条件的行的比例。
    • CBO使用选择性估计来确定过滤器、索引和连接条件的有效性。
    • 更准确的选择性估计有助于CBO选择最有效的过滤器和访问路径。
  3. 数据倾斜检测

  4. 索引选择

  5. 分区裁剪

  6. 连接算法选择

  7. 资源估计

写入数据时生成索引和统计信息

DataFile dataFile = DataFiles.builder(spec).withPath("/path/to/data-1.parquet").withFileSizeInBytes(1234).withRecordCount(100).build();// 生成Bloom filter索引
BloomFilter bloomFilter = new BloomFilter(1000, 0.01);
bloomFilter.add(1);
bloomFilter.add(2);
// ...// 生成统计信息
Map<Integer, Long> columnSizes = new HashMap<>();
columnSizes.put(1, 100L);
columnSizes.put(2, 200L);
// ...StatisticsFile statisticsFile = StatisticsFiles.builder().withPath("/path/to/stats-1.puffin").withBloomFilter(1, bloomFilter).withColumnSizes(columnSizes).build();table.newAppend().appendFile(dataFile).appendStatistics(statisticsFile).commit();

查询数据时使用索引和统计信息 

Table table = catalog.loadTable("my_table");// 读取索引和统计信息
StatisticsFile statisticsFile = table.currentSnapshot().statisticsFile();
BloomFilter bloomFilter = statisticsFile.bloomFilter(1);
Map<Integer, Long> columnSizes = statisticsFile.columnSizes();// 使用索引和统计信息进行查询优化
CloseableIterable<Record> reader = IcebergGenerics.read(table).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).filter(Expressions.equal("id", 1)).caseSensitive(true).useBloomFilter(1, bloomFilter).build()).build();

 在查询数据时,可以通过table.currentSnapshot().statisticsFile()读取当前快照对应的索引和统计信息文件,然后在创建CloseableIterable读取器时,通过useBloomFilter等方法应用这些索引和统计信息,以优化查询性能。

以上就是在Java中使用Iceberg的Puffin format和Statistics的基本流程。当然,具体的实现还涉及到更多的细节和优化,如索引和统计信息的增量更新、并发控制等,需要根据实际的应用场景来设计。

2.4View

view可以很好地支持物化视图场景,通过预先计算和存储数据来加速查询。

创建物化视图

// 定义基表
Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table baseTable = catalog.createTable(schema, spec, "base_table");// 定义物化视图
String viewQuery = "SELECT id, count(*) as cnt FROM base_table GROUP BY id";
Map<String, String> properties = new HashMap<>();
properties.put("format-version", "2");
properties.put("location", "/path/to/view");//将刷新模式设置为periodic(定期刷新),刷新间隔设置为1h(1小时)。Iceberg会在后台自动管理物化视图的刷新,无需手动调用refresh方法。
properties.put("iceberg.view.refresh.mode", "periodic");
properties.put("iceberg.view.refresh.period", "1h");
Table materializedView = catalog.createView(viewQuery, Collections.emptyList(), properties, "my_view");

这个例子中,我们首先创建了一个基表base_table,然后定义了一个聚合查询语句作为物化视图my_view的定义。注意,在创建视图时,我们需要设置一些属性,如format-versionlocation,以指定视图的存储格式和位置。

查询物化视图

// 查询物化视图
try (CloseableIterable<Record> reader = IcebergGenerics.read(materializedView).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).build()) {for (Record record : reader) {System.out.println(record);}
}

查询物化视图与查询普通表的方式类似,Iceberg会自动从视图的存储位置读取预先计算好的数据,从而提供更快的查询速度。

3应用案例

3.1 CDC数据入湖

// 创建主分支和变更分支
Branch mainBranch = table.mainBranch();
Branch changeBranch = mainBranch.createBranch("change_branch_" + System.currentTimeMillis());// 将CDC事件写入变更分支
List<Record> cdcEvents = Lists.newArrayList(new Record(1L, "update", "a", "a1"),new Record(2L, "insert", "b", null),new Record(3L, "delete", "c", null)
);try (CloseableIterable<Record> writer = IcebergGenerics.write(changeBranch).createWriterFunc(GenericParquetWriter::buildWriter).build()) {writer.forEach(cdcEvents::add);
}// 生成变更日志
Table changeTable = changeBranch.table();
Table changeLogTable = changeTable.newAppend().appendFile(changeTable.currentSnapshot().addedFiles()).commit();// 将变更分支快照合并到主分支
mainBranch.cherryPick(changeBranch.snapshot()).commit();

在这个例子中,我们首先创建了一个变更分支change_branch,用于存储CDC事件。然后,我们将CDC事件写入变更分支,并基于变更分支生成了一个变更日志表changeLogTable。最后,我们使用cherryPick方法将变更分支的快照合并到主分支,从而完成了CDC数据入湖的过程。 

3.2 多流数据拼接

-- 定义基表
CREATE TABLE my_table (key1 INTEGER,key2 INTEGER,b0_new INTEGER,c0_new INTEGER,d0_new INTEGER,e0_new INTEGER,c1_new INTEGER,d1_new INTEGER,c2_new INTEGER,d2_new INTEGER
);-- 创建主分支和两个增量分支
CREATE BRANCH my_table_main;
CREATE BRANCH my_table_inc1;
CREATE BRANCH my_table_inc2;-- 给增量分支1写入部分数据
MERGE INTO my_table my_table_inc1USING (SELECT 'A' AS key1, 'B' AS key2, 'b0_new' AS b0_new, 'c0_new' AS c0_new) AS inputON my_table_inc1.key1 = input.key1 AND my_table_inc1.key2 = input.key2  WHEN MATCHED THENUPDATE SET b0_new = input.b0_new, c0_new = input.c0_newWHEN NOT MATCHED THEN  INSERT (key1, key2, b0_new, c0_new)VALUES (input.key1, input.key2, input.b0_new, input.c0_new);-- 给增量分支2写入部分数据  
MERGE INTO my_table my_table_inc2USING (SELECT 'A' AS key1, 'D' AS key2, 'e0_new' AS e0_new) AS inputON my_table_inc2.key1 = input.key1 AND my_table_inc2.key2 = input.key2WHEN MATCHED THENUPDATE SET e0_new = input.e0_new  WHEN NOT MATCHED THENINSERT (key1, key2, e0_new)VALUES (input.key1, input.key2, input.e0_new);-- 把增量数据合并到主分支,分区键和聚簇键保持不变
MERGE INTO my_table my_table_mainUSING my_table_inc1 ON my_table_main.key1 = my_table_inc1.key1 AND my_table_main.key2 = my_table_inc1.key2WHEN MATCHED AND my_table_inc1.b0_new IS NOT NULL THEN  UPDATE SET b0_new = my_table_inc1.b0_new, c0_new = my_table_inc1.c0_newWHEN NOT MATCHED THENINSERT *;MERGE INTO my_table my_table_main  USING my_table_inc2ON my_table_main.key1 = my_table_inc2.key1 AND my_table_main.key2 = my_table_inc2.key2WHEN MATCHED AND my_table_inc2.e0_new IS NOT NULL THENUPDATE SET e0_new = my_table_inc2.e0_new  WHEN NOT MATCHED THENINSERT *;
  1. 首先定义了一个基表my_table,包含了所有需要的列。

  2. 然后创建了一个主分支和两个增量分支。

  3. 分别向两个增量分支写入部分变更数据。

  4. 最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。

  5. 首先定义了一个基表my_table,包含了所有需要的列。

  6. 然后创建了一个主分支和两个增量分支。

  7. 分别向两个增量分支写入部分变更数据。

  8. 最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。

3.3 异步索引和Z-order聚簇优化

使用异步索引和Z-order聚簇优化,在分支上验证效果,支持A/B测试

// 创建优化分支
Branch optimizedBranch = mainBranch.createBranch("optimized_branch_" + System.currentTimeMillis());// 异步构建Bloom filter索引
BloomFilterIndex bloomFilterIndex = BloomFilterIndex.builderFor(optimizedBranch).withColumnNames("id").withExpectedEntries(1000000).withFalsePositiveProbability(0.01).build();// 异步构建Z-order索引
ZOrderIndex zOrderIndex = ZOrderIndex.builderFor(optimizedBranch).withColumnNames("ts", "id").build();IndexTasks.Builder indexBuilder = IndexTasks.builderFor(optimizedBranch).addIndex(bloomFilterIndex).addIndex(zOrderIndex).withMaxDurationInMs(30 * 60 * 1000); // 30 minutes timeoutIndexTasks indexTasks = indexBuilder.build();
indexTasks.executeAsync();// 使用Z-order聚簇优化数据布局
ClusteringOrder clusteringOrder = ClusteringOrder.builderFor(optimizedBranch).asc("ts").asc("id").build();optimizedBranch.replaceSortOrder(clusteringOrder).commit();

在这个例子中,我们首先创建了一个优化分支optimized_branch,用于尝试不同的优化策略。然后,我们异步构建了Bloom filter索引和Z-order索引,以加速查询。同时,我们还使用了Z-order聚簇来优化数据布局,使得相关数据在存储上更加紧凑。

3.4 A/B测试

// 在优化分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(optimizedBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).planWithBloomFilter(true).planWithZOrder(true).build()) {// ...
}// 在主分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(mainBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).build()) {// ...
}

为了验证优化策略的效果,我们可以在优化分支和主分支上分别执行相同的查询,并比较它们的性能指标(如查询延迟、资源消耗等)。这就是一种简单的A/B测试方法。如果优化分支的性能明显优于主分支,我们就可以将优化分支的变更合并到主分支。

3.5 多租户访问控制

// 创建部门视图
SQL.execute("CREATE VIEW dept_view_1 AS SELECT id, data FROM my_table WHERE dept = 'dept1'");
SQL.execute("CREATE VIEW dept_view_2 AS SELECT id, ts FROM my_table WHERE dept = 'dept2'");// 为不同部门授权视图
SQL.execute("GRANT SELECT ON dept_view_1 TO ROLE dept1_role");
SQL.execute("GRANT SELECT ON dept_view_2 TO ROLE dept2_role");

对于多租户表,不同部门可能关注不同的列。为了避免直接访问全表,我们可以为每个部门创建一个授权视图,只暴露部门关注的列。然后,我们可以使用Iceberg的SQL扩展来管理视图的权限,确保每个部门只能访问自己被授权的视图。 

这篇关于揭秘Iceberg:数据湖新版本的高级特性全面解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1092085

相关文章

nginx -t、nginx -s stop 和 nginx -s reload 命令的详细解析(结合应用场景)

《nginx-t、nginx-sstop和nginx-sreload命令的详细解析(结合应用场景)》本文解析Nginx的-t、-sstop、-sreload命令,分别用于配置语法检... 以下是关于 nginx -t、nginx -s stop 和 nginx -s reload 命令的详细解析,结合实际应

MyBatis中$与#的区别解析

《MyBatis中$与#的区别解析》文章浏览阅读314次,点赞4次,收藏6次。MyBatis使用#{}作为参数占位符时,会创建预处理语句(PreparedStatement),并将参数值作为预处理语句... 目录一、介绍二、sql注入风险实例一、介绍#(井号):MyBATis使用#{}作为参数占位符时,会

全面掌握 SQL 中的 DATEDIFF函数及用法最佳实践

《全面掌握SQL中的DATEDIFF函数及用法最佳实践》本文解析DATEDIFF在不同数据库中的差异,强调其边界计算原理,探讨应用场景及陷阱,推荐根据需求选择TIMESTAMPDIFF或inte... 目录1. 核心概念:DATEDIFF 究竟在计算什么?2. 主流数据库中的 DATEDIFF 实现2.1

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

Java操作Word文档的全面指南

《Java操作Word文档的全面指南》在Java开发中,操作Word文档是常见的业务需求,广泛应用于合同生成、报表输出、通知发布、法律文书生成、病历模板填写等场景,本文将全面介绍Java操作Word文... 目录简介段落页头与页脚页码表格图片批注文本框目录图表简介Word编程最重要的类是org.apach

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

Python中你不知道的gzip高级用法分享

《Python中你不知道的gzip高级用法分享》在当今大数据时代,数据存储和传输成本已成为每个开发者必须考虑的问题,Python内置的gzip模块提供了一种简单高效的解决方案,下面小编就来和大家详细讲... 目录前言:为什么数据压缩如此重要1. gzip 模块基础介绍2. 基本压缩与解压缩操作2.1 压缩文

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

深度解析Java DTO(最新推荐)

《深度解析JavaDTO(最新推荐)》DTO(DataTransferObject)是一种用于在不同层(如Controller层、Service层)之间传输数据的对象设计模式,其核心目的是封装数据,... 目录一、什么是DTO?DTO的核心特点:二、为什么需要DTO?(对比Entity)三、实际应用场景解析