Iceberg从入门到精通系列之二十四:Spark Structured Streaming

2024-02-02 11:28

本文主要是介绍Iceberg从入门到精通系列之二十四:Spark Structured Streaming,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Iceberg从入门到精通系列之二十四:Spark Structured Streaming

  • 一、Streaming Reads
  • 二、Streaming Writes
  • 三、Partitioned table
  • 四、流表的维护

Iceberg 使用 Apache Spark 的 DataSourceV2 API 来实现数据源和目录。 Spark DSv2 是一个不断发展的 API,在 Spark 版本中提供不同级别的支持。

一、Streaming Reads

Iceberg 支持处理从历史时间戳开始的 Spark 结构化流作业中的增量数据:

val df = spark.readStream.format("iceberg").option("stream-from-timestamp", Long.toString(streamStartTimestamp)).load("database.table_name")

Iceberg 仅支持从追加快照中读取数据。覆盖快照无法处理,默认会引发异常。通过设置streaming-skip-overwrite-snapshots=true 可以忽略覆盖。同样,删除快照默认会引发异常,通过设置streaming-skip-delete-snapshots=true可以忽略删除。

二、Streaming Writes

要将流式查询中的值写入 Iceberg 表,请使用 DataStreamWriter:

data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("checkpointLocation", checkpointPath).toTable("database.table_name")

如果您使用的是 Spark 3.0 或更早版本,则需要使用 .option(“path”, “database.table_name”).start(),而不是 .toTable(“database.table_name”)。

对于基于目录的 Hadoop 目录:

data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("path", "hdfs://nn:8020/path/to/table") .option("checkpointLocation", checkpointPath).start()

Iceberg 支持追加和完整输出模式:

  • append:将每个微批次的行追加到表中
  • complete:替换每个微批次的表内容

在开始流式查询之前,请确保您创建了表。请参阅 SQL 创建表文档以了解如何创建 Iceberg 表。

Iceberg 不支持实验性连续处理,因为它不提供“提交”输出的接口。

三、Partitioned table

Iceberg 需要在写入数据之前按每个任务的分区对数据进行排序。在 Spark 中,任务按 Spark 分区进行分割。针对分区表。对于批量查询,建议您进行显式排序来满足要求(请参阅此处),但该方法会带来额外的延迟,因为重新分区和排序被视为流工作负载的繁重操作。为了避免额外的延迟,您可以启用扇出编写器来消除这一要求。

data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("fanout-enabled", "true").option("checkpointLocation", checkpointPath).toTable("database.table_name")

扇出写入器打开每个分区值的文件,并且在写入任务完成之前不会关闭这些文件。避免使用扇出写入器进行批量写入,因为对输出行进行显式排序对于批量工作负载来说成本较低。

四、流表的维护

流式写入可以快速创建新的表版本,创建大量表元数据来跟踪这些版本。强烈建议通过调整提交率、使旧快照过期以及自动清理元数据文件来维护元数据。

调整提交率

高提交率会产生数据文件、清单和快照,从而导致额外的维护。建议触发间隔至少为 1 分钟,并根据需要增加间隔。

使旧快照过期

写入表的每个批次都会生成一个新快照。 Iceberg 跟踪表元数据中的快照,直到它们过期。快照会随着频繁提交而快速积累,因此强烈建议定期维护流式查询写入的表。快照过期是删除元数据和任何不再需要的数据文件的过程。默认情况下,该过程将使超过五天的快照过期。

压缩数据文件
从流处理写入的数据量通常很小,这可能会导致表元数据跟踪大量小文件。将小文件压缩为大文件可以减少表所需的元数据,并提高查询效率。 Iceberg 和 Spark 附带了 rewrite_data_files 过程。

重写清单
为了优化流工作负载的写入延迟,Iceberg 可以使用不会自动压缩清单的“快速”附加写入新快照。这可能会导致大量小的清单文件。 Iceberg可以重写清单文件的数量来提高查询性能。 Iceberg 和 Spark 附带了 rewrite_manifests 过程。

这篇关于Iceberg从入门到精通系列之二十四:Spark Structured Streaming的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/670556

相关文章

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

数论入门整理(updating)

一、gcd lcm 基础中的基础,一般用来处理计算第一步什么的,分数化简之类。 LL gcd(LL a, LL b) { return b ? gcd(b, a % b) : a; } <pre name="code" class="cpp">LL lcm(LL a, LL b){LL c = gcd(a, b);return a / c * b;} 例题:

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

Java 创建图形用户界面(GUI)入门指南(Swing库 JFrame 类)概述

概述 基本概念 Java Swing 的架构 Java Swing 是一个为 Java 设计的 GUI 工具包,是 JAVA 基础类的一部分,基于 Java AWT 构建,提供了一系列轻量级、可定制的图形用户界面(GUI)组件。 与 AWT 相比,Swing 提供了许多比 AWT 更好的屏幕显示元素,更加灵活和可定制,具有更好的跨平台性能。 组件和容器 Java Swing 提供了许多

【IPV6从入门到起飞】5-1 IPV6+Home Assistant(搭建基本环境)

【IPV6从入门到起飞】5-1 IPV6+Home Assistant #搭建基本环境 1 背景2 docker下载 hass3 创建容器4 浏览器访问 hass5 手机APP远程访问hass6 更多玩法 1 背景 既然电脑可以IPV6入站,手机流量可以访问IPV6网络的服务,为什么不在电脑搭建Home Assistant(hass),来控制你的设备呢?@智能家居 @万物互联

poj 2104 and hdu 2665 划分树模板入门题

题意: 给一个数组n(1e5)个数,给一个范围(fr, to, k),求这个范围中第k大的数。 解析: 划分树入门。 bing神的模板。 坑爹的地方是把-l 看成了-1........ 一直re。 代码: poj 2104: #include <iostream>#include <cstdio>#include <cstdlib>#include <al

MySQL-CRUD入门1

文章目录 认识配置文件client节点mysql节点mysqld节点 数据的添加(Create)添加一行数据添加多行数据两种添加数据的效率对比 数据的查询(Retrieve)全列查询指定列查询查询中带有表达式关于字面量关于as重命名 临时表引入distinct去重order by 排序关于NULL 认识配置文件 在我们的MySQL服务安装好了之后, 会有一个配置文件, 也就

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0