揭秘Iceberg:数据湖新版本的高级特性全面解析

2024-08-21 04:12

本文主要是介绍揭秘Iceberg:数据湖新版本的高级特性全面解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1前言

2 特性说明

2.1 Branch and Tag

2.2 Puffin format

2.3 Statistics 

2.4View

创建物化视图

查询物化视图

3应用案例

3.1 CDC数据入湖

3.2 多流数据拼接

3.3 异步索引和Z-order聚簇优化

3.4 A/B测试

3.5 多租户访问控制


1 前言

以下的讨论都基于iceberg1.1.0版本

2 特性说明

2.1 Branch and Tag

支持对表创建分支和标签,方便数据管理和访问。可以在特定快照上创建分支或标签,并设置保留时间。读写可以指定分支。

2.2 Puffin format

Puffin( ['pʌfɪn])  format是Iceberg引入的一种新的文件格式,主要用于存储那些无法直接存储在Iceberg清单(manifest)文件中的数据,如表的索引和统计信息。 

Puffin format的设计目标是:

  1. 独立性:Puffin文件可以独立于数据文件和清单文件而存在,方便单独管理和访问。

  2. 灵活性:Puffin文件的内容和结构可以根据不同的需求而定制,如不同类型的索引(Bloom filter、B+树等)或不同粒度的统计信息(表级、分区级等)。

  3. 可扩展性:Puffin format支持动态添加新的内容类型,以支持未来的新特性和需求。

 创建表时启用Puffin format

Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);PartitionSpec spec = PartitionSpec.unpartitioned();Map<String, String> properties = new HashMap<>();
properties.put(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
properties.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "134217728");// 启用Puffin format
properties.put(TableProperties.WRITE_NEW_FORMAT_ENABLED, "true");Table table = catalog.createTable(schema, spec, properties, "my_table");
Puffin Statics社区说明

假设我们有一个很大的Iceberg表,记录了某电商平台的用户订单信息,包括用户ID、商品ID、购买时间、金额等字段。现在我们要为这个表建立索引和统计信息,以优化查询性能。

  • 索引:我们决定为用户ID和商品ID分别建立Bloom filter索引,以加速查询。我们可以在数据写入时动态生成这些索引,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:

Puffin File: user_id_index.puffin
Magic Number: PUFFIN
Version: 1
Index Type: BLOOM_FILTER
Column: user_id
Chunk 1: (offset: 0, length: 1024)
Chunk 2: (offset: 1024, length: 1024)
...
Bloom Filter Data: [...bytes...]

  • 统计信息:我们还想为表的每个分区(如按天分区)记录一些统计信息,如行数、总金额等。这些统计信息可以帮助查询优化器生成更好的执行计划。我们可以在数据写入后异步生成这些统计信息,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:

Puffin File: partition_stats.puffin
Magic Number: PUFFIN
Version: 1
Stats Type: PARTITION_STATS
Partition 1: (name: "2022-01-01", rows: 1000000, total_amount: 1234567.89)
Partition 2: (name: "2022-01-02", rows: 1500000, total_amount: 2345678.90)
...

有了这些索引和统计信息,我们就可以更高效地查询数据。例如,如果我们要查询某个用户在某个时间段内的订单,可以先通过Bloom filter索引快速过滤出可能的数据文件,然后再根据分区统计信息选择最优的查询路径

2.3 Statistics 

Statistics如果帮助提高CBO(Cost-Based Optimizer)

  1. 基数估计:

    • 诸如行数、不同值计数和空值计数等统计信息有助于估计表和中间结果的基数。
    • 基数估计对于预测查询结果的大小以及优化连接顺序、连接算法和数据访问路径至关重要。
    • 准确的基数估计可以得出更精确的成本计算结果和更好的执行计划。
  2. 选择性估计:

    • 列值的统计信息(如最小值、最大值和直方图)可以估计谓词的选择性。
    • 选择性是指满足给定谓词或条件的行的比例。
    • CBO使用选择性估计来确定过滤器、索引和连接条件的有效性。
    • 更准确的选择性估计有助于CBO选择最有效的过滤器和访问路径。
  3. 数据倾斜检测

  4. 索引选择

  5. 分区裁剪

  6. 连接算法选择

  7. 资源估计

写入数据时生成索引和统计信息

DataFile dataFile = DataFiles.builder(spec).withPath("/path/to/data-1.parquet").withFileSizeInBytes(1234).withRecordCount(100).build();// 生成Bloom filter索引
BloomFilter bloomFilter = new BloomFilter(1000, 0.01);
bloomFilter.add(1);
bloomFilter.add(2);
// ...// 生成统计信息
Map<Integer, Long> columnSizes = new HashMap<>();
columnSizes.put(1, 100L);
columnSizes.put(2, 200L);
// ...StatisticsFile statisticsFile = StatisticsFiles.builder().withPath("/path/to/stats-1.puffin").withBloomFilter(1, bloomFilter).withColumnSizes(columnSizes).build();table.newAppend().appendFile(dataFile).appendStatistics(statisticsFile).commit();

查询数据时使用索引和统计信息 

Table table = catalog.loadTable("my_table");// 读取索引和统计信息
StatisticsFile statisticsFile = table.currentSnapshot().statisticsFile();
BloomFilter bloomFilter = statisticsFile.bloomFilter(1);
Map<Integer, Long> columnSizes = statisticsFile.columnSizes();// 使用索引和统计信息进行查询优化
CloseableIterable<Record> reader = IcebergGenerics.read(table).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).filter(Expressions.equal("id", 1)).caseSensitive(true).useBloomFilter(1, bloomFilter).build()).build();

 在查询数据时,可以通过table.currentSnapshot().statisticsFile()读取当前快照对应的索引和统计信息文件,然后在创建CloseableIterable读取器时,通过useBloomFilter等方法应用这些索引和统计信息,以优化查询性能。

