源码- Spark Broadcast源码分析

2024-05-27 12:48
文章标签 分析 源码 spark broadcast

本文主要是介绍源码- Spark Broadcast源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本博文的主要内容包括:
1、Broadcast功能描述
2、Broadcast创建过程
3、Broadcast读写原理
一、功能描述
Broadcast是指将数据从一个节点发送到其他节点,供其计算使用,是spark在计算过程中非常常用的方式,通常使用方式,包括共享配置文件,map数据集,树形数据结构等,为能够更好更快速为TASK任务使用相关变量。但是Broadcast不适合存放过大的数据,这会导致网络IO性能变差或者过重的单点压力。
Broadcast的基本用法:

      本文是借鉴网络大神的经验,结合自己的走读的一些总结,如有雷同之处,希望谅解!


二、创建过程


Broadcast是典型的建造者模式方法,相对内部设计相对较为简单,同时初始化并非直接创建Broadcast对象,作用有两个方面:

1)依据配置属性(spark.broadcast.factory)创建BroadcastFactory对象 - 反射创建。

2)将sparkConf对象注入Broadcast中,同时定义压缩编码。


初始化入口sparkContext启动时创建,调用过程如下:

1)SparkContext#构造方法
2)SparkEnv#create
3)BroadcastManager#initialize()
4)TorrentBroadcastFactoryr#initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)
5)TorrentBroadcast#initialize(_isDriver: Boolean, conf: SparkConf)


SparkContext初始化SparkEnv,在SparkEnv内创建BroadcastManager,代码如下:

SparkContext

[html] view plain copy
  1. // Create the Spark execution environment (cache, map output tracker, etc)    
  2.   // 创建spark的执行环境    
  3.   private[spark] val env = SparkEnv.create(    
  4.     conf, // spark配置文件    
  5.     "<driver>",    
  6.     conf.get("spark.driver.host"), // 主机名    
  7.     conf.get("spark.driver.port").toInt, // 端口号    
  8.     isDriver = true, // 默认启动SparkContext客户端,便是Driver    
  9.     isLocal = isLocal,// 是否是本地运行,是通过master获取该值,如果是submit提交,请参考SparkSubmitArguments类,会将参数转换为master    
  10.     listenerBus = listenerBus     
  11.     /* spark监听总线(LiveListenerBus),他是负责监听spark事件,包括job启动和介绍、BlockManage的添加等等,简单理解UI能看到的变化都是这块监听的,   
  12.      * 如果有时间,可以将这块与大家分享一下,底层使用队列实现,典型观察者模式实现,未使用akka实现 */    
  13.     )    
  14.   SparkEnv.set(env) // 注册SparkEnv对象  

SparkEnv中初始化BroadcastManager
[html] view plain copy
  1. val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,    
  2.       serializer, conf, securityManager, mapOutputTracker, shuffleManager)    
  3.     
  4. val connectionManager = blockManager.connectionManager    
  5.     
  6. val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)    
  7.     
  8. val cacheManager = new CacheManager(blockManager)  

BroadcastManager构造函数调用initialize方法构建
[html] view plain copy
  1. // Called by SparkContext or Executor before using Broadcast    
  2.   // 一个context仅初始化一次,默认是Torrent    
  3.   private def initialize() {    
  4.     // TODO 初始化BroadcastFactory    
  5.     // 1.确定仅有第一次进入时,创建BroadcastFactory对象    
  6.     // 2.初始化BroadcastFactory,并与BroadcastManager建立hook    
  7.         
  8.     synchronized {    
  9.       if (!initialized) {    
  10.    
  11.         val broadcastFactoryClass =    
  12.           conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")  //默认采用<span style="font-family: Arial, Helvetica, sans-serif;">TorrentBroadcastFactory</span>  
  13.         broadcastFactory =    
  14.           Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]    
  15.     
  16.         //初始化BroadcastFactory,并与BroadcastManager建立hook    
  17.         broadcastFactory.initialize(isDriver, conf, securityManager)    
  18.         //表示第一次进入完毕    
  19.         initialized = true    
  20.       }    
  21.     }    
  22.   }  

