揭秘Iceberg:数据湖新版本的高级特性全面解析

2024-08-21 04:12

本文主要是介绍揭秘Iceberg:数据湖新版本的高级特性全面解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1前言

2 特性说明

2.1 Branch and Tag

2.2 Puffin format

2.3 Statistics 

2.4View

创建物化视图

查询物化视图

3应用案例

3.1 CDC数据入湖

3.2 多流数据拼接

3.3 异步索引和Z-order聚簇优化

3.4 A/B测试

3.5 多租户访问控制


1 前言

以下的讨论都基于iceberg1.1.0版本

2 特性说明

2.1 Branch and Tag

支持对表创建分支和标签,方便数据管理和访问。可以在特定快照上创建分支或标签,并设置保留时间。读写可以指定分支。

2.2 Puffin format

Puffin( ['pʌfɪn])  format是Iceberg引入的一种新的文件格式,主要用于存储那些无法直接存储在Iceberg清单(manifest)文件中的数据,如表的索引和统计信息。 

Puffin format的设计目标是:

  1. 独立性:Puffin文件可以独立于数据文件和清单文件而存在,方便单独管理和访问。

  2. 灵活性:Puffin文件的内容和结构可以根据不同的需求而定制,如不同类型的索引(Bloom filter、B+树等)或不同粒度的统计信息(表级、分区级等)。

  3. 可扩展性:Puffin format支持动态添加新的内容类型,以支持未来的新特性和需求。

 创建表时启用Puffin format

Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);PartitionSpec spec = PartitionSpec.unpartitioned();Map<String, String> properties = new HashMap<>();
properties.put(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
properties.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "134217728");// 启用Puffin format
properties.put(TableProperties.WRITE_NEW_FORMAT_ENABLED, "true");Table table = catalog.createTable(schema, spec, properties, "my_table");
Puffin Statics社区说明

假设我们有一个很大的Iceberg表,记录了某电商平台的用户订单信息,包括用户ID、商品ID、购买时间、金额等字段。现在我们要为这个表建立索引和统计信息,以优化查询性能。

  • 索引:我们决定为用户ID和商品ID分别建立Bloom filter索引,以加速查询。我们可以在数据写入时动态生成这些索引,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:

Puffin File: user_id_index.puffin
Magic Number: PUFFIN
Version: 1
Index Type: BLOOM_FILTER
Column: user_id
Chunk 1: (offset: 0, length: 1024)
Chunk 2: (offset: 1024, length: 1024)
...
Bloom Filter Data: [...bytes...]

  • 统计信息:我们还想为表的每个分区(如按天分区)记录一些统计信息,如行数、总金额等。这些统计信息可以帮助查询优化器生成更好的执行计划。我们可以在数据写入后异步生成这些统计信息,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:

Puffin File: partition_stats.puffin
Magic Number: PUFFIN
Version: 1
Stats Type: PARTITION_STATS
Partition 1: (name: "2022-01-01", rows: 1000000, total_amount: 1234567.89)
Partition 2: (name: "2022-01-02", rows: 1500000, total_amount: 2345678.90)
...

有了这些索引和统计信息,我们就可以更高效地查询数据。例如,如果我们要查询某个用户在某个时间段内的订单,可以先通过Bloom filter索引快速过滤出可能的数据文件,然后再根据分区统计信息选择最优的查询路径

2.3 Statistics 

Statistics如果帮助提高CBO(Cost-Based Optimizer)

  1. 基数估计:

    • 诸如行数、不同值计数和空值计数等统计信息有助于估计表和中间结果的基数。
    • 基数估计对于预测查询结果的大小以及优化连接顺序、连接算法和数据访问路径至关重要。
    • 准确的基数估计可以得出更精确的成本计算结果和更好的执行计划。
  2. 选择性估计:

    • 列值的统计信息(如最小值、最大值和直方图)可以估计谓词的选择性。
    • 选择性是指满足给定谓词或条件的行的比例。
    • CBO使用选择性估计来确定过滤器、索引和连接条件的有效性。
    • 更准确的选择性估计有助于CBO选择最有效的过滤器和访问路径。
  3. 数据倾斜检测

  4. 索引选择

  5. 分区裁剪

  6. 连接算法选择

  7. 资源估计

写入数据时生成索引和统计信息

DataFile dataFile = DataFiles.builder(spec).withPath("/path/to/data-1.parquet").withFileSizeInBytes(1234).withRecordCount(100).build();// 生成Bloom filter索引
BloomFilter bloomFilter = new BloomFilter(1000, 0.01);
bloomFilter.add(1);
bloomFilter.add(2);
// ...// 生成统计信息
Map<Integer, Long> columnSizes = new HashMap<>();
columnSizes.put(1, 100L);
columnSizes.put(2, 200L);
// ...StatisticsFile statisticsFile = StatisticsFiles.builder().withPath("/path/to/stats-1.puffin").withBloomFilter(1, bloomFilter).withColumnSizes(columnSizes).build();table.newAppend().appendFile(dataFile).appendStatistics(statisticsFile).commit();

查询数据时使用索引和统计信息 

Table table = catalog.loadTable("my_table");// 读取索引和统计信息
StatisticsFile statisticsFile = table.currentSnapshot().statisticsFile();
BloomFilter bloomFilter = statisticsFile.bloomFilter(1);
Map<Integer, Long> columnSizes = statisticsFile.columnSizes();// 使用索引和统计信息进行查询优化
CloseableIterable<Record> reader = IcebergGenerics.read(table).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).filter(Expressions.equal("id", 1)).caseSensitive(true).useBloomFilter(1, bloomFilter).build()).build();

 在查询数据时,可以通过table.currentSnapshot().statisticsFile()读取当前快照对应的索引和统计信息文件,然后在创建CloseableIterable读取器时,通过useBloomFilter等方法应用这些索引和统计信息,以优化查询性能。