以上就是在Java中使用Iceberg的Puffin format和Statistics的基本流程。当然,具体的实现还涉及到更多的细节和优化,如索引和统计信息的增量更新、并发控制等,需要根据实际的应用场景来设计。

2.4View

view可以很好地支持物化视图场景,通过预先计算和存储数据来加速查询。

创建物化视图

// 定义基表
Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table baseTable = catalog.createTable(schema, spec, "base_table");// 定义物化视图
String viewQuery = "SELECT id, count(*) as cnt FROM base_table GROUP BY id";
Map<String, String> properties = new HashMap<>();
properties.put("format-version", "2");
properties.put("location", "/path/to/view");//将刷新模式设置为periodic(定期刷新),刷新间隔设置为1h(1小时)。Iceberg会在后台自动管理物化视图的刷新,无需手动调用refresh方法。
properties.put("iceberg.view.refresh.mode", "periodic");
properties.put("iceberg.view.refresh.period", "1h");
Table materializedView = catalog.createView(viewQuery, Collections.emptyList(), properties, "my_view");

这个例子中,我们首先创建了一个基表base_table,然后定义了一个聚合查询语句作为物化视图my_view的定义。注意,在创建视图时,我们需要设置一些属性,如format-versionlocation,以指定视图的存储格式和位置。

查询物化视图

// 查询物化视图
try (CloseableIterable<Record> reader = IcebergGenerics.read(materializedView).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).build()) {for (Record record : reader) {System.out.println(record);}
}

查询物化视图与查询普通表的方式类似,Iceberg会自动从视图的存储位置读取预先计算好的数据,从而提供更快的查询速度。

3应用案例

3.1 CDC数据入湖

// 创建主分支和变更分支
Branch mainBranch = table.mainBranch();
Branch changeBranch = mainBranch.createBranch("change_branch_" + System.currentTimeMillis());// 将CDC事件写入变更分支
List<Record> cdcEvents = Lists.newArrayList(new Record(1L, "update", "a", "a1"),new Record(2L, "insert", "b", null),new Record(3L, "delete", "c", null)
);try (CloseableIterable<Record> writer = IcebergGenerics.write(changeBranch).createWriterFunc(GenericParquetWriter::buildWriter).build()) {writer.forEach(cdcEvents::add);
}// 生成变更日志
Table changeTable = changeBranch.table();
Table changeLogTable = changeTable.newAppend().appendFile(changeTable.currentSnapshot().addedFiles()).commit();// 将变更分支快照合并到主分支
mainBranch.cherryPick(changeBranch.snapshot()).commit();

