Spark Core源码精读计划3 | SparkContext辅助属性及后初始化

本文主要是介绍Spark Core源码精读计划3 | SparkContext辅助属性及后初始化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

推荐阅读

《关于MQ面试的几件小事 | 消息队列的用途、优缺点、技术选型》         《关于MQ面试的几件小事 | 如何保证消息队列高可用和幂等》 《关于MQ面试的几件小事 | 如何保证消息不丢失》 《关于MQ面试的几件小事 | 如何保证消息按顺序执行》 《关于MQ面试的几件小事 | 消息积压在消息队列里怎么办》 《关于Redis的几件小事 | 使用目的与问题及线程模型》 《关于Redis的几件小事 | Redis的数据类型/过期策略/内存淘汰》 《关于Redis的几件小事 | 高并发和高可用》 《关于Redis的几件小事 | 持久化/缓存雪崩与穿透》 《关于Redis的几件小事 | 缓存与数据库双写时的数据一致性》 《关于Redis的几件小事 | 并发竞争和Cluster模式》 本文适用于知识共享-署名-相同方式共享(CC-BY-SA)3.0协议 目录
  • 前言

  • SparkContext中的辅助属性

    • creationSite

    • allowMultipleContexts

    • startTime & stopped

    • addedFiles/addedJars & _files/_jars

    • persistentRdds

    • executorEnvs & _executorMemory & _sparkUser

    • checkpointDir

    • localProperties

    • _eventLogDir & _eventLogCodec

    • _applicationId & _applicationAttemptId

    • _shutdownHookRef

    • nextShuffleId & nextRddId

  • SparkContext后初始化

    • setupAndStartListenerBus()方法

    • postEnvironmentUpdate()方法

    • postApplicationStart()方法

    • 其他事项

  • 总结


前言

在文章#2中,我们了解了SparkContext的主体部分,即组件初始化。除了它之外,SparkContext中还有一些与其内部机制紧密相关的属性,下文为了简单,就将它们称为“辅助属性”。另外,在组件初始化完成后,还有一些善后工作,即后初始化(Post-init)。本文就来研究这两块内容。

SparkContext中的辅助属性

仿照文章#2中的方式,仍然先将我们要关注的这些属性整理出来。 代码#3.1 - SparkContext中的辅助属性
 private val creationSite: CallSite = Utils.getCallSite()  private val allowMultipleContexts: Boolean = config.getBoolean("spark.driver.allowMultipleContexts", false)  val startTime = System.currentTimeMillis()  private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)  private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala  private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala  private[spark] val persistentRdds = {    val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()    map.asScala  }  private[spark] val executorEnvs = HashMap[String, String]()  val sparkUser = Utils.getCurrentUserName()  private[spark] var checkpointDir: Option[String] = None  protected[spark] val localProperties = new InheritableThreadLocal[Properties] {    override protected def childValue(parent: Properties): Properties = {      SerializationUtils.clone(parent)    }    override protected def initialValue(): Properties = new Properties()  }  private val nextShuffleId = new AtomicInteger(0)  private val nextRddId = new AtomicInteger(0)
private var _eventLogDir: Option[URI] = None private var _eventLogCodec: Option[String] = None private var _executorMemory: Int = _ private var _applicationId: String = _ private var _applicationAttemptId: Option[String] = None private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _


以下划线开头的字段如同代码#2.2中一样,也有对应的Getter方法。 为了节省篇幅,就不列出来了。 下面按照它们初始化的顺序和相关性来介绍,必要时仍然会附上一些源码。


creationSite
creationSite指示SparkContext是在哪里创建的。 CallSite是个简单的数据结构,只有shortForm与longForm两个属性,用来描述代码的位置。 Utils.getCallSite()方法遍历当前线程的线程栈,并找到最后一个(即最靠近栈顶的)Spark方法调用,与最先一个(即最靠近栈底的)用户方法调用,将它们的短形式和长形式包装在CallSite中返回。 有兴趣的看官可以自行去看这个方法的源代码,不难。 以代码#0.1的WordCount为例,运行时打上断点,观察creationSite的内容如下图。