TorrentBroadcastFactory调用initialize方法
[java] view plain copy
  1. override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {    
  2.     TorrentBroadcast.initialize(isDriver, conf)    
  3. }  

将sparkConf对象注入Broadcast中,并定义压缩方式
[java] view plain copy
  1. /** 初始化TorrentBroadcast属性 */    
  2.   def initialize(_isDriver: Boolean, conf: SparkConf) {    
  3.     TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests    
  4.     synchronized {    
  5.       if (!initialized) {    
  6.         compress = conf.getBoolean("spark.broadcast.compress"true)    
  7.         compressionCodec = CompressionCodec.createCodec(conf)    
  8.         initialized = true    
  9.       }    
  10.     }    
  11.   }  

broadcast是在sparkContext创建时完成的,broadcast类型、压缩方式也是在创建过程中完成的定义,但是,目前还无法实现app中不同job使用不同的broadcast,广播的方式只能选择TorrentBroadcast和HttpBroadcast的一种。spark默认使用TorrentBroadcast(并发),效率相对要比http要好,同时避免单机热点的产生,比较适合分布式系统的思想。思想类似于迅雷BT下载,已使用的executor越多,速度越快。


Broadcast创建

driver首先要将值序列化到byteArray中,然后再按block大小进行分割(默认是4M),将信息存放在driver的blockmanage中,并通知BlockManageMaster,完成注册,并可以让所有executor读取,存储方式MEMORY_AND_DISK。

使用write顺序:

1)SparkContext#broadcast  外层方法,使用sc.broadcast 进行广播

2) BroadcastManager#newBroadcast(value_ : T, isLocal: Boolean)

3)TorrentBroadcastFactory#newBroadcast(value_ : T, isLocal: Boolean, id:Long)

4)TorrentBroadcast#构造函数

5)TorrentBroadcast#writeBlocks

6)BlockManage#putBytes(

      blockId: BlockId,

      bytes: ByteBuffer,

      level: StorageLevel,

      tellMaster: Boolean = true,

      effectiveStorageLevel:Option[StorageLevel] = None)   最终存储

当然,使用广播较为简单,但是,如果sparkContext长时间执行多个job时,则考虑注销广播,或者尽量广播要小,否则会造成性能严重下降,具体原因尚未研究。

注销方式代码如下:

