spark 资源大小分配与并行处理

2024-09-06 11:32

本文主要是介绍spark 资源大小分配与并行处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

起因

写这篇博客的起因我在跑一个spark job时,有时能跑通,有时跑不通。程序的需求是对比两个hbase表。程序逻辑很简单,分别从两个hbase表读取全量数据,然后以cogroup二者,对比同一个rowkey下每个列是否一致。

跑不通的错误日志如下:

17/02/25 21:24:20 INFO collection.ExternalAppendOnlyMap: Thread 1896 spilling in-memory map of 83.6 MB to disk (46 times so far)

17/02/25 21:24:22 WARN server.TransportChannelHandler: Exception in connection from /10.110.1.57:57832

java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

……

17/02/25 21:24:22 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1258210057016, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/data-g/hadoop/yarn/local-dir/usercache/test/appcache/application_1466564207556_1562806/blockmgr-ebe23f0d-5a9e-4a37-952b-73bfab6cceed/3f/shuffle_0_6_0.data, offset=474965639, length=95049579}} to /10.130.1.27:53263; closing connection

java.nio.channels.ClosedChannelException

17/02/25 21:24:22 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from c1-hd-dn8.bdp.idc/10.130.1.27:50014 is closed

17/02/25 21:24:22 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms

17/02/25 21:24:22 INFO collection.ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 84.1 MB to disk (44 times so far)

17/02/25 21:24:23 INFO collection.ExternalAppendOnlyMap: Thread 1895 spilling in-memory map of 83.9 MB to disk (47 times so far)

 17/02/25 21:24:27 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)

java.io.IOException: Failed to connect to someHost/someIP:50004

……

Caused by: java.net.ConnectException: Connection refused: someHost/someIP:50004

……


stage0读取第一个hbase表的数据;stage1读取第二个hbase表的数据;stage2 cogroup两表并做数据对比

关注点

针对上面这个问题,做了相关的尝试,解决了以下几个问题:

(1)运行spark job该分配多少资源,即我们该分配多少个executor?每个executor分配多少内存、多少个core?

(2)spark job 的并行度由什么因素决定?

(3)为什么yarn UI也的executor显示的used memory内存大小比配置的内存小?


(1)运行spark job该分配多少资源,即我们该分配多少个executor?每个executor分配多少内存、多少个core?

该分配多少资源主要看输入量的大小、资源计算的复杂度。一般瓶颈会在shuffle阶段,如果执行某个shuffle的task内存不足,那很可能会跑不下去,程序挂掉。

spark中的计算任务都是一个个task单独执行,executor内存越多,单个task执行时内存越足,执行越顺利。 executor越多,core越多,可并行执行的task数目也就越多。假如总共100个task,5个executor,4个core,那么平均需要执行100/(5*4) = 5个批次;如果是2个executor,4个core,那么需要执行100/(2*4) = 13个批次。

core的数量一般根据内存大小和机器物理核数来定。最好不要超过物理核数。如果executor内存是4G,分配了4个core,那么每个core只有4G/4 = 1G内存。所以core不宜太大,如果太大,每个task执行时的内存将会变小,影响正常执行。

举个例子,我们的输入是两个hbase表,均为3.5G。shuffle阶段两个表会根据rowkey 做join,会产生几十G的shuffle数据。我们这样设置资源:

--driver-memory 1g \

--executor-memory 4g \

--num-executors 6 \

--executor-cores 4 \

(2)spark job 的并行度由什么因素决定?

并行度分为理论上最大的并行度和实际执行的并行度两种,“理论上”指的是总共的partition数目,一个partition对应一个task执行,如果数据有100个partition,那么理论上并行度最高可以达到100。“实际执行”指的是这些task实际分到executor各个core执行时的并行度。加入有100个partition,但是分配的资源只有10个executor,每个executor2个core,那么他们的并行度是10*2=40, 实际执行时会分批执行,分为100/(10*2) = 5批。我们一般讨论的并行度是理论上的并行度。

并行度(partition数目)由初始数据大小、初始数据类型,程序中设定的numPartitions大小,分配资源的executor、core数目共同决定。并行度一般在shuffle时发生改变,如果未设定,则默认取上一个stage中最大的partition数目作为当前stage的并行度。所以如果不做设定,那么并行度与初始数据的并行度紧密相关。

1.初始数据文件类型因素

如果读入的数据为hdfs文件,那么默认的并行度是block数量。block大小默认是64MB或128MB。

如果读入的数据是hbase表,那么默认的并行度是表的region数目。

2.人为设定numPartitions

如果人为的在读取数据或者在shuffle类算子中设定numPartitions,那么整体的并行度将会以人为设定的为准。

3.人为设定spark.default.parallelism

spark.default.parallelism参数是全局的,优先级低于人为设定的numPartiton。在shuffle时,如果没有设定numPartiton,那么将为以spark.default.parallelism设定的数目作为并行度。