以上就是在Java中使用Iceberg的Puffin format和Statistics的基本流程。当然,具体的实现还涉及到更多的细节和优化,如索引和统计信息的增量更新、并发控制等,需要根据实际的应用场景来设计。

2.4View

view可以很好地支持物化视图场景,通过预先计算和存储数据来加速查询。

创建物化视图

// 定义基表
Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table baseTable = catalog.createTable(schema, spec, "base_table");// 定义物化视图
String viewQuery = "SELECT id, count(*) as cnt FROM base_table GROUP BY id";
Map<String, String> properties = new HashMap<>();
properties.put("format-version", "2");
properties.put("location", "/path/to/view");//将刷新模式设置为periodic(定期刷新),刷新间隔设置为1h(1小时)。Iceberg会在后台自动管理物化视图的刷新,无需手动调用refresh方法。
properties.put("iceberg.view.refresh.mode", "periodic");
properties.put("iceberg.view.refresh.period", "1h");
Table materializedView = catalog.createView(viewQuery, Collections.emptyList(), properties, "my_view");

这个例子中,我们首先创建了一个基表base_table,然后定义了一个聚合查询语句作为物化视图my_view的定义。注意,在创建视图时,我们需要设置一些属性,如format-versionlocation,以指定视图的存储格式和位置。

查询物化视图

// 查询物化视图
try (CloseableIterable<Record> reader = IcebergGenerics.read(materializedView).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).build()) {for (Record record : reader) {System.out.println(record);}
}

查询物化视图与查询普通表的方式类似,Iceberg会自动从视图的存储位置读取预先计算好的数据,从而提供更快的查询速度。

3应用案例

3.1 CDC数据入湖

