Spark-StructuredStreaming checkpointLocation分析、优化耗时

本文主要是介绍Spark-StructuredStreaming checkpointLocation分析、优化耗时,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

  • 1 问题描述
  • 2 分析 checkpointLocation 配置
    • 2.1 checkpointLocation 在源码调用链
    • 2.2 MetadataLog(元数据日志接口)
  • 3 分析 checkpointLocation 目录内容
    • 3.1 offsets 目录
    • 3.2 commitLog 目录
    • 3.3 metadata 目录
    • 3.4 sources 目录
    • 3.5 sinks 目录
  • 4 解决方案
    • 4.1 File 作为接收端
    • 4.2 Elasticsearch 作为接收端

内容可能持续性修改完善,最新专栏内容与 spark-docs 同步,源码与 spark-advanced 同步。

1 问题描述

Spark StructuredStreaming 实时任务 kafka -> elasticsearchkafka -> hdfs(parquet格式文件) 任务运行过程中每隔固定时间后某个出现耗时较长。

本内容以kafka -> elasticsearch为例说明,生产环境版本号 Spark-2.4.0,下图为 SQL-UI Job 运行耗时情况:
job-sql-time

问题定位
分析耗时较长任务出现时间,发现出现该问题间隔时间点固定,怀疑是spark某种机制导致,与任务逻辑无关性较大。

查看指定的 checkpointPath 目录发现,在 $checkpointPath/sinks/elasticsearch 下与SQL-UI Job 长时间耗时的时间点一致。初步判断控制生成大文件的方式或者策略即可解决问题。
job-sink-es-checekpoint-compact

2 分析 checkpointLocation 配置

2.1 checkpointLocation 在源码调用链

分析源码查看 StructuredStreaming 启动流程发现,DataStreamWriter#start 方法启动一个 StreamingQuery
同时将 checkpointLocation 配置参数传递给StreamingQuery管理。

StreamingQuery 接口实现关系如下:
StreamingQuery_uml

  • StreamingQueryWrapper 仅包装了一个不可序列化的StreamExecution
  • StreamExecution 管理Spark SQL查询的执行器
    • MicroBatchExecution 微批处理执行器
    • ContinuousExecution 连续处理(流式)执行器

因此我们仅需要分析 checkpointLocation 在 StreamExecution中调用即可。

StreamExecution 中 protected def checkpointFile(name: String): String 方法为所有与 checkpointLocation 有关逻辑,该方法返回 $checkpointFile/name 路径

2.2 MetadataLog(元数据日志接口)

spark 提供了org.apache.spark.sql.execution.streaming.MetadataLog接口用于统一处理元数据日志信息。
checkpointLocation 文件内容均使用 MetadataLog进行维护。

分析 MetadataLog 接口实现关系如下:
MetadataLog_uml

各类作用说明

  • NullMetadataLog 空日志,即不输出日志直接丢弃
  • HDFSMetadataLog 使用 HDFS 作为元数据日志输出
    • CommitLog 提交日志
    • OffsetSeqLog 偏移量日志
    • CompactibleFileStreamLog 封装了支持按大小合并、删除历史记录的 MetadataLog
      • FileStreamSourceLog 文件类型作为数据源时日志记录
      • FileStreamSinkLog 文件类型作为数据接收端时日志记录
      • EsSinkMetadataLog Es作为数据接收端时日志记录

分析 CompactibleFileStreamLog#compact 合并逻辑简单描述为:

假设有 0,1,2,3,4,5,6,7,8,9,10 个批次依次到达,合并大小为3
当前合并结果为   `0,1,2.compact,3,4`
下一次合并结果为 `0,1,2.compact,3,4,5.compact` , **说明:5.compact 文件内容 = 2.compact + 3 + 4**last.compact 文件大小会随着批次运行无限增大
...

分析 CompactibleFileStreamLog 删除过期文件逻辑:

// CompactibleFileStreamLog#add 方法被调用时,默认会判断是否支持删除操作override def add(batchId: Long, logs: Array[T]): Boolean = {val batchAdded =if (isCompactionBatch(batchId, compactInterval)) { // 是否合并compact(batchId, logs)} else {super.add(batchId, logs)}if (batchAdded && isDeletingExpiredLog) { // 添加成功且支持删除过期文件// 删除时判断当前批次是否在 spark.sql.streaming.minBatchesToRetain 配置以外且在文件保留时间内// 配置项参考 第4节 解决方案配置说明deleteExpiredLog(batchId) }batchAdded}

