【pyspark速成专家】11_Spark性能调优方法2

2024-05-28 22:52

本文主要是介绍【pyspark速成专家】11_Spark性能调优方法2,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

​编辑

二,Spark任务UI监控

三,Spark调优案例


二,Spark任务UI监控

Spark任务启动后,可以在浏览器中输入 http://localhost:4040/ 进入到spark web UI 监控界面。

该界面中可以从多个维度以直观的方式非常细粒度地查看Spark任务的执行情况,包括任务进度,耗时分析,存储分析,shuffle数据量大小等。

最常查看的页面是 Stages页面和Excutors页面。

Jobs: 每一个Action操作对应一个Job,以Job粒度显示Application进度。有时间轴Timeline。

Stages: Job在遇到shuffle切开Stage,显示每个Stage进度,以及shuffle数据量。可以点击某个Stage进入详情页,查看其下面每个Task的执行情况以及各个partition执行的费时统计。

Storage:

监控cache或者persist导致的数据存储大小。

Environment:
显示spark和scala版本,依赖的各种jar包及其版本。

Excutors : 监控各个Excutors的存储和shuffle情况。

SQL: 显示各种SQL命令在那些Jobs中被执行。

三,Spark调优案例

下面介绍几个调优的典型案例:

1,资源配置优化

2,利用缓存减少重复计算

3,数据倾斜调优

4,broadcast+map代替join

5,reduceByKey/aggregateByKey代替groupByKey

1,资源配置优化

下面是一个资源配置的例子:

优化前:

#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 8 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files  data.csv,profile.txt
--py-files  pkg.py,tqdm.py
pyspark_demo.py 

优化后:这里主要减小了 executor-cores数量,一般设置为1~4,过大的数量可能会造成每个core计算和存储资源不足产生OOM,也会增加GC时间。 此外也将默认分区数调到了1600,并设置了2G的堆外内存。

#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 2 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.default.parallelism=1600 \
--conf spark.sql.shuffle.partitions=1600 \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2g\
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files  data.csv,profile.txt
--py-files  pkg.py,tqdm.py
pyspark_demo.py 

2, 利用缓存减少重复计算

%%time
# 优化前:
import math 
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))
s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n 
print(mean)%%time 
# 优化后: 
import math 
from  pyspark.storagelevel import StorageLevel
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n 
rdd_data.unpersist()
print(mean)

3, 数据倾斜调优

%%time 
# 优化前: 
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_count = rdd_one.reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect()) %%time 
# 优化后: 
import random 
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_mid_key = rdd_one.map(lambda x:(x[0]+"_"+str(random.randint(0,999)),x[1]))
rdd_mid_count = rdd_mid_key.reduceByKey(lambda a,b:a+b+0.0)
rdd_count = rdd_mid_count.map(lambda x:(x[0].split("_")[0],x[1])).reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect())  #作者按:此处仅示范原理,单机上该优化方案难以获得性能优势

4, broadcast+map代替join

该优化策略一般限于有一个参与join的rdd的数据量不大的情况。

%%time 
# 优化前:rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")])
rdd_students = rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))print(rdd_students.collect())%%time # 优化后:
rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")],2)
ages = rdd_age.collect()
broads = sc.broadcast(ages)def get_age(it):result = []ages = dict(broads.value)for x in it:name = x[0]age = ages.get(name,0)result.append((x[0],age,x[1]))return iter(result)rdd_students = rdd_gender.mapPartitions(get_age)print(rdd_students.collect())

5,reduceByKey/aggregateByKey代替groupByKey

groupByKey算子是一个低效的算子,其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替,通过在每个partition内部先做一次数据的合并操作,大大减少了shuffle的数据量。

%%time 
# 优化前:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))
names = rdd_names.collect()
print(names)%%time 
# 优化后:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.aggregateByKey([],lambda arr,name:arr+[name],lambda arr1,arr2:arr1+arr2)names = rdd_names.collect()
print(names)

这篇关于【pyspark速成专家】11_Spark性能调优方法2的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1011919

相关文章

问题:第一次世界大战的起止时间是 #其他#学习方法#微信

问题:第一次世界大战的起止时间是 A.1913 ~1918 年 B.1913 ~1918 年 C.1914 ~1918 年 D.1914 ~1919 年 参考答案如图所示