// 创建主分支和变更分支
Branch mainBranch = table.mainBranch();
Branch changeBranch = mainBranch.createBranch("change_branch_" + System.currentTimeMillis());// 将CDC事件写入变更分支
List<Record> cdcEvents = Lists.newArrayList(new Record(1L, "update", "a", "a1"),new Record(2L, "insert", "b", null),new Record(3L, "delete", "c", null)
);try (CloseableIterable<Record> writer = IcebergGenerics.write(changeBranch).createWriterFunc(GenericParquetWriter::buildWriter).build()) {writer.forEach(cdcEvents::add);
}// 生成变更日志
Table changeTable = changeBranch.table();
Table changeLogTable = changeTable.newAppend().appendFile(changeTable.currentSnapshot().addedFiles()).commit();// 将变更分支快照合并到主分支
mainBranch.cherryPick(changeBranch.snapshot()).commit();

在这个例子中,我们首先创建了一个变更分支change_branch,用于存储CDC事件。然后,我们将CDC事件写入变更分支,并基于变更分支生成了一个变更日志表changeLogTable。最后,我们使用cherryPick方法将变更分支的快照合并到主分支,从而完成了CDC数据入湖的过程。 

3.2 多流数据拼接

-- 定义基表
CREATE TABLE my_table (key1 INTEGER,key2 INTEGER,b0_new INTEGER,c0_new INTEGER,d0_new INTEGER,e0_new INTEGER,c1_new INTEGER,d1_new INTEGER,c2_new INTEGER,d2_new INTEGER
);-- 创建主分支和两个增量分支
CREATE BRANCH my_table_main;
CREATE BRANCH my_table_inc1;
CREATE BRANCH my_table_inc2;-- 给增量分支1写入部分数据
MERGE INTO my_table my_table_inc1USING (SELECT 'A' AS key1, 'B' AS key2, 'b0_new' AS b0_new, 'c0_new' AS c0_new) AS inputON my_table_inc1.key1 = input.key1 AND my_table_inc1.key2 = input.key2  WHEN MATCHED THENUPDATE SET b0_new = input.b0_new, c0_new = input.c0_newWHEN NOT MATCHED THEN  INSERT (key1, key2, b0_new, c0_new)VALUES (input.key1, input.key2, input.b0_new, input.c0_new);-- 给增量分支2写入部分数据  
MERGE INTO my_table my_table_inc2USING (SELECT 'A' AS key1, 'D' AS key2, 'e0_new' AS e0_new) AS inputON my_table_inc2.key1 = input.key1 AND my_table_inc2.key2 = input.key2WHEN MATCHED THENUPDATE SET e0_new = input.e0_new  WHEN NOT MATCHED THENINSERT (key1, key2, e0_new)VALUES (input.key1, input.key2, input.e0_new);-- 把增量数据合并到主分支,分区键和聚簇键保持不变
MERGE INTO my_table my_table_mainUSING my_table_inc1 ON my_table_main.key1 = my_table_inc1.key1 AND my_table_main.key2 = my_table_inc1.key2WHEN MATCHED AND my_table_inc1.b0_new IS NOT NULL THEN  UPDATE SET b0_new = my_table_inc1.b0_new, c0_new = my_table_inc1.c0_newWHEN NOT MATCHED THENINSERT *;MERGE INTO my_table my_table_main  USING my_table_inc2ON my_table_main.key1 = my_table_inc2.key1 AND my_table_main.key2 = my_table_inc2.key2WHEN MATCHED AND my_table_inc2.e0_new IS NOT NULL THENUPDATE SET e0_new = my_table_inc2.e0_new  WHEN NOT MATCHED THENINSERT *;
  1. 首先定义了一个基表my_table,包含了所有需要的列。

  2. 然后创建了一个主分支和两个增量分支。

  3. 分别向两个增量分支写入部分变更数据。

  4. 最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。

  5. 首先定义了一个基表my_table,包含了所有需要的列。

  6. 然后创建了一个主分支和两个增量分支。

  7. 分别向两个增量分支写入部分变更数据。

  8. 最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。

3.3 异步索引和Z-order聚簇优化

使用异步索引和Z-order聚簇优化,在分支上验证效果,支持A/B测试