3 分析 checkpointLocation 目录内容

目前 checkpointLocation 内容主要包含以下几个目录(子小节中逐个介绍目录数据来源及功能性)

  • offsets
  • commits
  • metadata
  • sources
  • sinks

3.1 offsets 目录

记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据,在处理数据前将其写入此日志记录。
此日志中的第 N 条记录表示当前正在已处理,第 N-1 个条目指示哪些偏移已处理完成。

// StreamExecution 类中声明了 OffsetSeqLog 变量进行操作
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))// 该日志示例内容如下,文件路径=checkpointLocation/offsets/560504
v1
{"batchWatermarkMs":0,"batchTimestampMs":1574315160001,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"game_dc_real_normal":{"17":279843310,"8":318732102,"11":290676804,"2":292352132,"5":337789356,"14":277147358,"13":334833752,"4":319279439,"16":314038811,"7":361740056,"1":281418138,"10":276872234,"9":244398684,"3":334708621,"12":290208334,"15":267180971,"6":296588360,"0":350011707}}

3.2 commitLog 目录

记录已完成的批次,重启任务检查完成的批次与 offsets 批次记录比对,确定接下来运行的批次

// StreamExecution 类中声明了 CommitLog 变量进行操作
val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))// 该日志示例内容如下,文件路径=checkpointLocation/commits/560504
v1
{"nextBatchWatermarkMs":0}

3.3 metadata 目录

metadata 与整个查询关联的元数据,目前仅保留当前job id

// StreamExecution 类中声明了 StreamMetadata 变量进行操作,策略如下:/** Metadata associated with the whole query */protected val streamMetadata: StreamMetadata = {val metadataPath = new Path(checkpointFile("metadata"))val hadoopConf = sparkSession.sessionState.newHadoopConf()StreamMetadata.read(metadataPath, hadoopConf).getOrElse {val newMetadata = new StreamMetadata(UUID.randomUUID.toString)StreamMetadata.write(newMetadata, metadataPath, hadoopConf)newMetadata}}// 该日志示例内容如下,文件路径=checkpointLocation/metadata
{"id":"5314beeb-6026-485b-947a-cb088a9c9bac"}

3.4 sources 目录

sources 目录为数据源(Source)时各个批次读取详情

3.5 sinks 目录

sinks 目录为数据接收端(Sink)时批次的写出详情

例如: es 作为 sink 时,内容如下

目前 Es 支持配置自定义写出目录,如果未配置写入 checkpointLocation/sinks/ 目录,参考SparkSqlStreamingConfigs

文件路径=checkpointLocation/sinks/elasticsearch/560504
v1
{"taskId":0,"execTimeMillis":1574302020143,"resource":"rs_real_{app}.{dt}","records":220}
{"taskId":1,"execTimeMillis":1574302020151,"resource":"rs_real_{app}.{dt}","records":221}
{"taskId":2,"execTimeMillis":1574302020154,"resource":"rs_real_{app}.{dt}","records":219}
{"taskId":3,"execTimeMillis":1574302020151,"resource":"rs_real_{app}.{dt}","records":221}
{"taskId":4,"execTimeMillis":1574302020154,"resource":"rs_real_{app}.{dt}","records":220} 


例如: 文件类型作为 sink,默认写出到各个 $path/_spark_metadata 目录下 ,参考 FileStreamSink

hdfs 写出时内容为,文件路径=$path/_spark_metadata/560504
v1
{"path":"hdfs://xx:8020/$path/1.c000.snappy.parquet","size":8937,"isDir":false,"modificationTime":1574321763584,"blockReplication":2,"blockSize":134217728,"action":"add"}
{"path":"hdfs://xx:8020/$path/2.c000.snappy.parquet","size":11786,"isDir":false,"modificationTime":1574321763596,"blockReplication":2,"blockSize":134217728,"action":"add"}

4 解决方案