4.系统默认的spark.default.parallelism

系统默认的spark.default.parallelism = executor数目*core数目

以上4个因素的优先级:

1.numPartitions参数 > 2. spark.default.parallelism参数 > 3. 读取初始文件产生的并行度

(3)为什么yarn UI也的executor显示的used memory内存大小比配置的内存小?

spark中的内存分为多个部分,UI页面上显示的只是缓存RDD用的storage memory,大约是(总内存 - 300M) * 60% * 50% 的量,所以会偏小。具体内存分配如下图:

参见 Apache Spark 内存管理详解

以上。

这篇关于spark 资源大小分配与并行处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1141883

相关文章

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

49个权威的网上学习资源网站

艺术与音乐 Dave Conservatoire — 一个完全免费的音乐学习网站,口号是“让每一个人都可以接受世界级的音乐教育”,有视频,有练习。 Drawspace — 如果你想学习绘画,或者提高自己的绘画技能,就来Drawspace吧。 Justin Guitar — 超过800节免费的吉他课程,有自己的app,还有电子书、DVD等实用内容。 数学,数据科学与工程 Codecad

PDFQFZ高效定制:印章位置、大小随心所欲

前言 在科技编织的快节奏时代,我们不仅追求速度,更追求质量,让每一分努力都转化为生活的甜蜜果实——正是在这样的背景下,一款名为PDFQFZ-PDF的实用软件应运而生,它以其独特的功能和高效的处理能力,在PDF文档处理领域脱颖而出。 它的开发,源自于对现代办公效率提升的迫切需求。在数字化办公日益普及的今天,PDF作为一种跨平台、不易被篡改的文档格式,被广泛应用于合同签署、报告提交、证书打印等各个

string字符会调用new分配堆内存吗

gcc的string默认大小是32个字节,字符串小于等于15直接保存在栈上,超过之后才会使用new分配。

【CSS in Depth 2 精译_024】4.2 弹性子元素的大小

当前内容所在位置(可进入专栏查看其他译好的章节内容) 第一章 层叠、优先级与继承(已完结) 1.1 层叠1.2 继承1.3 特殊值1.4 简写属性1.5 CSS 渐进式增强技术1.6 本章小结 第二章 相对单位(已完结) 2.1 相对单位的威力2.2 em 与 rem2.3 告别像素思维2.4 视口的相对单位2.5 无单位的数值与行高2.6 自定义属性2.7 本章小结 第三章 文档流与盒模型(已

Linux下获取硬盘空间的大小

1. df 命令查看所有硬盘设备的信息 2. 查看指定路径的磁盘空间大小 代码获取空间大小案例:

Windows11电脑上自带的画图软件修改照片大小(不裁剪尺寸的情况下)

针对一张图片,有时候上传的图片有大小限制,那么在这种情况下如何修改其大小呢,在不裁剪尺寸的情况下 步骤如下: 1.选定一张图片,右击->打开方式->画图,如下: 第二步:打开图片后,我们可以看到图片的大小为82.1kb,点击上面工具栏的“重设大小和倾斜”进行调整,如下: 第三步:修改水平和垂直的数字,此处我修改为分别都修改为50,然后保存,可以看到大小变成63.5kb,如下:

汇编:嵌入式软件架构学习资源

成为嵌入式软件架构设计师需要掌握多方面的知识,包括嵌入式系统、实时操作系统、硬件接口、软件设计模式等。 以下是一些推荐的博客和网站,可以帮助你深入学习嵌入式软件架构设计: ### 1. **Embedded.com**    - **网址**: [Embedded.com](https://www.embedded.com/)    - **简介**: 这是一个专注于嵌入式系统设计的专业网

Unity 资源 之 Super Confetti FX:点亮项目的璀璨粒子之光

Unity 资源 之 Super Confetti FX:点亮项目的璀璨粒子之光 一,前言二,资源包内容三,免费获取资源包 一,前言 在创意的世界里,每一个细节都能决定一个项目的独特魅力。今天,要向大家介绍一款令人惊艳的粒子效果包 ——Super Confetti FX。 二,资源包内容 💥充满活力与动态,是 Super Confetti FX 最显著的标签。它宛如一位

操作系统是怎么为不同的程序分配所需的内存空间的

操作系统为不同的程序分配内存空间的过程涉及多个关键步骤,确保每个程序都有其所需的内存资源,同时避免程序之间的冲突。以下是操作系统如何为程序分配内存空间的详细过程: 1. 内存管理的基础概念 虚拟内存:现代操作系统使用虚拟内存机制来为程序提供隔离的内存空间。每个程序运行在其独立的虚拟地址空间中,这使得程序间的内存互不干扰。物理内存:实际的 RAM(随机存取存储器),由操作系统和硬件共同管理。虚拟