Spark Core源码精读计划7 | Spark执行环境的初始化

2024-09-06 21:48

本文主要是介绍Spark Core源码精读计划7 | Spark执行环境的初始化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

推荐阅读

Spark源码精度计划 | SparkConf

Spark Core源码精读计划 | SparkContext组件初始化

Spark Core源码精读计划3 | SparkContext辅助属性及后初始化

《Spark Core源码精读计划4 | SparkContext提供的其他功能》

《Spark Core源码精读计划5 | 事件总线及ListenerBus》

《Spark Core源码精读计划6 | AsyncEventQueue与LiveListenerBus》

目录

  • 前言

  • SparkEnv的入口

  • SparkEnv初始化的组件

    • SecurityManager

    • RpcEnv

    • SerializerManager

    • BroadcastManager

    • MapOutputTracker

    • ShuffleManager

    • MemoryManager

    • BlockManager

    • MetricsSystem

    • OutputCommitCoordinator

  • SparkEnv的创建与保存

  • 总结


前言

继事件总线之后,SparkContext第二个初始化的主要组件是SparkEnv,即Spark执行环境。Driver和Executor的正常运行都依赖SparkEnv提供的环境作为支持。SparkEnv初始化成功之后,与Spark存储、计算、监控等相关的底层功能才会真正准备好,可见它几乎与SparkContext同等重要。

SparkEnv内部也包含了很多种组件,比起SparkContext的组件会稍微接地气一点。我们采用与研究SparkContext近似的方式来研究它。


SparkEnv的入口

在文章#2的代码#2.5~#2.6中,我们已经得知Driver执行环境是通过调用SparkEnv.createDriverEnv()方法来创建的,这个方法位于SparkEnv类的伴生对象中。同理,也有createExecutorEnv()方法。我们从这两个方法入手来看一下代码。

代码#7.1 - o.a.s.SparkEnv.createDriverEnv()与createExecutorEnv()方法


  private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains(DRIVER_HOST_ADDRESS),
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(conf))
} else {
None
}
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
Option(port),
isLocal,
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}

private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
val env = create(
conf,
executorId,
hostname,
hostname,
None,
isLocal,
numCores,
ioEncryptionKey
)
SparkEnv.set(env)
env
}

可见,它们都是调用伴生对象内的create()方法来创建SparkEnv的。这个方法很长,所以先来看一看它的声明。

代码#7.2 - o.a.s.SparkEnv.create()方法的声明

 
private def create(	conf: SparkConf,	executorId: String,	bindAddress: String,	advertiseAddress: String,	port: Option[Int],	isLocal: Boolean,	numUsableCores: Int,	ioEncryptionKey: Option[Array[Byte]],	listenerBus: LiveListenerBus = null,	mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { /*...*/ }

其中有几个参数需要说明一下。

  • executorId:Executor的唯一标识。如果是Driver的话,值就是字符串"driver"。

  • bindAddress/advertiseAddress:分别是监听Socket绑定的地址,与RPC端点的地址。

  • isLocal:是否为本地模式。

  • numUsableCores:分配给Driver或Executor的CPU核心数。

  • ioEncryptionKey:I/O加密的密钥,当spark.io.encryption.enabled配置项启用时才有效。


SparkEnv初始化的组件

我们按照create()方法中的代码顺序,对SparkEnv内涉及到的组件做简要介绍。


SecurityManager

SecurityManager即安全管理器。它负责通过共享密钥的方式进行认证,以及基于ACL(Access Control List,访问控制列表)管理Spark内部的账号和权限。其初始化代码如下。

代码#7.3 - create()方法中SecurityManager的初始化


    val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
securityManager.initializeAuth()
}

ioEncryptionKey.foreach { _ =>
if (!securityManager.isEncryptionEnabled()) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
}
RpcEnv

RpcEnv即RPC环境。在前面已经讲过,Spark的各个实体间必然会涉及大量的网络通信,这些通信实体在Spark的RPC体系中会抽象为RPC端点(RpcEndpoint)及其引用(RpcEndpointRef)。RpcEnv为RPC端点提供处理消息的环境,并负责RPC端点的注册,端点之间消息的路由,以及端点的销毁等。RPC环境的初始化代码如下。

代码#7.4 - create()方法中RpcEnv的初始化

   val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)

if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
}

代码#7.5 - o.a.s.rpc.RpcEnv.create()方法


  def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
}

Spark的RPC底层是利用Netty实现的,NettyRpcEnv目前也是RpcEnv唯一的实现类。RPC的内部细节很多,之后会用多篇文章来详细分析。


SerializerManager