根据实际业务情况合理调整日志输出参数(配置见4.1/4.2说明):

  • 关闭日志输出
  • 控制保留并可以恢复的最小批次数量且减小日志文件保留时间
  • 调整日志文件合并阈值

无论如何调整参数,.compact(合并的文件)大小会一直增长,目前关闭可以解决。调整其他阈值可减小任务出现耗时情况次数。
针对该问题已提交给官方 SPARK-29995尝试解决

4.1 File 作为数据源或者数据接收端

  • spark.sql.streaming.minBatchesToRetain (默认100) 保留并可以恢复的最小批次数量
  • spark.sql.streaming.commitProtocolClass 默认:org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol 合并实现类,其余支持实现参考FileCommitProtocol实现类

fileSource 数据源端:配置在 FileStreamSourceLog 引用

  • spark.sql.streaming.fileSource.log.deletion (默认true),删除过期日志文件
  • spark.sql.streaming.fileSource.log.compactInterval (默认10),日志文件合并阈值
  • spark.sql.streaming.fileSource.log.cleanupDelay (默认10m),日志文件保留时间

fileSink 接收端:配置在 FileStreamSinkLog 引用

  • spark.sql.streaming.fileSink.log.deletion (默认true),删除过期日志文件CompactibleFileStreamLog
  • spark.sql.streaming.fileSink.log.compactInterval (默认10),日志文件合并阈值
  • spark.sql.streaming.fileSink.log.cleanupDelay (默认10m),日志文件保留时间

4.2 Elasticsearch 作为接收端

elasticsearch-spark 官方文档,es 官方重写变量命名及赋值方式,参考EsSinkMetadataLog

  • es.spark.sql.streaming.sink.log.enabled(默认true) 启用或禁用流作业的提交日志。默认情况下,该日志处于启用状态,并且具有相同批次ID的输出批次将被跳过,以避免重复写入。设置false为时,将禁用提交日志,并且所有输出都将发送到Elasticsearch,无论它们是否在先前的执行中已发送。
  • es.spark.sql.streaming.sink.log.path 设置存储此流查询的日志数据的位置。如果未设置此值,那么Elasticsearch接收器会将其提交日志存储在中给定的路径下checkpointLocation。任何与HDFS客户端兼容的URI都是可以接受的。
  • es.spark.sql.streaming.sink.log.cleanupDelay(默认10m) 提交日志通过Spark的HDFS客户端进行管理。一些与HDFS兼容的文件系统(例如Amazon的S3)以异步方式传播文件更改。为了解决这个问题,在压缩了一组日志文件之后,客户端将等待此时间,然后再清理旧文件。
  • es.spark.sql.streaming.sink.log.deletion(默认true) 确定日志是否应删除不再需要的旧日志。提交每个批次后,客户端将检查是否有已压缩且可以安全删除的提交日志。如果设置为false,日志将跳过此清理步骤,为每个批次保留一个提交文件。
  • es.spark.sql.streaming.sink.log.compactInterval(默认10) 设置压缩日志文件之前要处理的批次数。默认情况下,每10批提交日志将被压缩为一个包含所有以前提交的批ID的文件。

这篇关于Spark-StructuredStreaming checkpointLocation分析、优化耗时的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1027168

相关文章

Vue3 的 shallowRef 和 shallowReactive:优化性能

大家对 Vue3 的 ref 和 reactive 都很熟悉,那么对 shallowRef 和 shallowReactive 是否了解呢? 在编程和数据结构中,“shallow”(浅层)通常指对数据结构的最外层进行操作,而不递归地处理其内部或嵌套的数据。这种处理方式关注的是数据结构的第一层属性或元素,而忽略更深层次的嵌套内容。 1. 浅层与深层的对比 1.1 浅层(Shallow) 定义

HDFS—存储优化(纠删码)

纠删码原理 HDFS 默认情况下,一个文件有3个副本,这样提高了数据的可靠性,但也带来了2倍的冗余开销。 Hadoop3.x 引入了纠删码,采用计算的方式,可以节省约50%左右的存储空间。 此种方式节约了空间,但是会增加 cpu 的计算。 纠删码策略是给具体一个路径设置。所有往此路径下存储的文件,都会执行此策略。 默认只开启对 RS-6-3-1024k

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动