[word] word设置上标快捷键 #学习方法#其他#媒体

word设置上标快捷键 办公中,少不了使用word,这个是大家必备的软件,今天给大家分享word设置上标快捷键,希望在办公中能帮到您! 1、添加上标 在录入一些公式,或者是化学产品时,需要添加上标内容,按下快捷键Ctrl+shift++就能将需要的内容设置为上标符号。 word设置上标快捷键的方法就是以上内容了,需要的小伙伴都可以试一试呢!

大学湖北中医药大学法医学试题及答案,分享几个实用搜题和学习工具 #微信#学习方法#职场发展

今天分享拥有拍照搜题、文字搜题、语音搜题、多重搜题等搜题模式,可以快速查找问题解析,加深对题目答案的理解。 1.快练题 这是一个网站 找题的网站海量题库,在线搜题,快速刷题~为您提供百万优质题库,直接搜索题库名称,支持多种刷题模式:顺序练习、语音听题、本地搜题、顺序阅读、模拟考试、组卷考试、赶快下载吧! 2.彩虹搜题 这是个老公众号了 支持手写输入,截图搜题,详细步骤,解题必备

电脑不小心删除的文件怎么恢复?4个必备恢复方法!

“刚刚在对电脑里的某些垃圾文件进行清理时,我一不小心误删了比较重要的数据。这些误删的数据还有机会恢复吗?希望大家帮帮我,非常感谢!” 在这个数字化飞速发展的时代,电脑早已成为我们日常生活和工作中不可或缺的一部分。然而,就像生活中的小插曲一样,有时我们可能会在不经意间犯下一些小错误,比如不小心删除了重要的文件。 当那份文件消失在眼前,仿佛被时间吞噬,我们不禁会心生焦虑。但别担心,就像每个问题

邮件群发推送的方法技巧?有哪些注意事项?

邮件群发推送的策略如何实现?邮件推送怎么评估效果? 电子邮件营销是现代企业进行推广和沟通的重要工具。有效的邮件群发推送不仅能提高客户参与度,还能促进销售增长。AokSend将探讨一些关键的邮件群发推送方法和技巧,以帮助企业优化其邮件营销策略。 邮件群发推送:目标受众 了解他们的需求、兴趣和行为习惯有助于你设计出更具吸引力和相关性的邮件内容。通过收集和分析数据,创建详细的客户画像,可以更精

上采样(upsample)的方法

上采样(upsample)的方法   在神经网络中,扩大特征图的方法,即upsample/上采样的方法   1)unpooling:恢复max的位置,其余部分补零   2)deconvolution(反卷积):先对input补零,再conv   3)插值方法,双线性插值等;   4)扩张卷积,dilated conv;

青龙面板部署通用教程,含服务器、路由器、X86等部署方法

1. 拉取镜像/更新镜像 docker pull whyour/qinglong:latest 2. 删除镜像 docker rmi whyour/qinglong:latest 3. 启动容器 普通服务器 docker run -dit \-v $PWD/ql/config:/ql/config \-v $PWD/ql/log:/ql/log \-v $PWD/ql/db:

Java中如何优化数据库查询性能?

Java中如何优化数据库查询性能? 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我们将深入探讨在Java中如何优化数据库查询性能,这是提升应用程序响应速度和用户体验的关键技术。 优化数据库查询性能的重要性 在现代应用开发中,数据库查询是最常见的操作之一。随着数据量的增加和业务复杂度的提升,数据库查询的性能优化显得尤为重

# bash: chkconfig: command not found 解决方法

bash: chkconfig: command not found 解决方法 一、chkconfig 错误描述: 这个错误表明在 Bash 环境下,尝试执行 chkconfig 命令,但是系统找不到这个命令。chkconfig 命令是一个用于管理 Linux 系统中服务的启动和停止的工具,通常它是 initscripts 包的一部分,但在最新的 Linux 发行版中可能已经被 syste

Python几种建表方法运行时间的比较

建立一个表[0,1,2,3.......10n],下面几种方法都能实现,但是运行时间却截然不同哦 import time#方法一def test1(n):list=[]for i in range(n*10):list=list+[i]return list#方法二def test2(n):list=[]for i in range(n*10):list.append(i)#方法三d