SerializerManager即序列化管理器。在Spark存储或交换数据时,往往先需要将数据序列化或反序列化,为了节省空间可能还要对数据进行压缩,SerializerManager就是负责这些工作的组件。其初始化代码如下。

代码#7.6 - create()方法中SerializerManager的初始化


    val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

val closureSerializer = new JavaSerializer(conf)

instantiateClassFromConf()方法是create()方法内定义的,它调用了工具类Utils的classForName()方法,通过反射创建类的实例。序列化器的类型可以用SparkConf配置项spark.serializer指定,其默认值是org.apache.spark.serializer.JavaSerializer。我们在日常开发中常用的还有KryoSerializer。


序列化器有两个,serializer是数据的序列化器,closureSerializer则是闭包的序列化器。后者在调度逻辑(如DAGScheduler、TaskSetManager)中经常用到,其类型固定为JavaSerializer,不能修改。


BroadcastManager

BroadcastManager即广播管理器,它在前面的代码#4.3中已经出现过。它除了为用户提供广播共享数据的功能之外,在Spark Core内部也有广泛的应用,如共享通用配置项或通用数据结构等等。其初始化代码只有一句,不再贴了。


MapOutputTracker

MapOutputTracker即Map输出跟踪器。在Shuffle过程中,Map任务通过Shuffle Write阶段产生了中间数据,Reduce任务进行Shuffle Read时需要知道哪些数据位于哪个节点上,以及Map输出的状态等信息。MapOutputTracker就负责维护这些信息,其初始化代码如下。

代码#7.7 - create()方法中MapOutputTracker的初始化

    val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
new MapOutputTrackerWorker(conf)
}

mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

可见是按照当前实体是Driver或Executor分为两种情况处理的。创建完MapOutputTracker实例之后,还会调用registerOrLookupEndpoint()方法,注册(Driver情况)或查找(Executor情况)对应的RPC端点,并返回其引用。


ShuffleManager

ShuffleManager即Shuffle管理器。顾名思义,它负责管理Shuffle阶段的机制,并提供Shuffle方法的具体实现。其初始化代码如下。

代码#7.8 - create()方法中ShuffleManager的初始化


   val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

ShuffleManager的种类可以通过配置项spark.shuffle.manager设置,默认为sort,即SortShuffleManager。取得对应的ShuffleManager类名之后,通过反射构建其实例。Shuffle是Spark计算过程中非常重要的一环,之后会深入地研究它。


MemoryManager

MemoryManager即内存管理器。顾名思义,它负责Spark集群节点内存的分配、利用和回收。Spark作为一个内存优先的大数据处理框架,内存管理机制是非常精细的,主要涉及存储和执行两大方面。其初始化代码如下。

代码#7.9 - create()方法中MemoryManager的初始化


    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}

MemoryManager有两种实现,可以使用spark.memory.useLegacyMode配置项控制使用哪种。旧版的内存管理器是StaticMemoryManager,即静态内存管理器。新版(1.6.0版本之后)的内存管理器是UnifiedMemoryManager,即统一内存管理器,它也是当前的默认实现,相对于静态内存管理而言也更为先进。在之后讲解涉及存储和计算方面的细节时,会一同探究MemoryManager的具体实现。


BlockManager

BlockManager即块管理器。块作为Spark内部数据的基本单位,与操作系统中的“块”和HDFS中的“块”都不太相同。它可以存在于堆内内存,也可以存在于堆外内存和外存(磁盘)中,是Spark数据的通用表示方式。BlockManager就负责管理块的存储、读写流程和状态信息,其初始化代码如下。

代码#7.10 - create()方法中BlockManager的初始化


   val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
blockManagerPort, numUsableCores)

val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)

val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)

在初始化BlockManager之前,还需要先初始化块传输服务BlockTransferService,以及BlockManager的主节点BlockManagerMaster。BlockManager也是采用主从结构设计的,Driver上存在主RPC端点BlockManagerMasterEndpoint,而各个Executor上都存在从RPC端点BlockManagerSlaveEndpoint。


BlockManager是整个Spark存储子系统的基石,之后会先于上面的MemoryManager做介绍。


MetricsSystem

MetricsSystem即度量系统。它是Spark监控体系的后端部分,负责收集与输出度量(也就是各类监控指标)数据。度量系统由系统实例Instance、度量数据源Source、度量输出目的地Sink三部分组成。其在SparkEnv里的初始化代码如下。

代码7.11 - create()方法中MetricsSystem的初始化

640?wx_fmt=png

这里也是分两种情况处理的。在Driver端初始化MetricsSystem时,需要依赖TaskScheduler初始化完毕后生成的Application ID,故不会马上启动它,可以参见代码#2.7。在Executor端初始化时就不用等待,因为Executor ID已经存在了。


