Spark Streaming 流计算优化记录(4)-时间都去哪儿了,关于调度与空转

2023-12-19 16:59

本文主要是介绍Spark Streaming 流计算优化记录(4)-时间都去哪儿了,关于调度与空转,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!




6. 时间都去where了,青春不能等,调度也是
除了上述优化, 我们还注意到一个奇怪的现象:
 
怎么回事, 即使接收不到消息都要花掉5秒?!! 虽然Spark Streaming空转依然会产生空task, 这些空task依然会消耗序列化, 压缩, 调度等时间, 但也不至于那么多吧!!!
我们拿一个Stage看看, 就拿处理Kafka消息的那个Stage作例子吧:
 
Kafka没有任何消息进来的情况下, 这个Stage竟然耗费我3秒青春, 有无搞错! 时间都去where了? 
接着我们看了一下task的时间分布图:
 
从图中, 我们可以看到Spark总共调度分发了两批次task set, 每个task set的处理(含序列化和压缩之类的工作)都不超过100毫秒, 那么该Stage何来消耗3秒呢? 慢着, 貌似这两批次的task set分发的时间相隔得有点长啊, 隔了2秒多. 为什么会隔这么就才调度一次呢?
此处要引入一个配置项” spark.locality.wait”, 它配置了本地化调度降级所需要的时间. 这里概要补充下Spark本地化调度的知识, Spark的task一般都会分发到它所需数据的那个节点, 这称之为”NODE_LOCAL”, 但在资源不足的情况下, 数据所在节点未必有资源处理task, 因此Spark在等待了” spark.locality.wait”所配置的时间长度后, 会退而求其次, 分发到数据所在节点的同一个机架的其它节点上, 这是”RACK_LOCAL”, 当然, 也有更惨的, 就是再等了一段” spark.locality.wait”的时间长度后, 干脆随便找一台机器去跑task, 这就是”ANY”策略了. 
而从上例看到, 即使用最差的”ANY”策略进行调度, task set的处理也只是花了100毫秒, 因此, 没必要非得为了”NODE_LOCAL”策略的生效而去等待那么长的时间, 特别是在流计算这种场景上. 所以把” spark.locality.wait”果断调小, 从1秒到500毫秒, 最后干脆调到100毫秒算了.
调了之后的处理时间是酱紫的:
 
原来两个Stage空转需要5秒, 现在变成1秒了. 调度不能等啊.


7. 进一步减少空转耗时
上一节以处理Kafka消息的那个Stage作为例子, 讲了如何发现时间消耗, 如何减少等待时间, 这里再讲下在没数据处理的情况下如何非侵入式地减少不必要的空转. (呵呵,所谓非侵入式就是不修改Spark源代码啦,否则后期维护很烦人的)
这一节, 我们以进行数据join的Stage作为例子.
 
该Stage所做的事情就是从HDFS中加载数据, 进行转换处理后, 缓存在内存中, 然后与Kafka过来的数据在本机内存中进行join操作. 空转时的耗时是1秒, 时间分布如下:
 
调度等待和序列化的耗时还算正常, 但为毛在task set中啥都没有的情况下对task set的处理都需要1秒呢?
通过研究可知, 即使join的双方有一方没数据的情况下, Spark依然会循环另一方的数据, 以按key对value进行汇总.
 
额, 就是这个循环耗了我们近1秒青春. 而其实在这个场景下, 当Kafka方面没数据输入时, 就根本不要进这个循环, 直接返回空就是了. 因此我们引入了新的SkipableCoGroupedRDD.

 

该RDD负责两个不同RDD的join操作, 但与一般的join操作不同的是, 它会把第一个RDD作为是否能够跳过join操作的参照, 若第一个RDD中根本没有数据, 那么整个join操作会被跳过.

使用了SkipableCoGroupedRDD的处理结果如下:
 
在空转的情况下, 整个join的Stage的处理时间只需要0.2秒. 空转作业的处理时间进一步降低到0.2~0.3秒.
 

