5.数据湖deltalake流表的读写

2023-10-09 02:08
文章标签 数据 读写 流表 deltalake

本文主要是介绍5.数据湖deltalake流表的读写,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

delta lake和 spark structured streaming可以深度整合。delta lake克服了很多常见的与流系统和文件整合带来的相关限制,如下:

  • 保证了多个流(或并发批处理作业)的仅一次处理。

  • 当使用文件作为流源时,可以有效地发现哪些文件是新文件。

1. 作为stream source

1.1 案例讲解

当你的structured streaming使用delta lake作为stream source的时候,应用会处理delta 表中已有的数据,以及delta 表新增的数据。

spark.readStream.format("delta").load("/delta/events")

也可以做一些优化,如下:

a.通过maxFilesPerTrigger配置控制structured streaming从delta lake加载的微批文件数。要知道Structured streaming也是微批的概念。该参数就是控制每次trigger计算的最大新增文件数,默认是1000,实际情况要根据数据量和资源数量进行控制。

b.通过maxBytesPerTrigger控制每次trigger处理的最大数据量。这是设置一个“ soft max”,这意味着一个批处理大约可以处理此数量的数据,并且可能处理的数量超出这个限制。如果使用的是Trigger.Once,则 此配置无效。如果将此配置与maxFilesPerTrigger结合使用,两个参数任意一个达到临届条件,都会生效。

1.2 忽略更新和删除

structured streaming不处理不是追加的输入数据,并且如果对作为source的delta table的表进行了任何修改,则structured streaming会抛出异常。 对于变更常见的企业场景,提供了两种策略,来处理对delta 表变更给structured streaming 任务造成的影响:

  • 可以删除输出和checkpoint,并重新启动structured streaming对数据计算,也即是重新计算一次。

  • 可以设置以下两个选项之一:

    • ignoreDeletes:忽略在分区表中删除数据的事务。

    • ignoreChanges:如果由于诸如UPDATE,MERGE INTO,DELETE(在分区内)或OVERWRITE之类的数据更改操作而不得不在源表中重写文件,则重新处理更新的文件。因此未更改的行仍可能会处理并向下游传输,因此structured streaming的下游应该能够处理重复数据。删除不会传输到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,则流不会因源表的删除或更新而中断。

1.3 案例

假设有一张表叫做user_events,有三个字段:date,user_email,action,而且该表以date字段进行分区。structured streaming区处理这张表,且还有其程序会对该delta 表进行插入和删除操作。

假设仅仅是删除操作,可以这么配置stream:

events.readStream  .format("delta")  .option("ignoreDeletes", "true")  .load("/delta/user_events")

假设对delta表修改操作,可以这么配置stream:

events.readStream  .format("delta")  .option("ignoreChanges", "true")  .load("/delta/user_events")

如果使用UPDATE语句更新了user_email字段某个值,则包含相关user_email的文件将被重写,这个是delta lake更改操作实现机制后面会讲。使用ignoreChanges时,新记录将与同一文件中的所有其他未更改记录一起向下游传输。 所以下游程序应该能够处理这些传入的重复记录。

2.delta 表作为sink

delta table可以作为Structured Streaming的sink使用。delta lake的事务日志确保了其能实现仅一次处理。

2.1 append mode

默认是append 模式,仅仅是追加数据到delta 表:

events.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").start("/delta/events") // as a path

2.2 complete mode

也可以使用Structured Streaming每个批次覆盖一次整张表。在某些聚合场景下会用到该模式:

  .format("delta").load("/delta/events").groupBy("customerId").count().writeStream.format("delta").outputMode("complete").option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg").start("/delta/eventsByCustomer")

对于延迟要求更宽松的应用程序,可以使用Trigger.Once来节省计算资源。once trigger每次处理从开始到最新的数据,典型的kappa模型,很适合这种场景了。

推荐阅读:

1.数据湖deltalake初识

2.数据湖DeltaLake之DDL操作

3.数据湖deltalake之时间旅行及版本管理

4.数据湖之schema校验

这篇关于5.数据湖deltalake流表的读写的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/169580

相关文章

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Redis 中的热点键和数据倾斜示例详解

《Redis中的热点键和数据倾斜示例详解》热点键是指在Redis中被频繁访问的特定键,这些键由于其高访问频率,可能导致Redis服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis中的热... 目录Redis 中的热点键和数据倾斜热点键(Hot Key)定义特点应对策略示例数据倾斜(Data S

Python实现将MySQL中所有表的数据都导出为CSV文件并压缩

《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到... python将mysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个

Redis中高并发读写性能的深度解析与优化

《Redis中高并发读写性能的深度解析与优化》Redis作为一款高性能的内存数据库,广泛应用于缓存、消息队列、实时统计等场景,本文将深入探讨Redis的读写并发能力,感兴趣的小伙伴可以了解下... 目录引言一、Redis 并发能力概述1.1 Redis 的读写性能1.2 影响 Redis 并发能力的因素二、

SpringBoot整合jasypt实现重要数据加密

《SpringBoot整合jasypt实现重要数据加密》Jasypt是一个专注于简化Java加密操作的开源工具,:本文主要介绍详细介绍了如何使用jasypt实现重要数据加密,感兴趣的小伙伴可... 目录jasypt简介 jasypt的优点SpringBoot使用jasypt创建mapper接口配置文件加密