[java] view plain copy
  1. val broadcastValue = sc.broadcast(存储值)  
  2. broadcastValue.unpersist() //方法一  
  3. SparkEnv.get.broadcastManager.unbroadcast(id, falsefalse//方法二  

创建时,使用SparkContext的broadcast方法,并将值一直传递至TorrentBroadcast,并构建TorrentBroadcast对象,同时完成将值交给BlockManage进行注册,并序列化在本地存储。(SparkEnv.get.blockManager.putBytes方法)

TorrentBroadcast

[java] view plain copy
  1. private[spark] class TorrentBroadcast[T: ClassTag](    
  2.     obj : T,    
  3.     @transient private val isLocal: Boolean,    
  4.     id: Long)    
  5.   extends Broadcast[T](id) with Logging with Serializable {    
  6.     
  7.     
  8.  /** 1.driver是直接读取本地的值  
  9.   *  2.其他executor是依靠blockManager读取(readObject) */    
  10.   @transient private var _value: T = obj    
  11.     
  12.   /* 固定格式:  
  13.   * broadcastId = broadcast_广播ID  
  14.   * blockID = broadcast_广播ID_piece[1,2,3,4] */    
  15.   private val broadcastId = BroadcastBlockId(id)    
  16.     
  17.   /** 1.广播值交给blockManager管理  
  18.    *  2.广播转换为ByteArray,返回数据块的长度 */    
  19.   private val numBlocks: Int = writeBlocks()    
  20.     
  21.   override protected def getValue() = _value  
[java] view plain copy
  1. }  

writeBlocks是主要执行写方法,主要功能便是按照定义的广播块大小切分数据(默认是4M,spark.broadcast.blockSize),其后将块注册blockManage,并写入本地磁盘中。

writeBlocks(){
   1.blockifyObject  数据切分方法
   2.BlockManage.putBytes  数据存储方法
}

blockifyObject  代码如下:

[java] view plain copy
  1. /** 切分数据,方法较为实用,可作为工具类 
  2.    *  @param obj 切分数据对象 */    
  3.   def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {    
  4.     // TODO: Create a special ByteArrayOutputStream that splits the output directly into chunks    
  5.     // so we don't need to do the extra memory copy.    
  6.     // TODO 数据切块,按照默认的4M切分数据块,返回4MByteBuffer(数据体检变小)    
  7.     //      数据 -> 压缩 -> 序列化 -> 分割    
  8.     // 1. 声明输出流(定义压缩方式和序列化)    
  9.     // 2. 压缩后数据按4M进行分割    
  10.     // 3. 返回ByteBuffer字符    
  11.         
  12.     // 1.0 定义输出流    
  13.     val bos = new ByteArrayOutputStream()    
  14.     // 1.1 包装压缩方式    
  15.     val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos    
  16.     // 1.2 创建序列化对象    
  17.     val ser = SparkEnv.get.serializer.newInstance()    
  18.     // 1.3 包装序列化输出流(默认java序列化,不过一般推荐KryoSerializer,建议修改spark-defaults.conf)    
  19.     val serOut = ser.serializeStream(out)    
  20.     // 1.4 将value写至ByteArray中    
  21.     serOut.writeObject[T](obj).close()    
  22.     val byteArray = bos.toByteArray    
  23.     // 2.0 将ByteArray转换为输入流    
  24.     val bais = new ByteArrayInputStream(byteArray)    
  25.     // 2.1 获取分割块数,ceil有余数+1    
  26.     val numBlocks = math.ceil(byteArray.length.toDouble / BLOCK_SIZE).toInt    
  27.     // 2.2 定义数据块集合    
  28.     val blocks = new Array[ByteBuffer](numBlocks)    
  29.     // 2.3 定义块ID    
  30.     var blockId = 0    
  31.     // 2.4 循环按4M分割数据块,步长为4M    
  32.     for (i <- 0 until (byteArray.length, BLOCK_SIZE)) {    
  33.       // 2.4.1 定义装载4M的byte的容器    
  34.       val thisBlockSize = math.min(BLOCK_SIZE, byteArray.length - i)    
  35.       val tempByteArray = new Array[Byte](thisBlockSize)    
  36.       // 2.4.2 装载数据    
  37.       bais.read(tempByteArray, 0, thisBlockSize)    
  38.       blocks(blockId) = ByteBuffer.wrap(tempByteArray)    
  39.       // 2.4.3 index加一    
  40.       blockId += 1    
  41.     }    
  42.     // 3.0 切分结束,关闭流    
  43.     bais.close()    
  44.     // 3.1 返回流    
  45.     blocks    
  46.   }  


Broadcast读取

broadcase写入是优先写入依据存储策略写入本地(BlockManage#putBytes方法),既然序列化数据是本地存储,由此而来的问题是读取问题,BlockManage存储数据并不似hdfs会依据备份策略存储多份数据放置不同节点(但是多提一句,spark的taskScheblue是拥有类似机架感知策略分配任务),如没有备份数据,那么必然产生一下数个问题:
   1.节点故障,无法访问节点数据
   2.数据热点,所有任务皆使用该数据
   3.网络传输,所有节点频繁访问单节点
那么解决该问题,spark并没有使用HDFS的思想,而选择是P2P点对点方式(BT下载)解决问题,是只要使用过broadcase数据,则在本接节点存储数据,由此变成新的数据源,随和数据源不断增加速度也会越来越快,刚开始传输则相对会慢一些,同时,以上不建议使用大文件broadcase,亦是如此,如果使用较为频繁的数据,他相当于每个节点都要存储一份,形成网状传输方式交换数据,因此建议存储配置文件或某种数据结构为上佳选择。
 
调用顺序:
1)TorrentBroadcast#readObject()
2)TorrentBroadcast#readBlocks()
3)BlockManage#getLocalBytes(blockId:BlockId) / getRemoteBytes(blockId: BlockId)
4)BlockManage#putBytes()

readObject是broadcase读取的主方法,管理整个读取策略