这篇关于Spark Streaming 流计算优化记录(4)-时间都去哪儿了,关于调度与空转的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/512986

相关文章

JAVA线程的周期及调度机制详解

《JAVA线程的周期及调度机制详解》Java线程的生命周期包括NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING和TERMINATED,线程调度依赖操作系统,采用抢占... 目录Java线程的生命周期线程状态转换示例代码JAVA线程调度机制优先级设置示例注意事项JAVA线程

Python中4大日志记录库比较的终极PK

《Python中4大日志记录库比较的终极PK》日志记录框架是一种工具,可帮助您标准化应用程序中的日志记录过程,:本文主要介绍Python中4大日志记录库比较的相关资料,文中通过代码介绍的非常详细,... 目录一、logging库1、优点2、缺点二、LogAid库三、Loguru库四、Structlogphp

Spring Boot基于 JWT 优化 Spring Security 无状态登录实战指南

《SpringBoot基于JWT优化SpringSecurity无状态登录实战指南》本文介绍如何使用JWT优化SpringSecurity实现无状态登录,提高接口安全性,并通过实际操作步骤... 目录Spring Boot 实战:基于 JWT 优化 Spring Security 无状态登录一、先搞懂:为什

SpringBoot整合Apache Spark实现一个简单的数据分析功能

《SpringBoot整合ApacheSpark实现一个简单的数据分析功能》ApacheSpark是一个开源的大数据处理框架,它提供了丰富的功能和API,用于分布式数据处理、数据分析和机器学习等任务... 目录第一步、添加android依赖第二步、编写配置类第三步、编写控制类启动项目并测试总结ApacheS

Java JAR 启动内存参数配置指南(从基础设置到性能优化)

《JavaJAR启动内存参数配置指南(从基础设置到性能优化)》在启动Java可执行JAR文件时,合理配置JVM内存参数是保障应用稳定性和性能的关键,本文将系统讲解如何通过命令行参数、环境变量等方式... 目录一、核心内存参数详解1.1 堆内存配置1.2 元空间配置(MetASPace)1.3 线程栈配置1.

Python连接Spark的7种方法大全

《Python连接Spark的7种方法大全》ApacheSpark是一个强大的分布式计算框架,广泛用于大规模数据处理,通过PySpark,Python开发者能够无缝接入Spark生态系统,本文给大家介... 目录第一章:python与Spark集成概述PySpark 的核心优势基本集成配置步骤启动一个简单的

java时区时间转为UTC的代码示例和详细解释

《java时区时间转为UTC的代码示例和详细解释》作为一名经验丰富的开发者,我经常被问到如何将Java中的时间转换为UTC时间,:本文主要介绍java时区时间转为UTC的代码示例和详细解释,文中通... 目录前言步骤一:导入必要的Java包步骤二:获取指定时区的时间步骤三:将指定时区的时间转换为UTC时间步

docker编写java的jar完整步骤记录

《docker编写java的jar完整步骤记录》在平常的开发工作中,我们经常需要部署项目,开发测试完成后,最关键的一步就是部署,:本文主要介绍docker编写java的jar的相关资料,文中通过代... 目录all-docker/生成Docker打包部署文件配置服务A的Dockerfile (a/Docke

MySQL使用EXISTS检查记录是否存在的详细过程

《MySQL使用EXISTS检查记录是否存在的详细过程》EXISTS是SQL中用于检查子查询是否返回至少一条记录的运算符,它通常用于测试是否存在满足特定条件的记录,从而在主查询中进行相应操作,本文给大... 目录基本语法示例数据库和表结构1. 使用 EXISTS 在 SELECT 语句中2. 使用 EXIS

Docker多阶段镜像构建与缓存利用性能优化实践指南

《Docker多阶段镜像构建与缓存利用性能优化实践指南》这篇文章将从原理层面深入解析Docker多阶段构建与缓存机制,结合实际项目示例,说明如何有效利用构建缓存,组织镜像层次,最大化提升构建速度并减少... 目录一、技术背景与应用场景二、核心原理深入分析三、关键 dockerfile 解读3.1 Docke