640?wx_fmt=other


allowMultipleContexts
allowMultipleContexts指示是否允许一个JVM(即一个Application)内存在多个活动的SparkContext实例。 它由spark.driver.allowMultipleContexts参数控制,默认为false,即只允许存在一个活动的SparkContext实例,如果有多个就会抛出异常。 设为true的话,在有多个活动的SparkContext时只会输出警告。 关于它在下一篇文章中还会涉及到,这里就不多说了。


startTime & stopped
startTime指示SparkContext启动时的时间戳。 stopped则指示SparkContext是否停止,它采用AtomicBoolean类型。


addedFiles/addedJars & _files/_jars
Spark支持在提交应用时,附带用户自定义的其他文件与JAR包。 addedFiles和addedJars是两个ConcurrentHashMap,用来维护自定义文件及JAR包的URL路径,及它们被加入ConcurrentHashMap当时的时间戳。 _files与_jars则接受Spark配置中定义的文件或JAR包路径。 由于它们的逻辑基本相同, 下面以JAR包为例来看一下代码。 代码#3.2 - 构造方法中自定义JAR包的初始化
    _jars = Utils.getUserJars(_conf)    if (jars != null) {      jars.foreach(addJar)    }

首先用Utils.getUserJars()方法从SparkConf的spark.jars配置项中取出路径组成的序列,然后分别调用addJar()方法。 代码#3.3 - o.a.s.SparkContext.addJar()方法
  def addJar(path: String) {    def addJarFile(file: File): String = {      try {        if (!file.exists()) {          throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")        }        if (file.isDirectory) {          throw new IllegalArgumentException(            s"Directory ${file.getAbsoluteFile} is not allowed for addJar")        }        env.rpcEnv.fileServer.addJar(file)      } catch {        case NonFatal(e) =>          logError(s"Failed to add $path to Spark environment", e)          null      }    }
if (path == null) { logWarning("null specified as parameter to addJar") } else { val key = if (path.contains("\\")) { addJarFile(new File(path)) } else { val uri = new URI(path) Utils.validateURL(uri) uri.getScheme match { case null => addJarFile(new File(uri.getRawPath)) case "file" => addJarFile(new File(uri.getPath)) case "local" => "file:" + uri.getPath case _ => path } } if (key != null) { val timestamp = System.currentTimeMillis if (addedJars.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added JAR $path at $key with timestamp $timestamp") postEnvironmentUpdate() } } } }


addJar()方法检查JAR包路径的合法性和类型,然后调用RpcEnv中的RpcEnvFileServer.addJar()方法,将JAR包加进RPC环境中。 在该方法的最后还调用了postEnvironmentUpdate(),用来更新执行环境,这属于后初始化逻辑的一部分,下一节会讲到。


persistentRdds
Spark支持RDD的持久化,可以持久化到内存或磁盘。 persistentRdds维护的是持久化RDD的ID与其弱引用的映射关系。 通过RDD内自带的cache()/persist()/unpersist()方法可以持久化与反持久化一个RDD,它们最终调用的是SparkContext.persistRDD()/unpersistRDD()内部方法。 代码#3.4 - o.a.s.SparkContext.persistRDD()与unpersistRDD()方法
  private[spark] def persistRDD(rdd: RDD[_]) {    persistentRdds(rdd.id) = rdd  }
private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) { env.blockManager.master.removeRdd(rddId, blocking) persistentRdds.remove(rddId) listenerBus.post(SparkListenerUnpersistRDD(rddId)) }

executorEnvs & _executorMemory & _sparkUser
executorEnvs是一个HashMap,用来存储需要传递给Executor的环境变量。 _executorMemory与_sparkUser就是其中之二,分别代表Executor内存大小和当前启动SparkContext的用户名。


代码#3.5 - 构造方法中Executor环境变量的初始化
    _executorMemory = _conf.getOption("spark.executor.memory")      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))      .orElse(Option(System.getenv("SPARK_MEM"))      .map(warnSparkMem))      .map(Utils.memoryStringToMb)      .getOrElse(1024)
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing")) value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { executorEnvs(envKey) = value } Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v => executorEnvs("SPARK_PREPEND_CLASSES") = v } executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m" executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser


可见,Executor内存可以通过spark.executor.memory配置项、SPARK_EXECUTOR_MEMORY环境变量、SPARK_MEM环境变量指定,优先级依次降低,且默认大小是1GB。 用户名是通过Utils.getCurrentUserName()方法获得的。


checkpointDir
checkpointDir指定集群状态下,RDD检查点在HDFS上保存的目录。 检查点的存在是为了当计算过程出错时,能够快速恢复,而不必从头重新计算。 SparkContext提供了setCheckpointDir()方法用来设定检查点目录,如下。 代码#3.6 - o.a.s.SparkContext.setCheckpointDir()方法
 
 def setCheckpointDir(directory: String) {    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {      logWarning("Spark is not running in local mode, therefore the checkpoint directory " +        s"must not be on the local filesystem. Directory '$directory' " +        "appears to be on the local filesystem.")    }
checkpointDir = Option(directory).map { dir => val path = new Path(dir, UUID.randomUUID().toString) val fs = path.getFileSystem(hadoopConfiguration) fs.mkdirs(path) fs.getFileStatus(path).getPath.toString } }

localProperties
localProperties用于维护一个Properties数据类型的线程本地变量。 它是InheritableThreadLocal类型,继承自ThreadLocal,在后者的基础上允许本地变量从父线程到子线程的继承,也就是该Properties会沿着线程栈传递下去。


_eventLogDir & _eventLogCodec
这两个属性与EventLoggingListener相关。 EventLoggingListener打开时,事件日志会写入_eventLogDir指定的目录,可以用spark.eventLog.dir参数设置。 _eventLogCodec指定事件日志的压缩算法,当通过spark.eventLog.compress参数启用压缩后,就根据spark.io.compression.codec参数配置压缩算法,目前支持lz4、lzf、snappy、zstd四种。


_applicationId & _applicationAttemptId
这两个ID都是TaskScheduler初始化完毕并启动之后才分配的。 TaskScheduler启动之后,应用代码的逻辑才真正被执行,并且可能会进行多次尝试。 在SparkUI、BlockManager和EventLoggingListener初始化时,也会用到它们。 代码#3.7 - 构造方法中_applicationId与_applicationAttemptId的初始化
 
   _applicationId = _taskScheduler.applicationId()    _applicationAttemptId = taskScheduler.applicationAttemptId()
_shutdownHookRef
它用来定义SparkContext的关闭钩子,主要是在JVM退出时,显式地执行SparkContext.stop()方法,以防止用户忘记而留下烂摊子。 这实际上是后初始化逻辑,在下面的代码#3.8中会出现。


nextShuffleId & nextRddId
这两个ID都是AtomicInteger类型。 Shuffle和RDD都需要唯一ID来进行标识,并且它们是递增的。 在代码#3.4中已经出现过了RDD ID。

SparkContext后初始化

在文章#2的ContextCleaner初始化之后,还有一小部分后初始化逻辑,其代码如下所示。 代码#3.8 - SparkContext后初始化逻辑
    setupAndStartListenerBus()    postEnvironmentUpdate()    postApplicationStart()
_taskScheduler.postStartHook() _env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) }
logDebug("Adding shutdown hook") // force eager creation of logger _shutdownHookRef = ShutdownHookManager.addShutdownHook( ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => logInfo("Invoking stop() from shutdown hook") stop() }
    SparkContext.setActiveContext(this, allowMultipleContexts)
它的主要逻辑在开头的三个方法中,下面来逐一看它们的代码。


setupAndStartListenerBus()方法
代码#3.9 - o.a.s.SparkContext.setupAndStartListenerBus()方法
  private def setupAndStartListenerBus(): Unit = {    try {      conf.get(EXTRA_LISTENERS).foreach { classNames =>        val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)        listeners.foreach { listener =>          listenerBus.addToSharedQueue(listener)          logInfo(s"Registered listener ${listener.getClass().getName()}")        }      }    } catch {      case e: Exception =>        try {          stop()        } finally {          throw new SparkException(s"Exception when registering SparkListener", e)        }    }
listenerBus.start(this, _env.metricsSystem) _listenerBusStarted = true }

这个方法用于注册自定义的监听器,并最终启动LiveListenerBus。 自定义监听器都实现了SparkListener特征,通过spark.extraListeners配置参数来指定。 然后调用Utils.loadExtensions()方法,通过反射来构建自定义监听器的实例,并将它们注册到LiveListenerBus。


postEnvironmentUpdate()方法
代码#3.10 - o.a.s.SparkContext.postEnvironmentUpdate()方法
  private def postEnvironmentUpdate() {    if (taskScheduler != null) {      val schedulingMode = getSchedulingMode.toString      val addedJarPaths = addedJars.keys.toSeq      val addedFilePaths = addedFiles.keys.toSeq      val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,        addedFilePaths)      val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)      listenerBus.post(environmentUpdate)    }  }