// 创建优化分支
Branch optimizedBranch = mainBranch.createBranch("optimized_branch_" + System.currentTimeMillis());// 异步构建Bloom filter索引
BloomFilterIndex bloomFilterIndex = BloomFilterIndex.builderFor(optimizedBranch).withColumnNames("id").withExpectedEntries(1000000).withFalsePositiveProbability(0.01).build();// 异步构建Z-order索引
ZOrderIndex zOrderIndex = ZOrderIndex.builderFor(optimizedBranch).withColumnNames("ts", "id").build();IndexTasks.Builder indexBuilder = IndexTasks.builderFor(optimizedBranch).addIndex(bloomFilterIndex).addIndex(zOrderIndex).withMaxDurationInMs(30 * 60 * 1000); // 30 minutes timeoutIndexTasks indexTasks = indexBuilder.build();
indexTasks.executeAsync();// 使用Z-order聚簇优化数据布局
ClusteringOrder clusteringOrder = ClusteringOrder.builderFor(optimizedBranch).asc("ts").asc("id").build();optimizedBranch.replaceSortOrder(clusteringOrder).commit();

在这个例子中,我们首先创建了一个优化分支optimized_branch,用于尝试不同的优化策略。然后,我们异步构建了Bloom filter索引和Z-order索引,以加速查询。同时,我们还使用了Z-order聚簇来优化数据布局,使得相关数据在存储上更加紧凑。

3.4 A/B测试

// 在优化分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(optimizedBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).planWithBloomFilter(true).planWithZOrder(true).build()) {// ...
}// 在主分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(mainBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).build()) {// ...
}

为了验证优化策略的效果,我们可以在优化分支和主分支上分别执行相同的查询,并比较它们的性能指标(如查询延迟、资源消耗等)。这就是一种简单的A/B测试方法。如果优化分支的性能明显优于主分支,我们就可以将优化分支的变更合并到主分支。

3.5 多租户访问控制

// 创建部门视图
SQL.execute("CREATE VIEW dept_view_1 AS SELECT id, data FROM my_table WHERE dept = 'dept1'");
SQL.execute("CREATE VIEW dept_view_2 AS SELECT id, ts FROM my_table WHERE dept = 'dept2'");// 为不同部门授权视图
SQL.execute("GRANT SELECT ON dept_view_1 TO ROLE dept1_role");
SQL.execute("GRANT SELECT ON dept_view_2 TO ROLE dept2_role");

对于多租户表,不同部门可能关注不同的列。为了避免直接访问全表,我们可以为每个部门创建一个授权视图,只暴露部门关注的列。然后,我们可以使用Iceberg的SQL扩展来管理视图的权限,确保每个部门只能访问自己被授权的视图。 

这篇关于揭秘Iceberg:数据湖新版本的高级特性全面解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1092085

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

揭秘世界上那些同时横跨两大洲的国家

我们在《世界人口过亿的一级行政区分布》盘点全球是那些人口过亿的一级行政区。 现在我们介绍五个横跨两州的国家,并整理七大洲和这些国家的KML矢量数据分析分享给大家,如果你需要这些数据,请在文末查看领取方式。 世界上横跨两大洲的国家 地球被分为七个大洲分别是亚洲、欧洲、北美洲、南美洲、非洲、大洋洲和南极洲。 七大洲示意图 其中,南极洲是无人居住的大陆,而其他六个大洲则孕育了众多国家和

三国地理揭秘:为何北伐之路如此艰难,为何诸葛亮无法攻克陇右小城?

俗话说:天时不如地利,不是随便说说,诸葛亮六出祁山,连关中陇右的几座小城都攻不下来,行军山高路险,无法携带和建造攻城器械,是最难的,所以在汉中,无论从哪一方进攻,防守方都是一夫当关,万夫莫开;再加上千里运粮,根本不需要打,司马懿只需要坚守城池拼消耗就能不战而屈人之兵。 另一边,洛阳的虎牢关,一旦突破,洛阳就无险可守,这样的进军路线,才是顺势而为的用兵之道。 读历史的时候我们常常看到某一方势

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象