Spark Streaming 流计算优化记录(4)-时间都去哪儿了,关于调度与空转

2023-12-19 16:59

本文主要是介绍Spark Streaming 流计算优化记录(4)-时间都去哪儿了,关于调度与空转,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!




6. 时间都去where了,青春不能等,调度也是
除了上述优化, 我们还注意到一个奇怪的现象:
 
怎么回事, 即使接收不到消息都要花掉5秒?!! 虽然Spark Streaming空转依然会产生空task, 这些空task依然会消耗序列化, 压缩, 调度等时间, 但也不至于那么多吧!!!
我们拿一个Stage看看, 就拿处理Kafka消息的那个Stage作例子吧:
 
Kafka没有任何消息进来的情况下, 这个Stage竟然耗费我3秒青春, 有无搞错! 时间都去where了? 
接着我们看了一下task的时间分布图:
 
从图中, 我们可以看到Spark总共调度分发了两批次task set, 每个task set的处理(含序列化和压缩之类的工作)都不超过100毫秒, 那么该Stage何来消耗3秒呢? 慢着, 貌似这两批次的task set分发的时间相隔得有点长啊, 隔了2秒多. 为什么会隔这么就才调度一次呢?
此处要引入一个配置项” spark.locality.wait”, 它配置了本地化调度降级所需要的时间. 这里概要补充下Spark本地化调度的知识, Spark的task一般都会分发到它所需数据的那个节点, 这称之为”NODE_LOCAL”, 但在资源不足的情况下, 数据所在节点未必有资源处理task, 因此Spark在等待了” spark.locality.wait”所配置的时间长度后, 会退而求其次, 分发到数据所在节点的同一个机架的其它节点上, 这是”RACK_LOCAL”, 当然, 也有更惨的, 就是再等了一段” spark.locality.wait”的时间长度后, 干脆随便找一台机器去跑task, 这就是”ANY”策略了. 
而从上例看到, 即使用最差的”ANY”策略进行调度, task set的处理也只是花了100毫秒, 因此, 没必要非得为了”NODE_LOCAL”策略的生效而去等待那么长的时间, 特别是在流计算这种场景上. 所以把” spark.locality.wait”果断调小, 从1秒到500毫秒, 最后干脆调到100毫秒算了.
调了之后的处理时间是酱紫的:
 
原来两个Stage空转需要5秒, 现在变成1秒了. 调度不能等啊.


7. 进一步减少空转耗时
上一节以处理Kafka消息的那个Stage作为例子, 讲了如何发现时间消耗, 如何减少等待时间, 这里再讲下在没数据处理的情况下如何非侵入式地减少不必要的空转. (呵呵,所谓非侵入式就是不修改Spark源代码啦,否则后期维护很烦人的)
这一节, 我们以进行数据join的Stage作为例子.
 
该Stage所做的事情就是从HDFS中加载数据, 进行转换处理后, 缓存在内存中, 然后与Kafka过来的数据在本机内存中进行join操作. 空转时的耗时是1秒, 时间分布如下:
 
调度等待和序列化的耗时还算正常, 但为毛在task set中啥都没有的情况下对task set的处理都需要1秒呢?
通过研究可知, 即使join的双方有一方没数据的情况下, Spark依然会循环另一方的数据, 以按key对value进行汇总.
 
额, 就是这个循环耗了我们近1秒青春. 而其实在这个场景下, 当Kafka方面没数据输入时, 就根本不要进这个循环, 直接返回空就是了. 因此我们引入了新的SkipableCoGroupedRDD.

 

该RDD负责两个不同RDD的join操作, 但与一般的join操作不同的是, 它会把第一个RDD作为是否能够跳过join操作的参照, 若第一个RDD中根本没有数据, 那么整个join操作会被跳过.

使用了SkipableCoGroupedRDD的处理结果如下:
 
在空转的情况下, 整个join的Stage的处理时间只需要0.2秒. 空转作业的处理时间进一步降低到0.2~0.3秒.
 

这篇关于Spark Streaming 流计算优化记录(4)-时间都去哪儿了,关于调度与空转的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/512986

相关文章

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

使用C#代码计算数学表达式实例

《使用C#代码计算数学表达式实例》这段文字主要讲述了如何使用C#语言来计算数学表达式,该程序通过使用Dictionary保存变量,定义了运算符优先级,并实现了EvaluateExpression方法来... 目录C#代码计算数学表达式该方法很长,因此我将分段描述下面的代码片段显示了下一步以下代码显示该方法如

Python 标准库time时间的访问和转换问题小结

《Python标准库time时间的访问和转换问题小结》time模块为Python提供了处理时间和日期的多种功能,适用于多种与时间相关的场景,包括获取当前时间、格式化时间、暂停程序执行、计算程序运行时... 目录模块介绍使用场景主要类主要函数 - time()- sleep()- localtime()- g

MySQL不使用子查询的原因及优化案例

《MySQL不使用子查询的原因及优化案例》对于mysql,不推荐使用子查询,效率太差,执行子查询时,MYSQL需要创建临时表,查询完毕后再删除这些临时表,所以,子查询的速度会受到一定的影响,本文给大家... 目录不推荐使用子查询和JOIN的原因解决方案优化案例案例1:查询所有有库存的商品信息案例2:使用EX

MySQL中my.ini文件的基础配置和优化配置方式

《MySQL中my.ini文件的基础配置和优化配置方式》文章讨论了数据库异步同步的优化思路,包括三个主要方面:幂等性、时序和延迟,作者还分享了MySQL配置文件的优化经验,并鼓励读者提供支持... 目录mysql my.ini文件的配置和优化配置优化思路MySQL配置文件优化总结MySQL my.ini文件

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Servlet中配置和使用过滤器的步骤记录

《Servlet中配置和使用过滤器的步骤记录》:本文主要介绍在Servlet中配置和使用过滤器的方法,包括创建过滤器类、配置过滤器以及在Web应用中使用过滤器等步骤,文中通过代码介绍的非常详细,需... 目录创建过滤器类配置过滤器使用过滤器总结在Servlet中配置和使用过滤器主要包括创建过滤器类、配置过滤