[java] view plain copy
  1. /** Used by the JVM when deserializing this object. */    
  2.   private def readObject(in: ObjectInputStream) {    
  3.     // TODO 读取广播变量,有便读取本地,没有则远程并存储在本地    
  4.         
  5.     // 1.0 可读取对象中静态变量    
  6.     in.defaultReadObject()    
  7.     // 2.0 读取广播变量(单个executor独享)    
  8.     TorrentBroadcast.synchronized {    
  9.       // 2.1 读取本地广播数据    
  10.       SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {    
  11.         // 2.2 获取本地数据成功    
  12.         case Some(x) =>    
  13.           _value = x.asInstanceOf[T]    
  14.         // 2.3 获取本地数据失败    
  15.         case None =>    
  16.           // 2.4 获取Blocks,同时将块存储到本地    
  17.           logInfo("启动读取 broadcast variable " + id)    
  18.           val start = System.nanoTime()    
  19.           val blocks = readBlocks()    
  20.           val time = (System.nanoTime() - start) / 1e9    
  21.           logInfo("Reading broadcast variable " + id + " took " + time + " s")    
  22.     
  23.           // 2.5 将数据块反序列化,并解压缩    
  24.           _value = TorrentBroadcast.unBlockifyObject[T](blocks)    
  25.           // Store the merged copy in BlockManager so other tasks on this executor don't    
  26.           // need to re-fetch it.    
  27.           SparkEnv.get.blockManager.putSingle(    
  28.             broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)    
  29.       }    
  30.     }    
  31.   }  


readBlocks则是实现P2P思想的具体实现者,代码如下:
[java] view plain copy
  1. /** Fetch torrent blocks from the driver and/or other executors. */    
  2.   private def readBlocks(): Array[ByteBuffer] = {    
  3.     // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported    
  4.     // to the driver, so other executors can pull these chunks from this executor as well.    
  5.         
  6.     // 1.0 定义数据块集合    
  7.     val blocks = new Array[ByteBuffer](numBlocks)    
  8.     // 1.1 引用blockManager    
  9.     val bm = SparkEnv.get.blockManager    
  10.         
  11.     // 2.0 循环遍历所有块,避免访问热点,随机顺序读    
  12.     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {    
  13.       // 2.1 组装块ID    
  14.       val pieceId = BroadcastBlockId(id, "piece" + pid)    
  15.     
  16.       // First try getLocalBytes because  there is a chance that previous attempts to fetch the    
  17.       // broadcast blocks have already fetched some of the blocks. In that case, some blocks    
  18.       // would be available locally (on this executor).    
  19.       // 2.2 他会先查本地,继而查询远程,但是前面已经查找的是广播,现在查找的是认数据块(区别)    
  20.       var blockOpt = bm.getLocalBytes(pieceId)    
  21.       // 2.3 如果本地为查询到结果,则通过blockManager远程获取,并将数据存储到本地    
  22.       if (!blockOpt.isDefined) {    
  23.         blockOpt = bm.getRemoteBytes(pieceId)    
  24.         blockOpt match {    
  25.           case Some(block) =>    
  26.             // If we found the block from remote executors/driver's BlockManager, put the block    
  27.             // in this executor's BlockManager.    
  28.             SparkEnv.get.blockManager.putBytes(    
  29.               pieceId,    
  30.               block,    
  31.               StorageLevel.MEMORY_AND_DISK_SER,    
  32.               tellMaster = true)    
  33.     
  34.           case None =>    
  35.             throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)    
  36.         }    
  37.       }    
  38.       // If we get here, the option is defined.    
  39.       // 3.0 赋值数据块集合    
  40.       blocks(pid) = blockOpt.get    
  41.     }    
  42.     // 3.1 返回数据块    
  43.     blocks    
  44.   }  

相关配置属性说明:(在spark-default.conf中设置)
spark.broadcast.factory 定义使用http或Torrent方式,默认是Torrent,无需修改
spark.broadcast.blockSize 数据库块大小,blockifyObject依据此属性切分数据块,默认4M
spark.broadcast.compress 是否压缩,默认是使用,sparkcontext初始化该属性,无需修改。


这篇关于源码- Spark Broadcast源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007527

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。