在这个例子中,我们首先创建了一个变更分支change_branch,用于存储CDC事件。然后,我们将CDC事件写入变更分支,并基于变更分支生成了一个变更日志表changeLogTable。最后,我们使用cherryPick方法将变更分支的快照合并到主分支,从而完成了CDC数据入湖的过程。 

3.2 多流数据拼接

-- 定义基表
CREATE TABLE my_table (key1 INTEGER,key2 INTEGER,b0_new INTEGER,c0_new INTEGER,d0_new INTEGER,e0_new INTEGER,c1_new INTEGER,d1_new INTEGER,c2_new INTEGER,d2_new INTEGER
);-- 创建主分支和两个增量分支
CREATE BRANCH my_table_main;
CREATE BRANCH my_table_inc1;
CREATE BRANCH my_table_inc2;-- 给增量分支1写入部分数据
MERGE INTO my_table my_table_inc1USING (SELECT 'A' AS key1, 'B' AS key2, 'b0_new' AS b0_new, 'c0_new' AS c0_new) AS inputON my_table_inc1.key1 = input.key1 AND my_table_inc1.key2 = input.key2  WHEN MATCHED THENUPDATE SET b0_new = input.b0_new, c0_new = input.c0_newWHEN NOT MATCHED THEN  INSERT (key1, key2, b0_new, c0_new)VALUES (input.key1, input.key2, input.b0_new, input.c0_new);-- 给增量分支2写入部分数据  
MERGE INTO my_table my_table_inc2USING (SELECT 'A' AS key1, 'D' AS key2, 'e0_new' AS e0_new) AS inputON my_table_inc2.key1 = input.key1 AND my_table_inc2.key2 = input.key2WHEN MATCHED THENUPDATE SET e0_new = input.e0_new  WHEN NOT MATCHED THENINSERT (key1, key2, e0_new)VALUES (input.key1, input.key2, input.e0_new);-- 把增量数据合并到主分支,分区键和聚簇键保持不变
MERGE INTO my_table my_table_mainUSING my_table_inc1 ON my_table_main.key1 = my_table_inc1.key1 AND my_table_main.key2 = my_table_inc1.key2WHEN MATCHED AND my_table_inc1.b0_new IS NOT NULL THEN  UPDATE SET b0_new = my_table_inc1.b0_new, c0_new = my_table_inc1.c0_newWHEN NOT MATCHED THENINSERT *;MERGE INTO my_table my_table_main  USING my_table_inc2ON my_table_main.key1 = my_table_inc2.key1 AND my_table_main.key2 = my_table_inc2.key2WHEN MATCHED AND my_table_inc2.e0_new IS NOT NULL THENUPDATE SET e0_new = my_table_inc2.e0_new  WHEN NOT MATCHED THENINSERT *;
  1. 首先定义了一个基表my_table,包含了所有需要的列。

  2. 然后创建了一个主分支和两个增量分支。

  3. 分别向两个增量分支写入部分变更数据。

  4. 最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。

  5. 首先定义了一个基表my_table,包含了所有需要的列。

  6. 然后创建了一个主分支和两个增量分支。

  7. 分别向两个增量分支写入部分变更数据。

  8. 最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。

3.3 异步索引和Z-order聚簇优化

使用异步索引和Z-order聚簇优化,在分支上验证效果,支持A/B测试

