本文主要是介绍DataLake — 批流一体化的追风者(2) -- Delta Lake核心原理解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、Delta Lake
1.Delta Lake基础概述
接上文,我们全面地讲解了Data Lake相关的概念、对比区别以及实际发展历程等。那么这篇首章开篇我们来讲历史最为悠久的Delta Lake。它的定位是流批一体的存储中间层,支持 update/delete/merge。由于出自Databricks,spark的所有数据写入方式,包括基于dataframe的批、流,以及 SQL的Insert、Insert Overwrite等都是支持的(开源版本SQL写暂不支持)。与Iceberg类似,Delta不强调主键,因此其update/delete/merge的实现均是基于spark的join功能。在数据写入方面,Delta与Spark是强绑定的,这一点Hudi是不同的:Hudi的数据写入不绑定Spark(可以用 Spark,也可以使用Hudi自己的写入工具写入)。
在查询方面,开源Delta目前支持Spark与Presto,但是Spark模块是不可或缺的,因为delta log的处理需要用到Spark。这意味着如果要用Presto查询时还要跑一个Spark作业。在Presto查询前要运行Spark作业生成Symlink文件。如果表数据是实时更新的,意味着每次在查询之前先要跑一个SparkSQL,再跑Presto。这样以来,Presto查询显得特别鸡肋。所以完全脱离SparkSQL的话,需要去抽象一个DeltaInputFormat接口,这样用户可以直接使用Presto查询Delta数据,而不必事先启动一个 Spark 任务。所以开源思想在这确实没有怎么体现~ ~ ~
在查询性能方面,开源的Delta几乎没有任何优化。Iceberg的hidden partition且不说,普通的column的统计信息也没有。Delta在数据merge方面性能不如 Hudi,在查询方面性能不如Iceberg,是不是意味着Delta一无是处了呢?其实不然。Delta的一大优点就是与Spark的整合能力,尤其是其流批一体的设计,配合multi-hop的data pipeline,可以支持分析、Machine learning、CDC等多种场景。使用灵活、场景支持完善是它相比Hudi和Iceberg 的最大优点。另外,Delta号称是Lambda和Kappa架构的改进版,无需关心流批,无需关心架构。这一点上Hudi和Iceberg是力所不及的。
2.核心原理解析
所以我们可以从以上的内容看出DeltaLake是砖厂为了扩大Spark体系版图而设计的一个数据存储中间层,它的建立是为了更好得服务Spark生态,对于其他开源组件的支持短期内可能是个幻像。对它来说我们需要去了解的是它的设计思路以及实现方式。做到知己知彼,百战不殆。下来,我们开始从几个核心要素来展开。
Delta Lake支持MVVC(多版本并发控制),并且在多并发写入之间提供ACID事务保证。每次写入都是一个事务,并且在事务日志中记录了写入的序列顺序。事务日志跟踪文件级别的写入并使用乐观并发控制,这点非常适合数据湖,因为在多次写入/修改相同的文件在数据湖里很少发生。在有且存在冲突的情况下,这时Delta Lake会抛出并发修改异常以便用户能够处理它们并且这时也会重试该作业,将冲突的文件在前一个进程处理之后会将最终生成的文件与后一个写入进行下一次合并修改,这样两个同时进来的并发间的矛盾便会被合理处理掉。Delta Lake还提供强大的可序列化隔离级别,允许持续写入目录或表,并允许消费者继续从同一目录或表中读取。读者将看到阅读开始时存在的最新快照。
Delta Lake会去自动验证正在被写入的表的DataFrame是否与表的schema兼容。一旦表中存在DataFrame中不存在的列时,此操作则会引发写入异常;表中不存在但schema存在的列会设置为null。Delta Lake支持向后兼容(backward compatibility),因而这里的schema还是对数据有一定的规范和限制作用的。Delta Lake 具有显式添加新列的 DDL 以及自动更新模式的能力。
Delta Lake将表或目录的元数据信息存储在事务日志中,而不是Metastore 中。这允许Delta Lake在恒定时间内列出大型目录中的文件,同时在读取数据时非常高效。这里是数据湖产品的一个特点,即和hive的元数据存储MySQL数据库的单点存在一定的差异性,通过分布式文件系统可以有效避免元数据单点过热,并且能够保证性能。
Delta Lake允许用户读取表或目录之前的快照。当文件被修改文件时,Delta Lake会创建较新版本的文件并保留旧版本的文件。当用户想要读取旧版本的表或目录时,他们可以在 Apache Spark的读取API中提供时间戳或版本号,Delta Lake会根据事务日志中的信息构建该时间戳或版本的完整快照。这允许用户重现之前的数据,并在需要时将表还原为旧版本的数据。所以这里是可以通过Delta来实现对于历史存量数据的回溯以及消费处理。
除批量写入外,Delta Lake还可用作Apache Spark、Structured Streaming 的高效流式sink来实现写入。结合ACID事务和可扩展的元数据处理,高效的流式sink现在可以实现大量近实时分析用例,而无需同时维护复杂的流式传输和批处理管道。对于统一流批计算,维护一套开发逻辑会是一个很好的ieda,整合流批计算,首先要去能够支持对流批的引擎,这里spark可以通过一套SQL语义来实现,其次则是对于写入引擎的支持,包含了dml的更新等merge操作,这里也是hive所不支持的,因此说数据湖能整合流批的写入,这个特性起到了很大的作用;接着呢,我们在查询上要可以按照快照和过程态数据两种方式进行查询,也是对于两者融合的一大体现。
Delta Lake中的所有数据都是使用Apache Parquet 格式存储,使Delta Lake 能够利用Parquet原生的高效压缩和编码方案。这个特性大部分的数据湖产品都会使用开源的存储模式,包括parquet、avro、orc等等,这里也是需要去和目前的文件格式进行统一的,没必要重复造轮子,复用即是最好的诠释。
这个功能马上可以使用。Delta Lake 将支持 merge, update 和 delete 等 DML 命令。这使得数据工程师可以轻松地在数据湖中插入/更新和删除记录。 由于 Delta Lake 以文件级粒度跟踪和修改数据,因此它比读取和覆盖整个分区或表更有效。
Delta Lake还将支持新的 API 来设置表或目录的数据异常。工程师能够设置一个布尔条件并调整报警阈值以处理数据异常。当 Apache Spark 作业写入表或目录时,Delta Lake 将自动验证记录,当数据存在异常时,它将根据提供的设置来处理记录。
100% 兼容 Apache Spark API
3.Delta Lake几个新特性
提前说明,开源版本不一定完全都有,所以你懂得,我们接着看看具体的实现。
这个特性可以让用户查询给定时间点的快照或回滚错误更新到之前正确的数据。
UPSERT, DELETE 和 MERGE操作:这些操作可有效地重写相关对象,以实现对存档数据和法规遵从性工作流的更新;这个和多版本是密不可分的,有了多版本的支持才会有对于各个快照的处理的可能。
通过让流作业以低延迟的形式将小对象写入表中,然后以事务形式将它们合并到较大的对象中以提高查询性能。这个也即是所谓的copy on write,它的upsert操作即是使用这个功能来实现的。
由于 Delta 表中的对象及其日志是不可变(immutable)的,因此集群节点可以安全地将它们缓存在本地存储中。Databricks 云服务利用此功能为 Delta 表实现透明的 SSD 缓存。(开源不可用)
这个功能可自动优化表中对象的大小,并且会对数据记录进行聚类(clustering)(例如,将记录存储存储成Zorder形式以实现多个维度上的本地化)而不会影响正在运行的查询。这个功能只有数砖的产品里面有,开源的Delta Lake是不包含这个的。(开源不可用)
如果表的模式发生变化,Delta 可以继续读取旧的 Parquet 文件而无需重写它们。之前降到过向后兼容就是这个schema evolution。
基于事务日志的审计功能,可以提高查询的速度和效率。(开源不可用)
4.流批一体化的优化
可以通过下图看到实时方式和离线方式完全是在两套流程里进行,并且其中包含了多个步骤和组件,做到离线实时统一,一方面是去从整体计算上面去发力,另外我们组件层面尽可能的精简也是能否促成这个局面的一种方式。
同样我们通过Delta Lake可以很好的去跟Spark生态的产品进行融合,包括了机器学习、离线实时数据查询、BI报表、实时分析等各个领域。所以它能提供的能力很大程度上是我们目前正在寻求的一种解决当下问题的方式,所以看到这里是不是觉得的确有点意思呢?
5.事务日志原理
将事务分解为原子提交,每当用户执行修改表的操作(例如插入、更新或删除)时,Delta Lake将该操作分解为一系列由以下一个或多个操作组成的离散步骤:
- Add file:添加一个数据文件;
- Remove file:删除一个数据文件;
- Update metadata:更新表的元数据(例如更改表的名称,模式或分区);
- Set transaction:Structured Streaming 作业已经提交的具有给定 ID 的微批次记录;
- Change protocol:通过将事务日志切换到最新的软件协议来启用新特性;
- Commit info:包含有关提交的信息以及该操作是在何时何地进行的。
然后这些操作将按照有序的原子单位记录在事务日志中,称为提交。
UPDATE: Under the hood,首先需要找到并选择那些包含与谓词匹配因而需要更新的数据的文件。这个过程中 Delta Lake可以使用data skipping技术来加速这个过程。将每个匹配的文件读入内存,更新相关行,并将结果写入新的数据文件。
一旦 Delta Lake 成功地执行了UPDATE,它就会在事务日志中添加一个提交commit,表明从现在开始将使用新的数据文件来代替旧的数据文件。但是旧的数据文件没有被删除。相反,它只是被标记成tombstoned — 意思是这个文件只属于表的旧版本数据文件而不是当前版本的数据文件。Delta Lake能够使用它来提供数据版本控制和时间旅行(time travel)。
DELETE + VACUUM:清理旧的文件。运行 VACUUM 命令永久删除满足以下条件的所有数据文件:不再是活动表的一部分,并且超过保留阈值(默认为七天)。
Delta Lake的MERGE操作帮助我们实现upserts语义,它是实现了UPDATE和INSERT的混合。假设我们有一张目标表和一张源表,其中目标表包含新记录和对现有记录的更新。upsert具体实现原理是通过先用源表和目标数据进行inner join,得到match的行之后,然后再通过outter join去更新、删除或新增行,即当源表中的一条记录与目标表中现有的记录相匹配时,Delta Lake 将更新该记录;当没有匹配时,Delta Lake 将插入新记录;这里便通过join实现了upsert操作。
MERGE 的实现与 UPDATE 或者 DELETE 的主要区别是其使用了 join,这一事实允许我们在寻求提高性能时使用一些独特的策略。
MERGE:性能调优
为了提升 MERGE 的执行性能,我们需要了解上面两个 join 中的哪个影响了程序的执行。如果 inner join 是执行 MERGE 的瓶颈(比如找到 Delta Lake 需要重写的文件花费的时间太长了),那么我们可以采用以下的策略解决:
- 添加更多的过滤条件来减少搜索空间;
- 调整 shuffle partition 的数量;
- 调整 broadcast join 的阈值;
- 如果表中有很多小文件,我们可以先压缩合并它们;但不要将它们压缩为太大的文件,因为 Delta Lake 必须复制整个文件来重写它。
如果 outer join 是执行 MERGE 的瓶颈(比如重写文件花费的时间太长了),那么我们可以采用以下的策略解决:
- 调整 shuffle partition 的数量
- 这个对分区表可能会生成很多的小文件;
- 在写文件之前开启自动重分区来减少 Reduce 产生的文件。
- 调整 broadcast 的阈值。如果我们使用 full outer join, Spark 则无法进行 broadcast join, 但是如果我们使用 right outer join, Spark 则可以使用;我们可以根据实际情况来调整 broadcast 的阈值;
- 缓存 source table / DataFrame.
- 缓存 source table 可以加快第二次的扫描时间,但是记住别缓存 target table,因为这可能会导致缓存一致性问题。
现在我们已经在高层次上了解了事务日志如何工作的,现在我们来谈谈并发性先关的内容。到目前为止,我们的示例主要涵盖了用户线性提交事务或至少没有冲突的情况。 但是当Delta Lake处理多个并发读写时会发生什么?答案很简单,由于Delta Lake由 Apache Spark 提供支持,因此提供了可以让多个用户同时修改表的功能,为了处理这些并发情况,Delta Lake 采用了乐观的并发控制
什么是乐观并发控制?
,它是假设数据一般情况不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果冲突,则返回给用户异常信息,让用户决定如何去做。乐观锁适用于读多写少的场景,这样可以提高程序的吞吐量。乐观并发控制是一种处理并发事务的方法,它假定不同用户对表所做的事务(更改)可以在不相互冲突的情况下完成。它的速度快得令人难以置信,因为当处理 PB 级的数据时,用户很可能同时处理数据的不同部分,从而允许他们同时完成不冲突的事务。
例如,假设你和我正在一起玩拼图游戏。只要我们都在做拼图的不同部分——比如你在角落里,我在边缘上——我们没有理由不能同时做更大拼图的那一部分,并且以两倍的速度完成拼图。只有当我们同时需要相同的部件时,才会产生冲突。这就是乐观并发控制。
相反,一些数据库系统使用悲观锁定的概念,这是假设最坏的情况——即使我们有10,000块拼图,在某个时候我们肯定需要相同的拼图——这导致了太多的冲突。为了解决这个问题,它的理由是,应该只允许一个人同时做拼图,并把其他人都锁在房间外面。这不是一个快速(或友好)解决难题的方法!当然,即使使用乐观并发控制,有时用户也会尝试同时修改数据的相同部分。幸运的是,Delta Lake 有相应的协议处理它。
乐观地解决冲突
为了提供ACID事务,Delta Lake 有一个协议,用于确定提交应该如何排序(在数据库中称为 serializability),并确定在同时执行两个或多个提交时应该做什么。Delta Lake通过实现互斥(mutual exclusion)规则来处理这些情况,然后尝试乐观地解决任何冲突。该协议允许Delta Lake遵循ACID隔离原则,该原则确保多个并发写操作之后的表的结果状态与那些连续发生的写操作相同,并且是彼此隔离的。一般来说,这个过程是这样进行的:
- 记录起始表的版本;
- 记录读和写操作;
- 尝试提交;
- 如果有人已经提交了,检查一下你读到的内容是否有变化;
- 如果有提交冲突则等待后再合并;
- 重复上面的步骤。
Delta Lake对写入进行schema校验,也就是说所有表格的写入操作都会用表的schema做兼容性检查。如果schema不兼容,Delta Lake将会撤销这次事务(没有任何数据写入),并且返回相应的异常信息告知用户。Delta Lake 通过以下准则判断一次写入是否兼容,即对写入的 DataFrame 必须满足:
- 不能包含目标表 schema 中不存在的列。相反,如果写入的数据没有包含所有的列是被允许的,这些空缺的列将会被赋值为 null;
- 不能包含与目标表类型不同的列。如果目标表包含 String 类型的数据,但 DataFrame 中对应列的数据类型为 Integer,Schema 约束将会返回异常,防止该次写入生效;
- 不能包含只通过大小写区分的列名。这意味着不能在一张表中同时定义诸如“Foo”和“foo”的列。不同于 Spark可以支持大小写敏感和不敏感(默认为大小写不敏感)两种不同的模式,Delta Lake保留大小写,但在 schema存储上大小写不敏感。Parquet格式在存储和返回列信息上面是大小写敏感的,因此为了防止潜在的错误、数据污染和丢失的问题,Delta Lake 引入了这个限制。
原子性是ACID事务的四个属性之一,它可以保证在Delta Lake上执行的操作(如 INSERT 或 UPDATE )要么全部成功要么全部不成功。 如果没有此属性,硬件故障或软件错误很容易导致数据仅部分写入表中,从而导致数据混乱或损坏。
事务日志是Delta Lake能够提供原子性保证的机制。无论如何,如果它没有记录在事务日志中,它就不会发生。通过只记录完全执行的事务,并使用该记录作为唯一的真相来源,事务日志允许用户对其数据进行推理;并且即使数据在PB级别上,我们也可以对这些数据的准确性高枕无忧。
6.Delta Lake功能再总结
通过上面5章由表及里的阐述,我们大致可以发现Delta Lake优势在于与Spark生态的完整交互以及对于Spark整体流批一体化的填补,整体上的功能很多,但其实很多高阶部分都是需要用砖厂的内部付费版本。但从实际使用上的易用上来看,spark上使用确实很便捷。设计的思路上包含了大部分我们痛点的更新以及流批一体和分布式元数据管理等;所以从常规的开源版本看,对于一些高阶的诸如clustering、hidden partitions、SSD缓存及审计日志等暂时都没开放,因此对于Delta Lake来讲,我们主要还是研究其实现方式以及核心原理上,实时使用的话,会从Hudi和Iceberg中来选择。今天这篇就先到这里,下一篇我们展开来讲Hudi, 晚安 see you ~ ~ ~
这篇关于DataLake — 批流一体化的追风者(2) -- Delta Lake核心原理解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!