该方法在添加自定义文件和JAR包时也都有调用,因为添加的资源会对程序的执行环境造成影响。 它会取得当前的自定义文件和JAR包列表,以及Spark配置、调度方式,然后通过SparkEnv.environmentDetails()方法再取得JVM参数、Java系统属性等,一同封装成SparkListenerEnvironmentUpdate事件,并投递给事件总线。


postApplicationStart()方法
代码#3.11 - o.a.s.SparkContext.postApplicationStart()方法
  private def postApplicationStart() {    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))  }

这个方法比较简单,就是向事件总线投递SparkListenerApplicationStart事件,表示Application已经启动。


其他事项
在这三个方法之后的其他事项如下。
  • 调用TaskScheduler.postStartHook()方法,等待SchedulerBackend初始化完毕。

  • 在度量系统中注册DAGScheduler、BlockManager、ExecutionAllocationManager的度量源,以收集它们的监控数据。

  • 添加关闭钩子,这个在之前已经提过了,不再赘述。

  • 调用伴生对象中的setActiveContext()方法,将当前SparkContext设为活动的。


总结

本文通过梳理SparkContext中的多个辅助属性,进一步了解了一些细节特性,如外部文件和JAR包的初始化、RDD持久化和检查点等。 在SparkContext构造方法的最后,还会执行一些扫尾的工作,如启动事件总线、更新执行环境等。 SparkContext除了初始化之外,还对外提供了不少通用的功能,如生成RDD,产生广播变量与累加器,启动Job等等。 另外,SparkContext类也有伴生对象,里面维护了一些常用的逻辑。 下一篇文章作为SparkContext概况的收尾,就来研究这些剩下的东西。


— THE END —

640?wx_fmt=png

这篇关于Spark Core源码精读计划3 | SparkContext辅助属性及后初始化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143210

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

滚雪球学Java(87):Java事务处理:JDBC的ACID属性与实战技巧!真有两下子!

咦咦咦,各位小可爱,我是你们的好伙伴——bug菌,今天又来给大家普及Java SE啦,别躲起来啊,听我讲干货还不快点赞,赞多了我就有动力讲得更嗨啦!所以呀,养成先点赞后阅读的好习惯,别被干货淹没了哦~ 🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,助你一臂之力,带你早日登顶🚀,欢迎大家关注&&收藏!持续更新中,up!up!up!! 环境说明:Windows 10

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

c++的初始化列表与const成员

初始化列表与const成员 const成员 使用const修饰的类、结构、联合的成员变量,在类对象创建完成前一定要初始化。 不能在构造函数中初始化const成员,因为执行构造函数时,类对象已经创建完成,只有类对象创建完成才能调用成员函数,构造函数虽然特殊但也是成员函数。 在定义const成员时进行初始化,该语法只有在C11语法标准下才支持。 初始化列表 在构造函数小括号后面,主要用于给

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除