本文主要是介绍39 BlockManager深入理解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
主要内容:
1. BlockManager源码再探
2. BlockManagerMaster
3. BlockManager具体读写数据源码
一、BlockManager 概述
BlockManager也是Master-slaves结构。Driver中的BlockManager会管理所有Executor中的BlockManager,在Executor启动时,会启动相应的BlockManager。
其次,BlockManager提供了读取和写数据的接口,可以从本地读写数据,也可以从远程读写数据。BlockManager管理的数据可以是存储在内存、磁盘以及OffHeap(堆外空间,例如Tachyon)。
BlockManager的结构:
BlockManager的默认构造参数如下,
private[spark] class BlockManager(executorId: String, //BlockManager运行在哪个Executor之上rpcEnv: RpcEnv, //远程通信体val master: BlockManagerMaster, //BlockManagerMaster,管理整个集群的BlockMangerdefaultSerializer: Serializer, //默认序列化器val conf: SparkConf,memoryManager: MemoryManager, //内存管理器mapOutputTracker: MapOutputTracker, //shuffle输出shuffleManager: ShuffleManager, //shuffle管理器blockTransferService: BlockTransferService, //用于Block间的网络通信(进行备份时)securityManager: SecurityManager,numUsableCores: Int)extends BlockDataManager with Logging {//磁盘中的Block管理器val diskBlockManager = new DiskBlockManager(this, conf)//Block信息,block与block存储的一些参数的对应关系private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
& Block是Spark从程序运行角度来讲,对数据的最小抽象单位。这里的Block可以存储在内存、磁盘以及OffHeap(Tachyon)中。
二、BlockManager 注册过程
1.Executor实例化时调用BlockManager#initialize方法。
/*** Executor实例化时*/
if (!isLocal) {env.metricsSystem.registerSource(executorSource)env.blockManager.initialize(conf.getAppId)}
& Executor实例化时会调用BlockManager#initialize方法,来实例化Executor上的BlockManager并且创建BlockManagerSlaveEndpoint这个消息循环体来接收Driver中的BlockManagerMaster发送过来的指令,例如删除Block等。
2.初始化BlockManager
/*** BlockManager#initialize* 通过appId初始化BlockManager,不在构造器中进行初始化,是由于在BlockManager实例化* AppId可能未知(例如,对于driver来说,只有在TaskSchedule注册后才知道应用程序的id)。* 此方法会负责初始化BlockTransferService和ShuffleClient,并到BlockManagerMaster进行* 注册,启动BlockManagerWorker通信体进行网络通信,如果在配置文件中设置了Shuffle * service,也会注册Shuffle service,说明Shuffle过程是要借助BlockManager对象完成的。*/def initialize(appId: String): Unit = {blockTransferService.init(this) //负责网络通信shuffleClient.init(appId) //负责ShuffleblockManagerId = BlockManagerId( //基于executorId获得BlockIdexecutorId, blockTransferService.hostName, blockTransferService.port)shuffleServerId = if (externalShuffleServiceEnabled) {logInfo(s"external shuffle service port = $externalShuffleServicePort")BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)} else {blockManagerId}
//向Master注册master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)//如果存在ExternalShuffleServer则需注册ExternalShuffleServerif (externalShuffleServiceEnabled && !blockManagerId.isDriver) {registerWithExternalShuffleServer()}}
3.向driver中的BlockManagerMaster注册。
/** 向driver中BlockManager进行注册本机的BlockManager的 id*BlockManagerMaster#registerBlockManager*/
defregisterBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long,slaveEndpoint: RpcEndpointRef): Unit = {logInfo("Trying to registerBlockManager")//注册BlockManagertell(RegisterBlockManager(blockManagerId,maxMemSize, slaveEndpoint))logInfo("RegisteredBlockManager")
}//BlockManager实例化时private valslaveEndpoint = rpcEnv.setupEndpoint("BlockManagerEndpoint" +BlockManager.ID_GENERATOR.next,new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
/***一个在Block 的Master-Slave结构中处于Slave节点上的消息通信体。其内部有一个守护线程
*不断的接收处理消息。*/
private[storage]
class BlockManagerSlaveEndpoint(override val rpcEnv: RpcEnv,blockManager: BlockManager,mapOutputTracker: MapOutputTracker)extends ThreadSafeRpcEndpoint with Logging {//创建消息处理线程private val asyncThreadPool =ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)// 根据接收消息的类型(也就是case Class的类型)不同进行相应的处理override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {//删除Blockcase RemoveBlock(blockId) =>doAsync[Boolean]("removing block " + blockId, context) {blockManager.removeBlock(blockId)true}//删除RDDcase RemoveRdd(rddId) =>doAsync[Int]("removing RDD " + rddId, context) {blockManager.removeRdd(rddId)}//删除Shuffle数据case RemoveShuffle(shuffleId) =>doAsync[Boolean]("removing shuffle " + shuffleId, context) {if (mapOutputTracker != null) {mapOutputTracker.unregisterShuffle(shuffleId)}SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)}//删除广播数据case RemoveBroadcast(broadcastId, _) =>doAsync[Int]("removing broadcast " + broadcastId, context) {blockManager.removeBroadcast(broadcastId, tellMaster = true)}//获得广播变量case GetBlockStatus(blockId, _) =>context.reply(blockManager.getStatus(blockId))//获取Shuffle数据case GetMatchingBlockIds(filter, _) =>context.reply(blockManager.getMatchingBlockIds(filter))//获取线程Dump消息case TriggerThreadDump =>context.reply(Utils.getThreadDump())}
}
回到正题,这里注册到Master是会调用tell方法。
/** 发送单向消息给driverEndpoint,若driverEndpoint返回为false则报错。*/private def tell(message: Any) {if (!driverEndpoint.askWithRetry[Boolean](message)) {throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")}}
这里,会有疑问?driverEndpoint并不是Driver端的消息循环体,而是BlockManagerMasterEndpoint。
参见以下源码。
//SparkEnv#create() 当有Worker中的Executor产生时会触发这个方法执行//所以BlockManager的实例化实在SparkEnv#create方法中,但此时BlockManager不可用,只有当
//Executor对其进行了初始化才能使用。
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME,new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),conf, isDriver)// 这里的blockManager是无效的,知道其initialize方法被Executor调用。val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,serializer, conf, memoryManager, mapOutputTracker, shuffleManager,blockTransferService, securityManager, numUsableCores)val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
//BlockManagerMaster.scala
class BlockManagerMaster(var driverEndpoint: RpcEndpointRef,conf: SparkConf,isDriver: Boolean)extends Logging {
& 其实如果把此处的driverEndpoint当成driver的消息循环体的话,在driver端接收处理消息的类型并没有这里发送的消息RegisterBlockManager,这就说明Driver消息循环器是不负责处理这个消息的。这里的命名有可能是开发者的笔误吧。
4.BlockManagerMasterEndpoint接收消息并处理
//BlockManagerMasterEndpoint# receiveAndReplycase RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>register(blockManagerId, maxMemSize, slaveEndpoint)context.reply(true) //此时,reply不为true会报异常
private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {val time = System.currentTimeMillis()//查看blockManagerInfo中是否已存在改BlockManagerId的信息,不存在才会创建if (!blockManagerInfo.contains(id)) { //关于blockManagerInfo的结构见提示// blockManagerIdByExecutor记录了集群中所有BlockManager与Executor的对应关系blockManagerIdByExecutor.get(id.executorId) match { case Some(oldId) => // executor已被其他Block Manager注册,所以将旧的去除(假定其已挂掉),替换//新的准备注册的Block Manager。logError("Got two different block manager registrations on same executor - "+ s" will replace old one $oldId with new one $id")removeExecutor(id.executorId) //详细过程见下页case None =>}logInfo("Registering block manager %s with %s RAM, %s".format(id.hostPort, Utils.bytesToString(maxMemSize), id))//将Executor与blockManager对应关系加入到数据结构blockManagerIdByExecutor中blockManagerIdByExecutor(id.executorId) = id创建新的BlockManagerInfo(Block中的元数据信息)blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)}//通知系统增加了blockManagerlistenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))}
& privateval blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
& blockManagerInfo主要是将 block manager的Id与其对应的block manager的信息对应起来。
& class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
private var port_ : Int)
extends Externalizable
& 而表示block manager id的BlockManagerId数据结构中记录了block manager所在的executor的id,host地址和端口号,这样就可以通过blockManagerInfo来获取block的信息了。
& BlockManager中重要的数据结果BlockManagerInfo。
& private[spark] class BlockManagerInfo(
val blockManagerId:BlockManagerId,
timeMs: Long,
val maxMem: Long,
val slaveEndpoint:RpcEndpointRef)
extends Logging {
& 这里BlockManagerInfo里的构造器参数都是由block注册时生成的,此时就有了这个block的id,以及其内存使用状况和通信体的引用。这时,就可给这个block发送指令执行相应的操作了。
//3处被调用
private def removeExecutor(execId: String) {logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)}
private def removeBlockManager(blockManagerId: BlockManagerId) {//由于blockManager管理多个Block,所以为了删除blockManager管理的所有Block需要先
//记录blockManager管理的信息。val info = blockManagerInfo(blockManagerId)// 从blockManagerIdByExecutor中删除该blockManager与Executor的对应关系.blockManagerIdByExecutor -= blockManagerId.executorId//先删除blockManager,然后利用info来删除内存中blockManager管理的所有block。blockManagerInfo.remove(blockManagerId)val iterator = info.blocks.keySet.iteratorwhile (iterator.hasNext) {val blockId = iterator.next//获得拥有该block信息的blockManager的集合,有可能有多个blockManager有同一个//block的信息(block会有副本)val locations = blockLocations.get(blockId)//去除集合中,我们要remove掉的blockManger的信息locations -= blockManagerIdif (locations.size == 0) {//如果这个集合为空则去除这条映射。blockLocations.remove(blockId)}}listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))logInfo(s"Removing block manager $blockManagerId")}
到这里BlockManager的注册过程就结束了。
三、BlockManager 功能性模块
以下,我们来看看BlockManager注册后,有哪些工作要做。
1.BlockManager报告其管理的所有Blocks信息
/*
*再次向BlockManager汇报所有的blocks。这样做是非常有必要的,如果掉线需要通过
*BlockManager找回blocks,或者在executor崩溃后,来恢复磁盘上的blocks。
*如果master返回是false时,这个函数则也是失败的。通过下次心跳执行,错误的条件将会被发现
*或者一个新的block的注册和将重新注册全部blocks的时候,这个方法将被使用。*/
private def reportAllBlocks(): Unit = {logInfo(s"Reporting ${blockInfo.size} blocks to the master.")for ((blockId, info) <- blockInfo) {val status = getCurrentBlockStatus(blockId, info)if (!tryToReportBlockStatus(blockId, info, status)) {logError(s"Failed to report $blockId to master; giving up.")return}}
}
这个方法的使用总结下来会是:
再次将所有的blocks汇报给BlockManager。这个方法强调所有的blocks必须都能在BlockManager的管理下,因为可能会出现各种因素,如slave需要重新注册、进程冲突导致block变化等,让blocks产生变化。
方法中使用到getCurrentBlockStatus方法,具体实现如下:
/*返回指定block所在存储块的最新信息。特别的,当block从内存移到磁盘时,更改其存储级别并
*更新内存和磁盘大小。*/
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {info.synchronized {info.level match {case null =>BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)case level =>val inMem = level.useMemory && memoryStore.contains(blockId)val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)val onDisk = level.useDisk && diskStore.contains(blockId)val deserialized = if (inMem) level.deserialized else falseval replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1val storageLevel =StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)val memSize = if (inMem) memoryStore.getSize(blockId) else 0Lval externalBlockStoreSize =if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0Lval diskSize = if (onDisk) diskStore.getSize(blockId) else 0LBlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)}}
}
2.通过blockid或得其所在的所有BlockManager
private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis
//这里需要向Master要Block所在的BlockManager的信息。
val locations = master.getLocations(blockIds).toArraylogDebug("Got multiple block location in %s".format(Utils.getUsedTimeMs(startTimeMs)))locations
}
/** BlockManagerMaster#getLocation*Master端*通过向driverEndpoint(BlockManagerMasterEndpoint)发消息,让它去具体查找,并将结*果返回 */def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))}
// BlockManagerMasterEndpoint#receiveAndReply
case GetLocationsMultipleBlockIds(blockIds) =>context.reply(getLocationsMultipleBlockIds(blockIds))
//BlockManagerMasterEndpoint#getLocationsMultipleBlockIds 查询多个Blocks所在的
//BlockManagerMasters信息
private def getLocationsMultipleBlockIds(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {blockIds.map(blockId => getLocations(blockId))}
//BlockManagerMasterEndpoint#getLocations 分解为逐个查询每个Blocks所在的
//BlockManagerMasters信息
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {//查看内存缓存结构blockLocations中有该block,有则返回该block的信息,否则为空。if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty}
& blockLocations是一个以blockid为Key,值是BlockManager组成的集合。这里由于block有副本,所以一个block对应的应该是一个由多个BlockManager组成的集合。
3.从本地block中获取数据
/*** 从本blockManager获取block数据*/def getLocal(blockId: BlockId): Option[BlockResult] = {logDebug(s"Getting local block $blockId")doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]}
//具体获得block操作的实现方法
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {val info = blockInfo.get(blockId).orNullif (info != null) {info.synchronized {//使用block前,再次确认block的状态。保证block未被remove掉。if (blockInfo.get(blockId).isEmpty) {logWarning(s"Block $blockId had been removed")return None}// 如果其他线程在使用该block则需等待.if (!info.waitForReady()) {//读取失败.logWarning(s"Block $blockId was marked as failure.")return None}//获得block的存储级别val level = info.levellogDebug(s"Level for block $blockId is $level")// 如果存储级别是在内存中,则取内存中读数据块blockif (level.useMemory) {logDebug(s"Getting block $blockId from memory")val result = if (asBlockResult) { //是否block未被序列化//获得block块数据memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))} else {//获得block序列化后的字节数据memoryStore.getBytes(blockId)}result match {case Some(values) =>return resultcase None =>logDebug(s"Block $blockId not found in memory")}}//block的存储级别为offHeap即Tachyon中,处理逻辑与存储在内存中类似。if (level.useOffHeap) {logDebug(s"Getting block $blockId from ExternalBlockStore")if (externalBlockStore.contains(blockId)) {val result = if (asBlockResult) {externalBlockStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))} else {externalBlockStore.getBytes(blockId)}result match {case Some(values) =>return resultcase None =>logDebug(s"Block $blockId not found in ExternalBlockStore")}}}// 存储级别为磁盘,如果需要则自动会把block从磁盘存储到内存中。if (level.useDisk) {logDebug(s"Getting block $blockId from disk")//一般在磁盘中存储的block都是序列化后的block数据。val bytes: ByteBuffer = diskStore.getBytes(blockId) match {case Some(b) => bcase None =>throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be")}assert(0 == bytes.position())if (!level.useMemory) {// 如果block不需要存储到内存中,则会直接返回block结果if (asBlockResult) {//是否需要获得block的结果,即反序列化后的结果return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,info.size))} else {return Some(bytes)}} else {// 否则,我们需要将一些数据存储到内存中。if (!level.deserialized || !asBlockResult) {/*如果block的存储级别是序列化后内存存储,那么我们只需将序列化后的字节存储*到内存中;否则,则需要将这些字节反序列化为对象存储在内存中。但是一般情况
*下我们只需要block序列化后的结果。*/memoryStore.putBytes(blockId, bytes.limit, () => {//这里使用函数字面量“()=>ByteBuffer”并且这个字面量是lazily级别的原因//在于如果文件的大小大于空闲内存的大小时,会发生内存溢出。此时,我们就 //不能将这些文件存储到内存中去,相应的copyForMemory对象就没有必要被创//建了。val copyForMemory = ByteBuffer.allocate(bytes.limit)copyForMemory.put(bytes)})bytes.rewind()}if (!asBlockResult) {//如果需要的不是反序列化后的block内容(对象),那就直//接返回序列化后的字节。return Some(bytes)} else {val values = dataDeserialize(blockId, bytes)if (level.deserialized) { // 现在内存中缓存要返回的结果val putResult = memoryStore.putIterator(blockId, values, level, returnValues = true, allowPersistToDisk = false)//这里的操作可能成功也可能失败,主要取决于内存中是否有足够的回滚空间,//但无论成功与否,都会返回一个iterator对象。putResult.data match {case Left(it) =>return Some(new BlockResult(it, DataReadMethod.Disk, info.size))case _ =>// 当内存中的数据drop到磁盘中时,磁盘中此数据丢失。throw new SparkException("Memory store did not return an iterator!")}} else { //不需要cache时,直接返回由BlockResult封装的反序列化后的block//数据。return Some(new BlockResult(values, DataReadMethod.Disk, info.size))}}}}}} else {logDebug(s"Block $blockId not registered locally")}None}
4. 读取远程的block数据
/*** 通过远程的blockManager获得block数据。*/def getRemote(blockId: BlockId): Option[BlockResult] = {logDebug(s"Getting remote block $blockId")doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]}
& 如果是远程读取block数据,由于blockId对应的block内容有若干个副本,此时只需要读取一个副本上的数据即可。
//具体的远程读数据的实现方法。private defdoGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {require(blockId != null, "BlockId isnull")//随机读取一个blockid对应的副本val locations =Random.shuffle(master.getLocations(blockId))var numFetchFailures = 0for (loc <- locations) {logDebug(s"Getting remote block$blockId from $loc")val data = try {//通过blockTransferService去拉取远程block的内容。blockTransferService.fetchBlockSync(loc.host, loc.port, loc.executorId,blockId.toString).nioByteBuffer()} catch {case NonFatal(e) =>//可能会出现拉取失败的情况numFetchFailures += 1if (numFetchFailures ==locations.size) {//如果拉取失败的次数等于副本次数// 则说明所有副本拉取数据失败,此时远程读取block操作失败。throw newBlockFetchException(s"Failed to fetch block from" +s" ${locations.size}locations. Most recent failure cause:", e)} else {// 这个block副本拉取数据失败,则从剩下副本中重新选取一个副本拉取block数据logWarning(s"Failed to fetchremote block $blockId " +s"from $loc (failed attempt$numFetchFailures)", e)null}}if (data != null) {if (asBlockResult) {return Some(new BlockResult(dataDeserialize(blockId, data),DataReadMethod.Network,data.limit()))} else {return Some(data)}}logDebug(s"The value of block $blockIdis null")}logDebug(s"Block $blockId notfound")None}
具体的远程读取block数据是通过Netty实现的。
// blockTransferService#fetchBlockSync
def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {// 监控等待的线程.val result = Promise[ManagedBuffer]()fetchBlocks(host, port, execId, Array(blockId),new BlockFetchingListener {override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {result.failure(exception)}override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {val ret = ByteBuffer.allocate(data.size.toInt)ret.put(data.nioByteBuffer())ret.flip()result.success(new NioManagedBuffer(ret))}})
// blockTransferService#fetchBlocks 抽象方法有NettyBlockTransferService实现。
override def fetchBlocks(host: String,port: Int,execId: String,blockIds: Array[String],listener: BlockFetchingListener): Unit
//NettyBlockTransferService#fetchBlocks
override def fetchBlocks(host: String,port: Int,execId: String,blockIds: Array[String],listener: BlockFetchingListener): Unit = {logTrace(s"Fetch blocks from $host:$port (executor id $execId)")try {val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {//通过C/S模式从远程进行通信,来拉去数据。val client = clientFactory.createClient(host, port)new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()}}//skip some codes}
5.添加新的block
添加block可以通过两种方法,putIterator和putArray。两者的逻辑相似,只有输入的代表Values的值的类型不同而已。两者的具体添加block操作的实现都是通过调用doPut方法实现。
/***根据store level将block提交给相应的BlockManager,视需要进行备份操作。*/private def doPut(blockId: BlockId,data: BlockValues,level: StorageLevel,tellMaster: Boolean = true,effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {//skip some codes// 创建需要返回的对象val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]/* 这里需要记住block的存储级别,以便我们在该block读入内存使用完后,在内存紧张时,*正确的将其drop到磁盘。在drop的过程中,其他线程是不能获取该block的内容的,直到*这里调用该block的BlockInfo 的markReady方法。*/val putBlockInfo = {val tinfo = new BlockInfo(level, tellMaster)// Do atomically !val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)if (oldBlockOpt.isDefined) {if (oldBlockOpt.get.waitForReady()) {logWarning(s"Block $blockId already exists on this machine; not re-adding it")return updatedBlocks}oldBlockOpt.get} else {tinfo}}
将传入的value保存到相应的block。
try {// returnValues – Whether to return thevalues put// blockStore - The type of storage toput these values intoval (returnValues, blockStore:BlockStore) = {if (putLevel.useMemory) {// 这里会先将block放在内存中,即使其使用useDisk也是可以获得数据,如果内存存储//不够的时候,就会将blockdrop到磁盘中。(true, memoryStore)} else if (putLevel.useOffHeap) {// 使用一个扩展的block存储器,例如Tachyon(false, externalBlockStore)} else if (putLevel.useDisk) {//?? Don't get back the bytes from put unless we replicate them(putLevel.replication > 1,diskStore)} else {assert(putLevel ==StorageLevel.NONE)throw new BlockException(blockId, s"Attempted to putblock $blockId without specifying storage level!")}}// 实际提交值的代码,将value存储到block中。val result = data match {case IteratorValues(iterator) =>blockStore.putIterator(blockId,iterator, putLevel, returnValues)case ArrayValues(array) =>blockStore.putArray(blockId, array,putLevel, returnValues)case ByteBufferValues(bytes) =>bytes.rewind()blockStore.putBytes(blockId, bytes,putLevel)}
追踪更新的blocks和BlockStatus。
//持续追踪需要从内存dropped的blocksif (putLevel.useMemory) {result.droppedBlocks.foreach { updatedBlocks += _ }}//获取block的最新的状态信息。val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)if (putBlockStatus.storageLevel != StorageLevel.NONE) {// 此时block put成功,告诉master其他线程可以读取这个block了。marked = trueputBlockInfo.markReady(size)if (tellMaster) {reportBlockStatus(blockId, putBlockInfo, putBlockStatus) //9}updatedBlocks += ((blockId, putBlockStatus))}}
Block的备份
// 异步方式进行副本的存储if (putLevel.replication > 1) {data match {case ByteBufferValues(bytes) =>if (replicationFuture != null) {Await.ready(replicationFuture, Duration.Inf)}case _ =>val remoteStartTime = System.currentTimeMillis// Serialize the block if not already doneif (bytesAfterPut == null) {if (valuesAfterPut == null) {throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")}bytesAfterPut = dataSerialize(blockId, valuesAfterPut)}replicate(blockId, bytesAfterPut, putLevel) //10logDebug("Put block %s remotely took %s".format(blockId, Utils.getUsedTimeMs(remoteStartTime)))}}BlockManager.dispose(bytesAfterPut)if (putLevel.replication > 1) {logDebug("Putting block %s with replication took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))} else {logDebug("Putting block %s without replication took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))}updatedBlocks}
具体告诉Master,数据已经成功的put到block中了。
BlockManagerMaster端更新BlockInfo
//blockManager#reportBlockStatus 9处调用
private def reportBlockStatus(blockId: BlockId,info: BlockInfo,status: BlockStatus,droppedMemorySize: Long = 0L): Unit = {//向blockManagerMaster发送消息,告诉其更新BlockStatus信息val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)if (needReregister) {logInfo(s"Got told to re-register updating block $blockId")// Re-registering will report our new block for free.asyncReregister()}logDebug(s"Told master about block $blockId")}
//BlockManager#tryToReportBlockStatusprivate def tryToReportBlockStatus(blockId: BlockId,info: BlockInfo,status: BlockStatus,droppedMemorySize: Long = 0L): Boolean ={if (info.tellMaster) {val storageLevel = status.storageLevelval inMemSize = Math.max(status.memSize,droppedMemorySize)val inExternalBlockStoreSize =status.externalBlockStoreSizeval onDiskSize = status.diskSize//BlockManagerMaster会更新其BlockInfomaster.updateBlockInfo(blockManagerId, blockId, storageLevel,inMemSize, onDiskSize, inExternalBlockStoreSize)} else {true}}
6.备份副本操作
//复制块到另一个节点。实际上会开辟一个线程来执行备份操作。
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)val numPeersToReplicateTo = level.replication - 1val peersForReplication = new ArrayBuffer[BlockManagerId]val peersReplicatedTo = new ArrayBuffer[BlockManagerId]val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]val tLevel = StorageLevel(level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)val startTime = System.currentTimeMillisval random = new Random(blockId.hashCode)var replicationFailed = falsevar failures = 0var done = false//获得可以备份block的节点集合是一个listpeersForReplication ++= getPeers(forceFetch = false)//该循环完成了备份操作,其执行逻辑如下://一个接一个的选择需要进行进行备份Block的节点,并尝试上传要备份的block到该节点上。//?复制到该节点的Block内?//如果复制失败(例如目标节点挂掉),则会强制在进行备份block的节点list中的所有节点//都重新从driver的BlockManagerMaster中拉去最新的block状态数据,(即多个节点同时备//份一个block时,block状态改变需要通知所有相关blockManager更新数据);然后重新随//机选择一个备份block的节点,并将备份失败的节点加入到黑名单中。while (!done) { //循环结束,即备份结束的条件,以下三者之一满足即可//(1)备份已达到指定的数量,备份成功。//(2)备份失败的次数太多,达到一个阈值//(3)没有可以继续备份的节点getRandomPeer() match { //10见下页case Some(peer) =>try {val onePeerStartTime = System.currentTimeMillisdata.rewind()logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")//利用blockTransferService进行远程的数据传输,备份blockTransferService.uploadBlockSync(peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms".format(System.currentTimeMillis - onePeerStartTime))//将备份成功的节点加入相应已备份的list中,并从可备份的list删去该节点。peersReplicatedTo += peerpeersForReplication -= peerreplicationFailed = falseif (peersReplicatedTo.size == numPeersToReplicateTo) {done = true // 备份的个数达到要求的数量。备份结束。}} catch { //备份失败case e: Exception =>logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)failures += 1replicationFailed = truepeersFailedToReplicateTo += peerif (failures > maxReplicationFailures) { // 备份失败次数达到阈值,备份结束done = true }}case None => // 没有还可以利用的节点进行备份了done = true}}//随机得到一个备份节点。备份节点是通过选择block的id确定的。因此,这里假设可以备份//block节点的集合的列表peerForReplication没有改变且没有出现复制失败的情况,如果?在//同一个节点多次尝试复制相同的块中,相同的一组对等体将被选中。def getRandomPeer(): Option[BlockManagerId] = {// 如果复制失败,然后强制清空可以利用来做备份的节点列表,并重新获得可以备份block//的节点集合,并去掉那些已经做过备份的节点,和备份失败的节点。即获得新的可以利用的//备份节点集合if (replicationFailed) {peersForReplication.clear()peersForReplication ++= getPeers(forceFetch = true)peersForReplication --= peersReplicatedTopeersForReplication --= peersFailedToReplicateTo}if (!peersForReplication.isEmpty) { //不为空随机选取一个节点做备份Some(peersForReplication(random.nextInt(peersForReplication.size)))} else { //为空达到退出循环的条件三,结束循环。None}}val timeTakeMs = (System.currentTimeMillis - startTime)logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")if (peersReplicatedTo.size < numPeersToReplicateTo) {logWarning(s"Block $blockId replicated to only " +s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")}}
& 默认备份失败的阈值为1,但是可以通过配置文件中设置spark.storage.maxReplicationFailures来配置。maxReplicationFailures定义如下所示。
& val maxReplicationFailures =conf.getInt("spark.storage.maxReplicationFailures", 1)
7. 内存和磁盘的转换dropFromMemory操作
所谓的dropFromMemory操作,其实就是指当内存在程序运行过程中不够时,系统会尝试释放一部分内存给要使用内存的应用或操作,此时需要考虑释放的那部分内存中的数据是直接丢弃掉,还是转存到磁盘上。这使一个关键性问题,例如在如下场景中,5000个步骤作为一个Stage,前面进行到第4000个步骤时,进行了cache操作(假设cache了100万个partition),由于在计算时内存使用比较紧张,系统丢弃了100个partition,如果丢弃的这100个partition后面的步骤要利用的话,则需要对这100个partition进行重新计算,即重新执行这4000个步骤。但是如果用户设置了storeLevel的级别为MEMORY_AND_DISK的方式,此时,由于在计算时内存使用比较紧张,就会把部分内存的数据转存到磁盘中,然后在释放这部分内存来供系统使用。以后的步骤要利用这些数据时,所需做的操作就是从磁盘中读数据到内存即可,而不是对相应的partition重新计算前4000个步骤。
& 这里的MEMORY_AND_DISK方式的storeLevel,并不是将数据放在磁盘和内存,而是优先将数据放在内存中,只有在内存不够的时候才会考虑部分放到磁盘。如果不是MEMORY_AND_DISK级别的话,那么在进行dropFromMemory操作时,这部分内存存储的数据就会被直接丢弃。
& 这里的storeLevel是在用户应用程序代码中进行cache或persist时设置的。
/***当内存存储器存储容量满了,但仍需要一些空闲空间来进行计算操作,我们便会将一些内存的数
*据进行drop操作(如果程序配置了需要drop到磁盘就将数据drop到磁盘,否则直接丢弃)。* 如果数据不需要drop到磁盘,我们会直接丢弃掉这部分数据。* 如果给定的block状态发生了改变(数据drop到磁盘中的block),我们就返回block的状态信*息,否则返回为空.*/def dropFromMemory(blockId: BlockId,data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {logInfo(s"Dropping block $blockId from memory")val info = blockInfo.get(blockId).orNull// 如果block数据还没进行drop操作if (info != null) {info.synchronized {if (!info.waitForReady()) {// 数据读入内存中的block出错.logWarning(s"Block $blockId was marked as failure. Nothing to drop")return None} else if (blockInfo.get(blockId).isEmpty) {logWarning(s"Block $blockId was already dropped.")return None}var blockIsUpdated = falseval level = info.level//如果用户设置的storage level需要,则Drop内存数据到磁盘if (level.useDisk && !diskStore.contains(blockId)) {logInfo(s"Writing block $blockId to disk")data() match {//根据不同的值类型进行相应的将block写入磁盘操作case Left(elements) => diskStore.putArray(blockId, elements, level, returnValues = false)case Right(bytes) =>diskStore.putBytes(blockId, bytes, level)}blockIsUpdated = true}// 得到drop掉的内存大小。val droppedMemorySize =if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L//drop内存中的blockval blockIsRemoved = memoryStore.remove(blockId) //11if (blockIsRemoved) {blockIsUpdated = true} else {logWarning(s"Block $blockId could not be dropped from memory as it does not exist")}//报告master block的状态改变val status = getCurrentBlockStatus(blockId, info)if (info.tellMaster) {reportBlockStatus(blockId, info, status, droppedMemorySize)}if (!level.useDisk) {// 若果不需要转存到磁盘,则block会直接drop掉,相应的blockInfo中直接删掉//该blockblockInfo.remove(blockId)}//返回更新了的block状态信息。if (blockIsUpdated) {return Some(status)}}}None}
Memeory Storage 去除内存中的blockid对应的block。
/MemeoryStore#removeoverride defremove(blockId: BlockId): Boolean = memoryManager.synchronized {//主要是从entries数据结果中删除blockval entry = entries.synchronized {entries.remove(blockId) }if (entry != null) {memoryManager.releaseStorageMemory(entry.size)logDebug(s"Block $blockId of size${entry.size} dropped " +s"from memory (free ${maxMemory -blocksMemoryUsed})")true} else {false}}
这里的entries是一个linkedHashMap,主要用于存储block与MemoryEntry对应关系。
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
可以看出MemoryStorage是利用这个LinkedHashMap来维护所使用的内存的。具体MemoryEntry表示一个block中数据存储的信息,如下:
private case classMemoryEntry(value: Any, size: Long, deserialized: Boolean)
这些信息主要有,值,大小,是否反序列化。
所以Memory Storage的删除block操作,主要是从LinkedHashMap中去除该block的信息。
8.补充:在写数据到磁盘时使用了NIO
def getDiskWriter(blockId: BlockId,file: File,serializerInstance: SerializerInstance,bufferSize: Int,writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)val syncWrites = conf.getBoolean("spark.shuffle.sync", false)new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,syncWrites, writeMetrics, blockId)}
//DiskBlockObjectWriter.scala 创建磁盘读写对象。
private[spark] class DiskBlockObjectWriter(val file: File,serializerInstance: SerializerInstance,bufferSize: Int,compressStream: OutputStream => OutputStream,syncWrites: Boolean,// These write metrics concurrently shared with other active DiskBlockObjectWriters who// are themselves performing writes. All updates must be relative.writeMetrics: ShuffleWriteMetrics,val blockId: BlockId = null)extends OutputStreamwith Logging {/** 这里利用NIO的channel机制进行文件的写操作. */private var channel: FileChannel = null
至此,BlockManger的源码大部分已经进行了解析。介绍了BlockManger的主要功能性的模块,基本上对于BlockManager讲解完了。
_____________________________________EOF____________________________________________________________________________感谢:网上提供spark资料和书籍的朋友。
这篇关于39 BlockManager深入理解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!