OutputCommitCoordinator

OutputCommitCoordinator即输出提交协调器。如果需要将Spark作业的结果数据持久化到外部存储(最常见的就是HDFS),就需要用到它来判定作业的每个Stage是否有权限提交。其初始化代码如下。

代码#7.12 - create()方法中OutputCommitCoordinator的初始化


640?wx_fmt=png

可见,在Driver上还注册了其RPC端点OutputCommitCoordinatorEndpoint,各个Executor会通过其引用来访问它。


SparkEnv的创建与保存

在create()方法的最后,会构建SparkEnv类的实例,创建Driver端的临时文件夹,并返回该实例。

代码#7.13 - SparkEnv.create()方法的结尾


    val envInstance = new SparkEnv(
executorId,
rpcEnv,
serializer,
closureSerializer,
serializerManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)

if (isDriver) {
val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
envInstance.driverTmpDir = Some(sparkFilesDir)
}

envInstance

SparkEnv的全部初始化流程都在伴生对象中,其类中反而没有太多东西,主要是控制SparkEnv停止的相关逻辑,不再赘述。


如同SparkContext一样,SparkEnv在伴生对象中也会将已创建的实例保存起来,避免重复创建,也保证在同一节点上执行环境的一致性。get()与set()方法的代码非常简单,就不贴出来了。


总结

本文从SparkEnv的初始化方法入手,按顺序简述了十余个与Spark执行环境相关的内部组件及其初始化逻辑。这些组件与Spark框架的具体执行流程息息相关,我们之后也会深入研究其中的一部分,特别重要的如RPC环境RpcEnv、Shuffle管理器ShuffleManager、内存管理器MemoryManager、块管理器BlockManager等。

最后仍然用一张简图来概括。

640?wx_fmt=other

图#7.1 - SparkEnv初始化顺序


下一篇文章计划研究RPC环境。它比前面讲过的事件总线更加底层,因此也有更多的细节等着我们去探索。


— THE END — 640?wx_fmt=png

这篇关于Spark Core源码精读计划7 | Spark执行环境的初始化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143213

相关文章

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

C++初始化数组的几种常见方法(简单易懂)

《C++初始化数组的几种常见方法(简单易懂)》本文介绍了C++中数组的初始化方法,包括一维数组和二维数组的初始化,以及用new动态初始化数组,在C++11及以上版本中,还提供了使用std::array... 目录1、初始化一维数组1.1、使用列表初始化(推荐方式)1.2、初始化部分列表1.3、使用std::

在MySQL执行UPDATE语句时遇到的错误1175的解决方案

《在MySQL执行UPDATE语句时遇到的错误1175的解决方案》MySQL安全更新模式(SafeUpdateMode)限制了UPDATE和DELETE操作,要求使用WHERE子句时必须基于主键或索引... mysql 中遇到的 Error Code: 1175 是由于启用了 安全更新模式(Safe Upd

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

VScode连接远程Linux服务器环境配置图文教程

《VScode连接远程Linux服务器环境配置图文教程》:本文主要介绍如何安装和配置VSCode,包括安装步骤、环境配置(如汉化包、远程SSH连接)、语言包安装(如C/C++插件)等,文中给出了详... 目录一、安装vscode二、环境配置1.中文汉化包2.安装remote-ssh,用于远程连接2.1安装2

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

详解如何在React中执行条件渲染

《详解如何在React中执行条件渲染》在现代Web开发中,React作为一种流行的JavaScript库,为开发者提供了一种高效构建用户界面的方式,条件渲染是React中的一个关键概念,本文将深入探讨... 目录引言什么是条件渲染?基础示例使用逻辑与运算符(&&)使用条件语句列表中的条件渲染总结引言在现代

Java中的Opencv简介与开发环境部署方法

《Java中的Opencv简介与开发环境部署方法》OpenCV是一个开源的计算机视觉和图像处理库,提供了丰富的图像处理算法和工具,它支持多种图像处理和计算机视觉算法,可以用于物体识别与跟踪、图像分割与... 目录1.Opencv简介Opencv的应用2.Java使用OpenCV进行图像操作opencv安装j

mysql-8.0.30压缩包版安装和配置MySQL环境过程

《mysql-8.0.30压缩包版安装和配置MySQL环境过程》该文章介绍了如何在Windows系统中下载、安装和配置MySQL数据库,包括下载地址、解压文件、创建和配置my.ini文件、设置环境变量... 目录压缩包安装配置下载配置环境变量下载和初始化总结压缩包安装配置下载下载地址:https://d