// 创建优化分支
Branch optimizedBranch = mainBranch.createBranch("optimized_branch_" + System.currentTimeMillis());// 异步构建Bloom filter索引
BloomFilterIndex bloomFilterIndex = BloomFilterIndex.builderFor(optimizedBranch).withColumnNames("id").withExpectedEntries(1000000).withFalsePositiveProbability(0.01).build();// 异步构建Z-order索引
ZOrderIndex zOrderIndex = ZOrderIndex.builderFor(optimizedBranch).withColumnNames("ts", "id").build();IndexTasks.Builder indexBuilder = IndexTasks.builderFor(optimizedBranch).addIndex(bloomFilterIndex).addIndex(zOrderIndex).withMaxDurationInMs(30 * 60 * 1000); // 30 minutes timeoutIndexTasks indexTasks = indexBuilder.build();
indexTasks.executeAsync();// 使用Z-order聚簇优化数据布局
ClusteringOrder clusteringOrder = ClusteringOrder.builderFor(optimizedBranch).asc("ts").asc("id").build();optimizedBranch.replaceSortOrder(clusteringOrder).commit();

在这个例子中,我们首先创建了一个优化分支optimized_branch,用于尝试不同的优化策略。然后,我们异步构建了Bloom filter索引和Z-order索引,以加速查询。同时,我们还使用了Z-order聚簇来优化数据布局,使得相关数据在存储上更加紧凑。

3.4 A/B测试

// 在优化分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(optimizedBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).planWithBloomFilter(true).planWithZOrder(true).build()) {// ...
}// 在主分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(mainBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).build()) {// ...
}

为了验证优化策略的效果,我们可以在优化分支和主分支上分别执行相同的查询,并比较它们的性能指标(如查询延迟、资源消耗等)。这就是一种简单的A/B测试方法。如果优化分支的性能明显优于主分支,我们就可以将优化分支的变更合并到主分支。

3.5 多租户访问控制

// 创建部门视图
SQL.execute("CREATE VIEW dept_view_1 AS SELECT id, data FROM my_table WHERE dept = 'dept1'");
SQL.execute("CREATE VIEW dept_view_2 AS SELECT id, ts FROM my_table WHERE dept = 'dept2'");// 为不同部门授权视图
SQL.execute("GRANT SELECT ON dept_view_1 TO ROLE dept1_role");
SQL.execute("GRANT SELECT ON dept_view_2 TO ROLE dept2_role");

对于多租户表,不同部门可能关注不同的列。为了避免直接访问全表,我们可以为每个部门创建一个授权视图,只暴露部门关注的列。然后,我们可以使用Iceberg的SQL扩展来管理视图的权限,确保每个部门只能访问自己被授权的视图。 

这篇关于揭秘Iceberg:数据湖新版本的高级特性全面解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1092085

相关文章

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

IDEA如何切换数据库版本mysql5或mysql8

《IDEA如何切换数据库版本mysql5或mysql8》本文介绍了如何将IntelliJIDEA从MySQL5切换到MySQL8的详细步骤,包括下载MySQL8、安装、配置、停止旧服务、启动新服务以及... 目录问题描述解决方案第一步第二步第三步第四步第五步总结问题描述最近想开发一个新应用,想使用mysq

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

Python实现将实体类列表数据导出到Excel文件

《Python实现将实体类列表数据导出到Excel文件》在数据处理和报告生成中,将实体类的列表数据导出到Excel文件是一项常见任务,Python提供了多种库来实现这一目标,下面就来跟随小编一起学习一... 目录一、环境准备二、定义实体类三、创建实体类列表四、将实体类列表转换为DataFrame五、导出Da

Python实现数据清洗的18种方法

《Python实现数据清洗的18种方法》本文主要介绍了Python实现数据清洗的18种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录1. 去除字符串两边空格2. 转换数据类型3. 大小写转换4. 移除列表中的重复元素5. 快速统

五大特性引领创新! 深度操作系统 deepin 25 Preview预览版发布

《五大特性引领创新!深度操作系统deepin25Preview预览版发布》今日,深度操作系统正式推出deepin25Preview版本,该版本集成了五大核心特性:磐石系统、全新DDE、Tr... 深度操作系统今日发布了 deepin 25 Preview,新版本囊括五大特性:磐石系统、全新 DDE、Tree