本文主要是介绍重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
2019年10月16日,在荷兰阿姆斯特丹举行的 Spark+AI 欧洲峰会上,DataBricks 和 Linux 基金会联合宣布,开源项目 Delta Lake 正式成为 Linux 基金会的托管项目。
Delta Lake前世今生
Delta Lake的关键特性
数据湖的读写是不可靠的。数据工程师经常遇到不安全写入数据湖的问题,导致读者在写入期间看到垃圾数据。他们必须构建方法以确保读者在写入期间始终看到一致的数据。
数据湖中的数据质量很低。将非结构化数据转储到数据湖中是非常容易的。但这是以数据质量为代价的。没有任何验证模式和数据的机制,导致数据湖的数据质量很差。因此,努力挖掘这些数据的分析项目也会失败。
随着数据的增加,处理性能很差。随着数据湖中存储的数据量增加,文件和目录的数量也会增加。处理数据的作业和查询引擎在处理元数据操作上花费大量时间。在有流作业的情况下,这个问题更加明显。
数据湖中数据的更新非常困难。工程师需要构建复杂的管道来读取整个分区或表,修改数据并将其写回。这种模式效率低,并且难以维护。
ACID 事务:Delta Lake 提供多个写操作之间的 ACID 事务。每个写操作都是一个事务,事务日志中记录的写操作有一个串行顺序。事务日志会跟踪文件级的写操作,并使用乐观并发控制,这非常适合数据湖,因为尝试修改相同文件的多个写操作并不经常发生。在存在冲突的场景中,Delta Lake 会抛出一个并发修改异常,以便用户处理它们并重试它们的作业。Delta Lake 还提供了强大的序列化隔离级别,允许工程师不断地对目录或表进行写操作,而用户可以不断地从相同的目录或表中读取数据。读取者将看到读操作开始时存在的最新快照。
模式管理:Delta Lake 会自动验证正在写入的 DataFrame 模式是否与表的模式兼容。表中存在但 DataFrame 中不存在的列会被设置为 null。如果 DataFrame 中有额外的列在表中不存在,那么该操作将抛出异常。Delta Lake 具有可以显式添加新列的 DDL 和自动更新模式的能力。
可伸缩的元数据处理:Delta Lake 将表或目录的元数据信息存储在事务日志中,而不是存储在元存储(metastore)中。这使得 Delta Lake 能够在固定的时间内列出大型目录中的文件,并且在读取数据时非常高效。
数据版本控制和时间旅行:Delta Lake 允许用户读取表或目录先前的快照。当文件在写期间被修改时,Delta Lake 将创建文件的新版本并保存旧版本。当用户希望读取表或目录的旧版本时,他们可以向 Apache Spark 的读操作 API 提供一个时间戳或版本号,Delta Lake 根据事务日志中的信息构建该时间戳或版本的完整快照。这使得用户可以重新进行试验并生成报告,如果需要,还可以将表还原为旧版本。
统一的批处理和流接收(streaming sink):除了批处理写之外,Delta Lake 还可以使用 Apache Spark 的结构化流作为高效的流接收。再结合 ACID 事务和可伸缩的元数据处理,高效的流接收现在支持许多接近实时的分析用例,而且无需维护复杂的流和批处理管道。
记录更新和删除(即将到来):Delta Lake 将支持合并、更新和删除 DML 命令。这使得工程师可以轻松地维护和删除数据湖中的记录,并简化他们的变更数据捕获和 GDPR 用例。由于 Delta Lake 在文件粒度上跟踪和修改数据,因此,比读取和覆写整个分区或表要高效得多。
数据期望(即将到来):Delta Lake 还将支持一个新的 API,用于设置表或目录的数据期望。工程师将能够通过指定布尔条件及调整严重程度来处理数据期望。当 Apache Spark 作业写入表或目录时,Delta Lake 将自动验证记录,当出现违规时,它将根据所预置的严重程度处理记录。
原子可见性:必须有一种方法使文件完全可见或完全不可见。
互斥:只有一个写入者能够在最终目的地创建(或重命名)文件。
一致性清单:一旦在目录中写入了一个文件,该目录未来的所有清单都必须返回该文件。
Delta Lake牛刀初试
官网提供了QuickStart方便我们快速学习。
创建一个Maven工程,加入以下依赖:
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.11</artifactId>
<version>0.4.0</version>
</dependency>
Create a table
创建一个 Delta 类型的表方法很简单,如下。
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
SparkSession spark = ... // create SparkSession
Dataset<Row> data = data = spark.range(0, 5);
data.write().format("delta").save("/tmp/delta-table");
➜ delta-table tree
├── _delta_log
│ └── 00000000000000000000.json
├── part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet
├── part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet
├── part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet
└── part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet
Dataset<Row> data = data = spark.range(5, 10);
data.write().format("delta").mode("overwrite").save("/tmp/delta-table");
scala> val df = spark.read.format("delta").load("/tmp/delta-table")
df: org.apache.spark.sql.DataFrame = [id: bigint]
scala> df.show()
+---+
| id|
+---+
| 8|
| 9|
| 5|
| 7|
| 6|
+---+
Conditional update without overwrite相当于upsert
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table");
// 所有偶数加100
deltaTable.update(
functions.expr("id % 2 == 0"),
new HashMap<String, Column>() {{
put("id", functions.expr("id + 100"));
}}
);
// 删除所有偶数
deltaTable.delete(condition = functions.expr("id % 2 == 0"));
// 更新
Dataset<Row> newData = spark.range(0, 20).toDF();
deltaTable.as("oldData")
.merge(
newData.as("newData"),
"oldData.id = newData.id")
.whenMatched()
.update(
new HashMap<String, Column>() {{
put("id", functions.col("newData.id"));
}})
.whenNotMatched()
.insertExpr(
new HashMap<String, Column>() {{
put("id", functions.col("newData.id"));
}})
.execute();
deltaTable.toDF().show();
文章不错?点个【在看】吧! ?
这篇关于重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!